You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/07/14 11:48:01 UTC

[iotdb] 08/08: modify TSServiceImpl

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch select-into
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 850f2c0bd744b5ab43c4812cae796997205fa1c0
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Jul 14 19:45:32 2021 +0800

    modify TSServiceImpl
---
 .../engine/transporter/SelectIntoTransporter.java  | 10 +++-
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |  4 ++
 .../iotdb/db/qp/physical/crud/SelectIntoPlan.java  | 13 ++++
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 69 ++++++++++++++++++----
 4 files changed, 83 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/transporter/SelectIntoTransporter.java b/server/src/main/java/org/apache/iotdb/db/engine/transporter/SelectIntoTransporter.java
index 385ddbd..b88179b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/transporter/SelectIntoTransporter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/transporter/SelectIntoTransporter.java
@@ -19,9 +19,15 @@
 
 package org.apache.iotdb.db.engine.transporter;
 
-import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
+import org.apache.iotdb.db.exception.IoTDBException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import java.util.List;
 
 public class SelectIntoTransporter {
 
-  public void execute(SelectIntoPlan selectIntoPlan) {}
+  public SelectIntoTransporter(QueryDataSet queryDataSet, List<PartialPath> intoPaths) {}
+
+  public void execute() throws IoTDBException {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index af58de9..dbdd956 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -116,6 +116,10 @@ public abstract class PhysicalPlan {
     return isQuery;
   }
 
+  public boolean isSelectInto() {
+    return false;
+  }
+
   public Operator.OperatorType getOperatorType() {
     return operatorType;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/SelectIntoPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/SelectIntoPlan.java
index 71bf595..7606998 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/SelectIntoPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/SelectIntoPlan.java
@@ -45,6 +45,11 @@ public class SelectIntoPlan extends PhysicalPlan {
   }
 
   @Override
+  public boolean isSelectInto() {
+    return true;
+  }
+
+  @Override
   public void serialize(DataOutputStream stream) throws IOException {
     throw new UnsupportedOperationException();
   }
@@ -63,4 +68,12 @@ public class SelectIntoPlan extends PhysicalPlan {
   public List<PartialPath> getPaths() {
     throw new UnsupportedOperationException();
   }
+
+  public QueryPlan getQueryPlan() {
+    return queryPlan;
+  }
+
+  public List<PartialPath> getIntoPaths() {
+    return intoPaths;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index f1af737..6c2190d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.cost.statistic.Measurement;
 import org.apache.iotdb.db.cost.statistic.Operation;
+import org.apache.iotdb.db.engine.transporter.SelectIntoTransporter;
 import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.IoTDBException;
 import org.apache.iotdb.db.exception.QueryInBatchStatementException;
@@ -59,6 +60,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
 import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
 import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
@@ -498,7 +500,7 @@ public class TSServiceImpl implements TSIService.Iface {
         PhysicalPlan physicalPlan =
             processor.parseSQLToPhysicalPlan(
                 statement, sessionManager.getZoneId(req.sessionId), DEFAULT_FETCH_SIZE);
-        if (physicalPlan.isQuery()) {
+        if (physicalPlan.isQuery() || physicalPlan.isSelectInto()) {
           throw new QueryInBatchStatementException(statement);
         }
 
@@ -597,16 +599,21 @@ public class TSServiceImpl implements TSIService.Iface {
           processor.parseSQLToPhysicalPlan(
               statement, sessionManager.getZoneId(req.getSessionId()), req.fetchSize);
 
-      return physicalPlan.isQuery()
-          ? internalExecuteQueryStatement(
-              statement,
-              req.statementId,
-              physicalPlan,
-              req.fetchSize,
-              req.timeout,
-              sessionManager.getUsername(req.getSessionId()),
-              req.isEnableRedirectQuery())
-          : executeUpdateStatement(physicalPlan, req.getSessionId());
+      if (physicalPlan.isQuery()) {
+        return internalExecuteQueryStatement(
+            statement,
+            req.statementId,
+            physicalPlan,
+            req.fetchSize,
+            req.timeout,
+            sessionManager.getUsername(req.getSessionId()),
+            req.isEnableRedirectQuery());
+      } else if (physicalPlan.isSelectInto()) {
+        return executeSelectIntoStatement(
+            statement, req.statementId, physicalPlan, req.fetchSize, req.timeout);
+      } else {
+        return executeUpdateStatement(physicalPlan, req.getSessionId());
+      }
     } catch (InterruptedException e) {
       LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
       Thread.currentThread().interrupt();
@@ -1024,6 +1031,46 @@ public class TSServiceImpl implements TSIService.Iface {
     plan.setPaths(null);
   }
 
+  private TSExecuteStatementResp executeSelectIntoStatement(
+      String statement, long statementId, PhysicalPlan physicalPlan, int fetchSize, long timeout)
+      throws IoTDBException, TException, SQLException, IOException, InterruptedException,
+          QueryFilterOptimizationException {
+    final long startTime = System.currentTimeMillis();
+
+    final SelectIntoPlan selectIntoPlan = (SelectIntoPlan) physicalPlan;
+    final QueryPlan queryPlan = selectIntoPlan.getQueryPlan();
+
+    queryCount.incrementAndGet();
+    AUDIT_LOGGER.debug("Session {} execute select into: {}", currSessionId.get(), statement);
+
+    Pair<Integer, Integer> fetchSizeDeduplicatedPathNumPair =
+        getMemoryParametersFromPhysicalPlan(queryPlan, fetchSize);
+    fetchSize = fetchSizeDeduplicatedPathNumPair.left;
+    long queryId =
+        sessionManager.requestQueryId(
+            statementId, true, fetchSize, fetchSizeDeduplicatedPathNumPair.right);
+
+    tracingManager.writeQueryInfo(queryId, statement, startTime, queryPlan);
+    try {
+      queryTimeManager.registerQuery(queryId, startTime, statement, timeout, queryPlan);
+      new SelectIntoTransporter(
+              createQueryDataSet(queryId, queryPlan, fetchSize), selectIntoPlan.getIntoPaths())
+          .execute();
+      return RpcUtils.getTSExecuteStatementResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS))
+          .setQueryId(queryId);
+    } catch (Exception e) {
+      sessionManager.releaseQueryResourceNoExceptions(queryId);
+      throw e;
+    } finally {
+      queryTimeManager.unRegisterQuery(queryId, queryPlan);
+      Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+      long costTime = System.currentTimeMillis() - startTime;
+      if (costTime >= config.getSlowQueryThreshold()) {
+        SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
+      }
+    }
+  }
+
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   @Override
   public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {