You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/11/24 07:53:20 UTC
[iotdb] 01/03: xianyi in p
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch xianyi
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 46d8f741e1eeab9b11b53e2e16c192490ed2ed13
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Nov 24 11:20:49 2021 +0800
xianyi in p
---
.../org/apache/iotdb/db/service/TSServiceImpl.java | 113 +++++++++++++++++----
1 file changed, 94 insertions(+), 19 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 505c5a9..b8a0a6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
+import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
@@ -133,11 +134,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import com.google.common.primitives.Bytes;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.SQLException;
@@ -151,8 +147,16 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;
+import com.google.common.primitives.Bytes;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNonQueryException;
@@ -935,22 +939,29 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
return new TSExecuteStatementResp(status);
}
- final long startTime = System.currentTimeMillis();
- final long queryId = sessionManager.requestQueryId(statementId, true);
- QueryContext context =
- genQueryContext(queryId, physicalPlan.isDebug(), startTime, statement, timeout);
final SelectIntoPlan selectIntoPlan = (SelectIntoPlan) physicalPlan;
final QueryPlan queryPlan = selectIntoPlan.getQueryPlan();
- queryFrequencyRecorder.incrementAndGet();
- AUDIT_LOGGER.debug(
- "Session {} execute select into: {}", sessionManager.getCurrSessionId(), statement);
- if (physicalPlan instanceof QueryPlan && ((QueryPlan) physicalPlan).isEnableTracing()) {
- tracingManager.setSeriesPathNum(queryId, queryPlan.getPaths().size());
+ if (queryPlan instanceof UDTFPlan
+ && queryPlan
+ .getResultColumns()
+ .get(0)
+ .getExpression()
+ .isTimeSeriesGeneratingFunctionExpression()
+ // && ((FunctionExpression) queryPlan.getResultColumns().get(0).getExpression())
+ // .getFunctionName()
+ // .equalsIgnoreCase("en")
+ ) {
+ return executeSelectIntoStatementXianyi(
+ (UDTFPlan) queryPlan, this, statement, statementId, timeout, fetchSize, sessionId);
}
- try {
+ final long startTime = System.currentTimeMillis();
+ final long queryId = sessionManager.requestQueryId(statementId, true);
+ QueryContext context =
+ genQueryContext(queryId, physicalPlan.isDebug(), startTime, statement, timeout);
+ try {
InsertTabletPlansIterator insertTabletPlansIterator =
new InsertTabletPlansIterator(
queryPlan,
@@ -969,10 +980,74 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS).setQueryId(queryId);
} finally {
sessionManager.releaseQueryResourceNoExceptions(queryId);
- Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_SELECT_INTO, startTime);
- long costTime = System.currentTimeMillis() - startTime;
- if (costTime >= CONFIG.getSlowQueryThreshold()) {
- SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
+ }
+ }
+
+ private TSExecuteStatementResp executeSelectIntoStatementXianyi(
+ UDTFPlan udtfPlan,
+ TSServiceImpl tsService,
+ String statement,
+ long statementId,
+ long timeout,
+ int fetchSize,
+ long sessionId) {
+ ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() << 1);
+ List<ForkJoinTask<Void>> futures = new ArrayList<>();
+ for (String subStatement : split(udtfPlan, statement)) {
+ futures.add(
+ forkJoinPool.submit(
+ new InsertTabletPlanTask(
+ tsService, statementId, timeout, fetchSize, sessionId, subStatement)));
+ }
+ for (ForkJoinTask<Void> v : futures) {
+ v.join();
+ }
+ forkJoinPool.shutdown();
+ return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ private List<String> split(UDTFPlan udtfPlan, String statement) {
+ List<String> statements = new ArrayList<>();
+
+ String prefix = statement.split("where")[0] + " where ";
+
+ return statements;
+ }
+
+ private class InsertTabletPlanTask extends RecursiveTask<Void> {
+
+ private final TSServiceImpl tsService;
+ private final long statementId;
+ private final long timeout;
+ private final int fetchSize;
+ private final long sessionId;
+ private final String statement;
+
+ InsertTabletPlanTask(
+ TSServiceImpl tsService,
+ long statementId,
+ long timeout,
+ int fetchSize,
+ long sessionId,
+ String statement) {
+ this.tsService = tsService;
+ this.statementId = statementId;
+ this.timeout = timeout;
+ this.fetchSize = fetchSize;
+ this.sessionId = sessionId;
+ this.statement = statement;
+ }
+
+ @Override
+ protected Void compute() {
+ try {
+ PhysicalPlan physicalPlan =
+ processor.parseSQLToPhysicalPlan(statement, sessionManager.getZoneId(sessionId));
+ tsService.executeSelectIntoStatement(
+ statement, statementId, physicalPlan, fetchSize, timeout, sessionId);
+ return null;
+ } catch (Exception e) {
+ throw new RuntimeException(e.getMessage());
}
}
}