You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/11/24 07:53:20 UTC

[iotdb] 01/03: xianyi in p

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch xianyi
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 46d8f741e1eeab9b11b53e2e16c192490ed2ed13
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Nov 24 11:20:49 2021 +0800

    xianyi in p
---
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 113 +++++++++++++++++----
 1 file changed, 94 insertions(+), 19 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 505c5a9..b8a0a6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
 import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
+import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
@@ -133,11 +134,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
-import com.google.common.primitives.Bytes;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.sql.SQLException;
@@ -151,8 +147,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.RecursiveTask;
 import java.util.stream.Collectors;
 
+import com.google.common.primitives.Bytes;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNonQueryException;
@@ -935,22 +939,29 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
       return new TSExecuteStatementResp(status);
     }
 
-    final long startTime = System.currentTimeMillis();
-    final long queryId = sessionManager.requestQueryId(statementId, true);
-    QueryContext context =
-        genQueryContext(queryId, physicalPlan.isDebug(), startTime, statement, timeout);
     final SelectIntoPlan selectIntoPlan = (SelectIntoPlan) physicalPlan;
     final QueryPlan queryPlan = selectIntoPlan.getQueryPlan();
 
-    queryFrequencyRecorder.incrementAndGet();
-    AUDIT_LOGGER.debug(
-        "Session {} execute select into: {}", sessionManager.getCurrSessionId(), statement);
-    if (physicalPlan instanceof QueryPlan && ((QueryPlan) physicalPlan).isEnableTracing()) {
-      tracingManager.setSeriesPathNum(queryId, queryPlan.getPaths().size());
+    if (queryPlan instanceof UDTFPlan
+        && queryPlan
+            .getResultColumns()
+            .get(0)
+            .getExpression()
+            .isTimeSeriesGeneratingFunctionExpression()
+    //        && ((FunctionExpression) queryPlan.getResultColumns().get(0).getExpression())
+    //            .getFunctionName()
+    //            .equalsIgnoreCase("en")
+    ) {
+      return executeSelectIntoStatementXianyi(
+          (UDTFPlan) queryPlan, this, statement, statementId, timeout, fetchSize, sessionId);
     }
 
-    try {
+    final long startTime = System.currentTimeMillis();
+    final long queryId = sessionManager.requestQueryId(statementId, true);
+    QueryContext context =
+        genQueryContext(queryId, physicalPlan.isDebug(), startTime, statement, timeout);
 
+    try {
       InsertTabletPlansIterator insertTabletPlansIterator =
           new InsertTabletPlansIterator(
               queryPlan,
@@ -969,10 +980,74 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
       return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS).setQueryId(queryId);
     } finally {
       sessionManager.releaseQueryResourceNoExceptions(queryId);
-      Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_SELECT_INTO, startTime);
-      long costTime = System.currentTimeMillis() - startTime;
-      if (costTime >= CONFIG.getSlowQueryThreshold()) {
-        SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
+    }
+  }
+
+  private TSExecuteStatementResp executeSelectIntoStatementXianyi(
+      UDTFPlan udtfPlan,
+      TSServiceImpl tsService,
+      String statement,
+      long statementId,
+      long timeout,
+      int fetchSize,
+      long sessionId) {
+    ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() << 1);
+    List<ForkJoinTask<Void>> futures = new ArrayList<>();
+    for (String subStatement : split(udtfPlan, statement)) {
+      futures.add(
+          forkJoinPool.submit(
+              new InsertTabletPlanTask(
+                  tsService, statementId, timeout, fetchSize, sessionId, subStatement)));
+    }
+    for (ForkJoinTask<Void> v : futures) {
+      v.join();
+    }
+    forkJoinPool.shutdown();
+    return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  private List<String> split(UDTFPlan udtfPlan, String statement) {
+    List<String> statements = new ArrayList<>();
+
+    String prefix = statement.split("where")[0] + " where ";
+
+    return statements;
+  }
+
+  private class InsertTabletPlanTask extends RecursiveTask<Void> {
+
+    private final TSServiceImpl tsService;
+    private final long statementId;
+    private final long timeout;
+    private final int fetchSize;
+    private final long sessionId;
+    private final String statement;
+
+    InsertTabletPlanTask(
+        TSServiceImpl tsService,
+        long statementId,
+        long timeout,
+        int fetchSize,
+        long sessionId,
+        String statement) {
+      this.tsService = tsService;
+      this.statementId = statementId;
+      this.timeout = timeout;
+      this.fetchSize = fetchSize;
+      this.sessionId = sessionId;
+      this.statement = statement;
+    }
+
+    @Override
+    protected Void compute() {
+      try {
+        PhysicalPlan physicalPlan =
+            processor.parseSQLToPhysicalPlan(statement, sessionManager.getZoneId(sessionId));
+        tsService.executeSelectIntoStatement(
+            statement, statementId, physicalPlan, fetchSize, timeout, sessionId);
+        return null;
+      } catch (Exception e) {
+        throw new RuntimeException(e.getMessage());
       }
     }
   }