You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/12/29 08:18:00 UTC
[iotdb] 02/02: seperate showplan, queryplan and authorplan
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch controlThreadNum
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bcaecba3b0b521a60aed30e0da8b03922b9baaf2
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Dec 29 16:17:01 2021 +0800
seperate showplan, queryplan and authorplan
---
.../iotdb/db/qp/physical/crud/AggregationPlan.java | 2 +-
.../iotdb/db/qp/physical/crud/LastQueryPlan.java | 8 +-
.../iotdb/db/qp/physical/crud/QueryIndexPlan.java | 2 +-
.../iotdb/db/qp/physical/crud/QueryPlan.java | 3 +-
.../apache/iotdb/db/qp/physical/crud/UDTFPlan.java | 2 +-
.../db/service/thrift/impl/TSServiceImpl.java | 246 +++++++++++----------
6 files changed, 144 insertions(+), 119 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index 6083605..c81e12e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -81,7 +81,7 @@ public class AggregationPlan extends RawDataQueryPlan {
@Override
public List<TSDataType> getWideQueryHeaders(
- List<String> respColumns, List<String> respSgColumns, Boolean isJdbcQuery, BitSet aliasList)
+ List<String> respColumns, List<String> respSgColumns, boolean isJdbcQuery, BitSet aliasList)
throws MetadataException {
List<TSDataType> seriesTypes = new ArrayList<>();
List<String> aggregations = getAggregations();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java
index b2028bf..9ddcaae 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java
@@ -35,7 +35,11 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.thrift.TException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
public class LastQueryPlan extends RawDataQueryPlan {
@@ -66,7 +70,7 @@ public class LastQueryPlan extends RawDataQueryPlan {
@Override
public List<TSDataType> getWideQueryHeaders(
- List<String> respColumns, List<String> respSgColumns, Boolean isJdbcQuery, BitSet aliasList)
+ List<String> respColumns, List<String> respSgColumns, boolean isJdbcQuery, BitSet aliasList)
throws TException {
throw new TException("unsupported query type: " + getOperatorType());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
index 8ff816f..9d37dc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
@@ -41,7 +41,7 @@ public class QueryIndexPlan extends RawDataQueryPlan {
@Override
public List<TSDataType> getWideQueryHeaders(
- List<String> respColumns, List<String> respSgColumns, Boolean isJdbcQuery, BitSet aliasList)
+ List<String> respColumns, List<String> respSgColumns, boolean isJdbcQuery, BitSet aliasList)
throws TException {
throw new TException("unsupported query type: " + getOperatorType());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index 6427137..1efa3b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -71,6 +71,7 @@ public abstract class QueryPlan extends PhysicalPlan {
public abstract void deduplicate(PhysicalGenerator physicalGenerator) throws MetadataException;
+ /** Construct TSExecuteStatementResp and the header of result set. */
public TSExecuteStatementResp getTSExecuteStatementResp(boolean isJdbcQuery)
throws TException, MetadataException {
List<String> respColumns = new ArrayList<>();
@@ -97,7 +98,7 @@ public abstract class QueryPlan extends PhysicalPlan {
}
public List<TSDataType> getWideQueryHeaders(
- List<String> respColumns, List<String> respSgColumns, Boolean isJdbcQuery, BitSet aliasList)
+ List<String> respColumns, List<String> respSgColumns, boolean isJdbcQuery, BitSet aliasList)
throws TException, MetadataException {
List<TSDataType> seriesTypes = new ArrayList<>();
for (int i = 0; i < resultColumns.size(); ++i) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
index 37c46e3..401512a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
@@ -91,7 +91,7 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
@Override
public List<TSDataType> getWideQueryHeaders(
- List<String> respColumns, List<String> respSgColumns, Boolean isJdbcQuery, BitSet aliasList) {
+ List<String> respColumns, List<String> respSgColumns, boolean isJdbcQuery, BitSet aliasList) {
List<TSDataType> seriesTypes = new ArrayList<>();
for (int i = 0; i < paths.size(); i++) {
respColumns.add(resultColumns.get(i).getResultColumnName());
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 1762ad6..50395ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.template.TemplateQueryType;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -48,7 +47,6 @@ import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
@@ -58,12 +56,10 @@ import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.tracing.TracingConstant;
-import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
import org.apache.iotdb.db.service.IoTDB;
@@ -591,8 +587,9 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
}
/**
- * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByTimePlan, UDFPlan,
- * some AuthorPlan
+ * Execute query statement, return TSExecuteStatementResp with dataset.
+ *
+ * @param plan must be a plan for Query: QueryPlan, ShowPlan, and some AuthorPlan
*/
@SuppressWarnings({"squid:S3776", "squid:S1141"}) // Suppress high Cognitive Complexity warning
private TSExecuteStatementResp internalExecuteQueryStatement(
@@ -607,6 +604,16 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
throws QueryProcessException, SQLException, StorageEngineException,
QueryFilterOptimizationException, MetadataException, IOException, InterruptedException,
TException, AuthException {
+ String username = sessionManager.getUsername(sessionId);
+ plan.setLoginUserName(username);
+ // check permissions
+ if (!checkAuthorization(plan.getAuthPaths(), plan, username)) {
+ return RpcUtils.getTSExecuteStatementResp(
+ RpcUtils.getStatus(
+ TSStatusCode.NO_PERMISSION_ERROR,
+ "No permissions for this operation " + plan.getOperatorType()));
+ }
+
queryFrequencyRecorder.incrementAndGet();
AUDIT_LOGGER.debug(
"Session {} execute Query: {}", sessionManager.getCurrSessionId(), statement);
@@ -616,102 +623,17 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
QueryContext context =
genQueryContext(queryId, plan.isDebug(), queryStartTime, statement, timeout);
- if (plan instanceof QueryPlan && ((QueryPlan) plan).isEnableTracing()) {
- context.setEnableTracing(true);
- tracingManager.setStartTime(queryId, this.startTime);
- tracingManager.registerActivity(
- queryId,
- String.format(TracingConstant.ACTIVITY_START_EXECUTE, statement),
- this.startTime);
- tracingManager.registerActivity(queryId, TracingConstant.ACTIVITY_PARSE_SQL, queryStartTime);
- if (!(plan instanceof AlignByDevicePlan)) {
- tracingManager.setSeriesPathNum(queryId, plan.getPaths().size());
- }
- }
-
+ TSExecuteStatementResp resp;
try {
- String username = sessionManager.getUsername(sessionId);
- plan.setLoginUserName(username);
-
- TSExecuteStatementResp resp = null;
- // execute it before createDataSet since it may change the content of query plan
- if (plan instanceof QueryPlan && !(plan instanceof UDFPlan)) {
- resp = getQueryColumnHeaders(plan, username, isJdbcQuery);
- }
if (plan instanceof QueryPlan) {
- ((QueryPlan) plan).setEnableRedirect(enableRedirect);
- }
- // create and cache dataset
- QueryDataSet newDataSet = createQueryDataSet(context, plan, fetchSize);
- if (plan instanceof QueryPlan && ((QueryPlan) plan).isEnableTracing()) {
- tracingManager.registerActivity(
- queryId, TracingConstant.ACTIVITY_CREATE_DATASET, System.currentTimeMillis());
- }
-
- if (newDataSet.getEndPoint() != null && enableRedirect) {
- // redirect query
- LOGGER.debug(
- "need to redirect {} {} to node {}", statement, queryId, newDataSet.getEndPoint());
- TSStatus status = new TSStatus();
- status.setRedirectNode(
- new EndPoint(newDataSet.getEndPoint().getIp(), newDataSet.getEndPoint().getPort()));
- status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
- resp.setStatus(status);
- resp.setQueryId(queryId);
- return resp;
- }
-
- if (plan instanceof ShowPlan || plan instanceof AuthorPlan) {
- resp = getListDataSetHeaders(plan, newDataSet);
- } else if (plan instanceof UDFPlan
- || (plan instanceof QueryPlan && ((QueryPlan) plan).isGroupByLevel())) {
- resp = getQueryColumnHeaders(plan, username, isJdbcQuery);
- }
-
- resp.setOperationType(plan.getOperatorType().toString());
- if (newDataSet instanceof DirectNonAlignDataSet) {
- resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username));
+ QueryPlan queryPlan = (QueryPlan) plan;
+ queryPlan.setEnableRedirect(enableRedirect);
+ resp = executeQueryPlan(queryPlan, context, isJdbcQuery, fetchSize, username);
} else {
- try {
- TSQueryDataSet tsQueryDataSet = fillRpcReturnData(fetchSize, newDataSet, username);
- resp.setQueryDataSet(tsQueryDataSet);
- } catch (RedirectException e) {
- LOGGER.debug("need to redirect {} {} to {}", statement, queryId, e.getEndPoint());
- if (enableRedirect) {
- // redirect query
- TSStatus status = new TSStatus();
- status.setRedirectNode(e.getEndPoint());
- status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
- resp.setStatus(status);
- resp.setQueryId(queryId);
- return resp;
- } else {
- LOGGER.error(
- "execute {} error, if session does not support redirect,"
- + " should not throw redirection exception.",
- statement,
- e);
- }
- }
+ resp = executeShowOrAuthorPlan(plan, context, fetchSize, username);
}
-
resp.setQueryId(queryId);
-
- if (plan instanceof AlignByDevicePlan && ((QueryPlan) plan).isEnableTracing()) {
- tracingManager.setSeriesPathNum(queryId, ((AlignByDeviceDataSet) newDataSet).getPathsNum());
- }
-
- queryTimeManager.unRegisterQuery(queryId, false);
-
- if (plan instanceof QueryPlan && ((QueryPlan) plan).isEnableTracing()) {
- tracingManager.registerActivity(
- queryId, TracingConstant.ACTIVITY_REQUEST_COMPLETE, System.currentTimeMillis());
-
- TSTracingInfo tsTracingInfo = fillRpcReturnTracingInfo(queryId);
- resp.setTracingInfo(tsTracingInfo);
- }
-
- return resp;
+ resp.setOperationType(plan.getOperatorType().toString());
} catch (Exception e) {
sessionManager.releaseQueryResourceNoExceptions(queryId);
throw e;
@@ -722,9 +644,122 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
}
}
+ return resp;
}
- private TSExecuteStatementResp getListDataSetHeaders(PhysicalPlan plan, QueryDataSet dataSet) {
+ private TSExecuteStatementResp executeQueryPlan(
+ QueryPlan plan, QueryContext context, boolean isJdbcQuery, int fetchSize, String username)
+ throws TException, MetadataException, QueryProcessException, StorageEngineException,
+ SQLException, IOException, InterruptedException, QueryFilterOptimizationException,
+ AuthException {
+ if (plan.isEnableTracing()) {
+ context.setEnableTracing(true);
+ tracingManager.setStartTime(context.getQueryId(), this.startTime);
+ tracingManager.registerActivity(
+ context.getQueryId(),
+ String.format(TracingConstant.ACTIVITY_START_EXECUTE, context.getStatement()),
+ this.startTime);
+ tracingManager.registerActivity(
+ context.getQueryId(), TracingConstant.ACTIVITY_PARSE_SQL, context.getStartTime());
+ }
+
+ TSExecuteStatementResp resp = null;
+ // execute it before createDataSet since it may change the content of query plan
+ if (!(plan instanceof UDFPlan)) {
+ resp = plan.getTSExecuteStatementResp(isJdbcQuery);
+ }
+ // create and cache dataset
+ QueryDataSet newDataSet = createQueryDataSet(context, plan, fetchSize);
+
+ if (plan.isEnableTracing()) {
+ tracingManager.registerActivity(
+ context.getQueryId(),
+ TracingConstant.ACTIVITY_CREATE_DATASET,
+ System.currentTimeMillis());
+ }
+
+ if (newDataSet.getEndPoint() != null && plan.isEnableRedirect()) {
+ // redirect query
+ LOGGER.debug(
+ "need to redirect {} {} to node {}",
+ context.getStatement(),
+ context.getQueryId(),
+ newDataSet.getEndPoint());
+ TSStatus status = new TSStatus();
+ status.setRedirectNode(
+ new EndPoint(newDataSet.getEndPoint().getIp(), newDataSet.getEndPoint().getPort()));
+ status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+ resp.setStatus(status);
+ resp.setQueryId(context.getQueryId());
+ return resp;
+ }
+
+ if (plan instanceof UDFPlan || plan.isGroupByLevel()) {
+ resp = plan.getTSExecuteStatementResp(isJdbcQuery);
+ }
+
+ if (newDataSet instanceof DirectNonAlignDataSet) {
+ resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username));
+ } else {
+ try {
+ TSQueryDataSet tsQueryDataSet = fillRpcReturnData(fetchSize, newDataSet, username);
+ resp.setQueryDataSet(tsQueryDataSet);
+ } catch (RedirectException e) {
+ LOGGER.debug(
+ "need to redirect {} {} to {}",
+ context.getStatement(),
+ context.getQueryId(),
+ e.getEndPoint());
+ if (plan.isEnableRedirect()) {
+ // redirect query
+ TSStatus status = new TSStatus();
+ status.setRedirectNode(e.getEndPoint());
+ status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+ resp.setStatus(status);
+ resp.setQueryId(context.getQueryId());
+ return resp;
+ } else {
+ LOGGER.error(
+ "execute {} error, if session does not support redirect,"
+ + " should not throw redirection exception.",
+ context.getStatement(),
+ e);
+ }
+ }
+ }
+ queryTimeManager.unRegisterQuery(context.getQueryId(), false);
+
+ if (plan.isEnableTracing()) {
+ tracingManager.registerActivity(
+ context.getQueryId(),
+ TracingConstant.ACTIVITY_REQUEST_COMPLETE,
+ System.currentTimeMillis());
+
+ TSTracingInfo tsTracingInfo = fillRpcReturnTracingInfo(context.getQueryId());
+ resp.setTracingInfo(tsTracingInfo);
+ }
+ return resp;
+ }
+
+ private TSExecuteStatementResp executeShowOrAuthorPlan(
+ PhysicalPlan plan, QueryContext context, int fetchSize, String username)
+ throws QueryProcessException, TException, StorageEngineException, SQLException, IOException,
+ InterruptedException, QueryFilterOptimizationException, MetadataException, AuthException {
+ // create and cache dataset
+ QueryDataSet newDataSet = createQueryDataSet(context, plan, fetchSize);
+ TSExecuteStatementResp resp = getListDataSetResp(plan, newDataSet);
+
+ resp.setQueryDataSet(fillRpcReturnData(fetchSize, newDataSet, username));
+ queryTimeManager.unRegisterQuery(context.getQueryId(), false);
+ return resp;
+ }
+
+ /**
+ * Construct TSExecuteStatementResp and the header of list result set.
+ *
+ * @param plan maybe ShowPlan or AuthorPlan
+ */
+ private TSExecuteStatementResp getListDataSetResp(PhysicalPlan plan, QueryDataSet dataSet) {
TSExecuteStatementResp resp =
StaticResps.getNoTimeExecuteResp(
dataSet.getPaths().stream().map(Path::getFullPath).collect(Collectors.toList()),
@@ -735,21 +770,6 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
return resp;
}
- /** get ResultSet schema */
- private TSExecuteStatementResp getQueryColumnHeaders(
- PhysicalPlan physicalPlan, String username, boolean isJdbcQuery)
- throws AuthException, TException, MetadataException {
- // check permissions
- if (!checkAuthorization(physicalPlan.getAuthPaths(), physicalPlan, username)) {
- return RpcUtils.getTSExecuteStatementResp(
- RpcUtils.getStatus(
- TSStatusCode.NO_PERMISSION_ERROR,
- "No permissions for this operation " + physicalPlan.getOperatorType()));
- }
-
- return ((QueryPlan) physicalPlan).getTSExecuteStatementResp(isJdbcQuery);
- }
-
private TSExecuteStatementResp executeSelectIntoStatement(
String statement,
long statementId,