You are viewing a plain-text version of this content. (The canonical link was not preserved in this plain-text copy.)
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/07/14 11:48:01 UTC
[iotdb] 08/08: modify TSServiceImpl
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch select-into
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 850f2c0bd744b5ab43c4812cae796997205fa1c0
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Jul 14 19:45:32 2021 +0800
modify TSServiceImpl
---
.../engine/transporter/SelectIntoTransporter.java | 10 +++-
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 4 ++
.../iotdb/db/qp/physical/crud/SelectIntoPlan.java | 13 ++++
.../org/apache/iotdb/db/service/TSServiceImpl.java | 69 ++++++++++++++++++----
4 files changed, 83 insertions(+), 13 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/transporter/SelectIntoTransporter.java b/server/src/main/java/org/apache/iotdb/db/engine/transporter/SelectIntoTransporter.java
index 385ddbd..b88179b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/transporter/SelectIntoTransporter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/transporter/SelectIntoTransporter.java
@@ -19,9 +19,15 @@
package org.apache.iotdb.db.engine.transporter;
-import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
+import org.apache.iotdb.db.exception.IoTDBException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import java.util.List;
public class SelectIntoTransporter {
- public void execute(SelectIntoPlan selectIntoPlan) {}
+ public SelectIntoTransporter(QueryDataSet queryDataSet, List<PartialPath> intoPaths) {}
+
+ public void execute() throws IoTDBException {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index af58de9..dbdd956 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -116,6 +116,10 @@ public abstract class PhysicalPlan {
return isQuery;
}
+ public boolean isSelectInto() {
+ return false;
+ }
+
public Operator.OperatorType getOperatorType() {
return operatorType;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/SelectIntoPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/SelectIntoPlan.java
index 71bf595..7606998 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/SelectIntoPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/SelectIntoPlan.java
@@ -45,6 +45,11 @@ public class SelectIntoPlan extends PhysicalPlan {
}
@Override
+ public boolean isSelectInto() {
+ return true;
+ }
+
+ @Override
public void serialize(DataOutputStream stream) throws IOException {
throw new UnsupportedOperationException();
}
@@ -63,4 +68,12 @@ public class SelectIntoPlan extends PhysicalPlan {
public List<PartialPath> getPaths() {
throw new UnsupportedOperationException();
}
+
+ public QueryPlan getQueryPlan() {
+ return queryPlan;
+ }
+
+ public List<PartialPath> getIntoPaths() {
+ return intoPaths;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index f1af737..6c2190d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.cost.statistic.Measurement;
import org.apache.iotdb.db.cost.statistic.Operation;
+import org.apache.iotdb.db.engine.transporter.SelectIntoTransporter;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.IoTDBException;
import org.apache.iotdb.db.exception.QueryInBatchStatementException;
@@ -59,6 +60,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
@@ -498,7 +500,7 @@ public class TSServiceImpl implements TSIService.Iface {
PhysicalPlan physicalPlan =
processor.parseSQLToPhysicalPlan(
statement, sessionManager.getZoneId(req.sessionId), DEFAULT_FETCH_SIZE);
- if (physicalPlan.isQuery()) {
+ if (physicalPlan.isQuery() || physicalPlan.isSelectInto()) {
throw new QueryInBatchStatementException(statement);
}
@@ -597,16 +599,21 @@ public class TSServiceImpl implements TSIService.Iface {
processor.parseSQLToPhysicalPlan(
statement, sessionManager.getZoneId(req.getSessionId()), req.fetchSize);
- return physicalPlan.isQuery()
- ? internalExecuteQueryStatement(
- statement,
- req.statementId,
- physicalPlan,
- req.fetchSize,
- req.timeout,
- sessionManager.getUsername(req.getSessionId()),
- req.isEnableRedirectQuery())
- : executeUpdateStatement(physicalPlan, req.getSessionId());
+ if (physicalPlan.isQuery()) {
+ return internalExecuteQueryStatement(
+ statement,
+ req.statementId,
+ physicalPlan,
+ req.fetchSize,
+ req.timeout,
+ sessionManager.getUsername(req.getSessionId()),
+ req.isEnableRedirectQuery());
+ } else if (physicalPlan.isSelectInto()) {
+ return executeSelectIntoStatement(
+ statement, req.statementId, physicalPlan, req.fetchSize, req.timeout);
+ } else {
+ return executeUpdateStatement(physicalPlan, req.getSessionId());
+ }
} catch (InterruptedException e) {
LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
Thread.currentThread().interrupt();
@@ -1024,6 +1031,46 @@ public class TSServiceImpl implements TSIService.Iface {
plan.setPaths(null);
}
+ private TSExecuteStatementResp executeSelectIntoStatement(
+ String statement, long statementId, PhysicalPlan physicalPlan, int fetchSize, long timeout)
+ throws IoTDBException, TException, SQLException, IOException, InterruptedException,
+ QueryFilterOptimizationException {
+ final long startTime = System.currentTimeMillis();
+
+ final SelectIntoPlan selectIntoPlan = (SelectIntoPlan) physicalPlan;
+ final QueryPlan queryPlan = selectIntoPlan.getQueryPlan();
+
+ queryCount.incrementAndGet();
+ AUDIT_LOGGER.debug("Session {} execute select into: {}", currSessionId.get(), statement);
+
+ Pair<Integer, Integer> fetchSizeDeduplicatedPathNumPair =
+ getMemoryParametersFromPhysicalPlan(queryPlan, fetchSize);
+ fetchSize = fetchSizeDeduplicatedPathNumPair.left;
+ long queryId =
+ sessionManager.requestQueryId(
+ statementId, true, fetchSize, fetchSizeDeduplicatedPathNumPair.right);
+
+ tracingManager.writeQueryInfo(queryId, statement, startTime, queryPlan);
+ try {
+ queryTimeManager.registerQuery(queryId, startTime, statement, timeout, queryPlan);
+ new SelectIntoTransporter(
+ createQueryDataSet(queryId, queryPlan, fetchSize), selectIntoPlan.getIntoPaths())
+ .execute();
+ return RpcUtils.getTSExecuteStatementResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS))
+ .setQueryId(queryId);
+ } catch (Exception e) {
+ sessionManager.releaseQueryResourceNoExceptions(queryId);
+ throw e;
+ } finally {
+ queryTimeManager.unRegisterQuery(queryId, queryPlan);
+ Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+ long costTime = System.currentTimeMillis() - startTime;
+ if (costTime >= config.getSlowQueryThreshold()) {
+ SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
+ }
+ }
+ }
+
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
@Override
public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {