You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/12/29 08:18:00 UTC

[iotdb] 02/02: seperate showplan, queryplan and authorplan

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch controlThreadNum
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bcaecba3b0b521a60aed30e0da8b03922b9baaf2
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Dec 29 16:17:01 2021 +0800

    seperate showplan, queryplan and authorplan
---
 .../iotdb/db/qp/physical/crud/AggregationPlan.java |   2 +-
 .../iotdb/db/qp/physical/crud/LastQueryPlan.java   |   8 +-
 .../iotdb/db/qp/physical/crud/QueryIndexPlan.java  |   2 +-
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |   3 +-
 .../apache/iotdb/db/qp/physical/crud/UDTFPlan.java |   2 +-
 .../db/service/thrift/impl/TSServiceImpl.java      | 246 +++++++++++----------
 6 files changed, 144 insertions(+), 119 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index 6083605..c81e12e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -81,7 +81,7 @@ public class AggregationPlan extends RawDataQueryPlan {
 
   @Override
   public List<TSDataType> getWideQueryHeaders(
-      List<String> respColumns, List<String> respSgColumns, Boolean isJdbcQuery, BitSet aliasList)
+      List<String> respColumns, List<String> respSgColumns, boolean isJdbcQuery, BitSet aliasList)
       throws MetadataException {
     List<TSDataType> seriesTypes = new ArrayList<>();
     List<String> aggregations = getAggregations();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java
index b2028bf..9ddcaae 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java
@@ -35,7 +35,11 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import org.apache.thrift.TException;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 public class LastQueryPlan extends RawDataQueryPlan {
 
@@ -66,7 +70,7 @@ public class LastQueryPlan extends RawDataQueryPlan {
 
   @Override
   public List<TSDataType> getWideQueryHeaders(
-      List<String> respColumns, List<String> respSgColumns, Boolean isJdbcQuery, BitSet aliasList)
+      List<String> respColumns, List<String> respSgColumns, boolean isJdbcQuery, BitSet aliasList)
       throws TException {
     throw new TException("unsupported query type: " + getOperatorType());
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
index 8ff816f..9d37dc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
@@ -41,7 +41,7 @@ public class QueryIndexPlan extends RawDataQueryPlan {
 
   @Override
   public List<TSDataType> getWideQueryHeaders(
-      List<String> respColumns, List<String> respSgColumns, Boolean isJdbcQuery, BitSet aliasList)
+      List<String> respColumns, List<String> respSgColumns, boolean isJdbcQuery, BitSet aliasList)
       throws TException {
     throw new TException("unsupported query type: " + getOperatorType());
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index 6427137..1efa3b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -71,6 +71,7 @@ public abstract class QueryPlan extends PhysicalPlan {
 
   public abstract void deduplicate(PhysicalGenerator physicalGenerator) throws MetadataException;
 
+  /** Construct TSExecuteStatementResp and the header of result set. */
   public TSExecuteStatementResp getTSExecuteStatementResp(boolean isJdbcQuery)
       throws TException, MetadataException {
     List<String> respColumns = new ArrayList<>();
@@ -97,7 +98,7 @@ public abstract class QueryPlan extends PhysicalPlan {
   }
 
   public List<TSDataType> getWideQueryHeaders(
-      List<String> respColumns, List<String> respSgColumns, Boolean isJdbcQuery, BitSet aliasList)
+      List<String> respColumns, List<String> respSgColumns, boolean isJdbcQuery, BitSet aliasList)
       throws TException, MetadataException {
     List<TSDataType> seriesTypes = new ArrayList<>();
     for (int i = 0; i < resultColumns.size(); ++i) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
index 37c46e3..401512a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
@@ -91,7 +91,7 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
 
   @Override
   public List<TSDataType> getWideQueryHeaders(
-      List<String> respColumns, List<String> respSgColumns, Boolean isJdbcQuery, BitSet aliasList) {
+      List<String> respColumns, List<String> respSgColumns, boolean isJdbcQuery, BitSet aliasList) {
     List<TSDataType> seriesTypes = new ArrayList<>();
     for (int i = 0; i < paths.size(); i++) {
       respColumns.add(resultColumns.get(i).getResultColumnName());
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 1762ad6..50395ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.template.TemplateQueryType;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -48,7 +47,6 @@ import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
 import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
 import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
@@ -58,12 +56,10 @@ import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
 import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.tracing.TracingConstant;
-import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
 import org.apache.iotdb.db.service.IoTDB;
@@ -591,8 +587,9 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
   }
 
   /**
-   * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByTimePlan, UDFPlan,
-   *     some AuthorPlan
+   * Execute query statement, return TSExecuteStatementResp with dataset.
+   *
+   * @param plan must be a plan for Query: QueryPlan, ShowPlan, and some AuthorPlan
    */
   @SuppressWarnings({"squid:S3776", "squid:S1141"}) // Suppress high Cognitive Complexity warning
   private TSExecuteStatementResp internalExecuteQueryStatement(
@@ -607,6 +604,16 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
       throws QueryProcessException, SQLException, StorageEngineException,
           QueryFilterOptimizationException, MetadataException, IOException, InterruptedException,
           TException, AuthException {
+    String username = sessionManager.getUsername(sessionId);
+    plan.setLoginUserName(username);
+    // check permissions
+    if (!checkAuthorization(plan.getAuthPaths(), plan, username)) {
+      return RpcUtils.getTSExecuteStatementResp(
+          RpcUtils.getStatus(
+              TSStatusCode.NO_PERMISSION_ERROR,
+              "No permissions for this operation " + plan.getOperatorType()));
+    }
+
     queryFrequencyRecorder.incrementAndGet();
     AUDIT_LOGGER.debug(
         "Session {} execute Query: {}", sessionManager.getCurrSessionId(), statement);
@@ -616,102 +623,17 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
     QueryContext context =
         genQueryContext(queryId, plan.isDebug(), queryStartTime, statement, timeout);
 
-    if (plan instanceof QueryPlan && ((QueryPlan) plan).isEnableTracing()) {
-      context.setEnableTracing(true);
-      tracingManager.setStartTime(queryId, this.startTime);
-      tracingManager.registerActivity(
-          queryId,
-          String.format(TracingConstant.ACTIVITY_START_EXECUTE, statement),
-          this.startTime);
-      tracingManager.registerActivity(queryId, TracingConstant.ACTIVITY_PARSE_SQL, queryStartTime);
-      if (!(plan instanceof AlignByDevicePlan)) {
-        tracingManager.setSeriesPathNum(queryId, plan.getPaths().size());
-      }
-    }
-
+    TSExecuteStatementResp resp;
     try {
-      String username = sessionManager.getUsername(sessionId);
-      plan.setLoginUserName(username);
-
-      TSExecuteStatementResp resp = null;
-      // execute it before createDataSet since it may change the content of query plan
-      if (plan instanceof QueryPlan && !(plan instanceof UDFPlan)) {
-        resp = getQueryColumnHeaders(plan, username, isJdbcQuery);
-      }
       if (plan instanceof QueryPlan) {
-        ((QueryPlan) plan).setEnableRedirect(enableRedirect);
-      }
-      // create and cache dataset
-      QueryDataSet newDataSet = createQueryDataSet(context, plan, fetchSize);
-      if (plan instanceof QueryPlan && ((QueryPlan) plan).isEnableTracing()) {
-        tracingManager.registerActivity(
-            queryId, TracingConstant.ACTIVITY_CREATE_DATASET, System.currentTimeMillis());
-      }
-
-      if (newDataSet.getEndPoint() != null && enableRedirect) {
-        // redirect query
-        LOGGER.debug(
-            "need to redirect {} {} to node {}", statement, queryId, newDataSet.getEndPoint());
-        TSStatus status = new TSStatus();
-        status.setRedirectNode(
-            new EndPoint(newDataSet.getEndPoint().getIp(), newDataSet.getEndPoint().getPort()));
-        status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
-        resp.setStatus(status);
-        resp.setQueryId(queryId);
-        return resp;
-      }
-
-      if (plan instanceof ShowPlan || plan instanceof AuthorPlan) {
-        resp = getListDataSetHeaders(plan, newDataSet);
-      } else if (plan instanceof UDFPlan
-          || (plan instanceof QueryPlan && ((QueryPlan) plan).isGroupByLevel())) {
-        resp = getQueryColumnHeaders(plan, username, isJdbcQuery);
-      }
-
-      resp.setOperationType(plan.getOperatorType().toString());
-      if (newDataSet instanceof DirectNonAlignDataSet) {
-        resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username));
+        QueryPlan queryPlan = (QueryPlan) plan;
+        queryPlan.setEnableRedirect(enableRedirect);
+        resp = executeQueryPlan(queryPlan, context, isJdbcQuery, fetchSize, username);
       } else {
-        try {
-          TSQueryDataSet tsQueryDataSet = fillRpcReturnData(fetchSize, newDataSet, username);
-          resp.setQueryDataSet(tsQueryDataSet);
-        } catch (RedirectException e) {
-          LOGGER.debug("need to redirect {} {} to {}", statement, queryId, e.getEndPoint());
-          if (enableRedirect) {
-            // redirect query
-            TSStatus status = new TSStatus();
-            status.setRedirectNode(e.getEndPoint());
-            status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
-            resp.setStatus(status);
-            resp.setQueryId(queryId);
-            return resp;
-          } else {
-            LOGGER.error(
-                "execute {} error, if session does not support redirect,"
-                    + " should not throw redirection exception.",
-                statement,
-                e);
-          }
-        }
+        resp = executeShowOrAuthorPlan(plan, context, fetchSize, username);
       }
-
       resp.setQueryId(queryId);
-
-      if (plan instanceof AlignByDevicePlan && ((QueryPlan) plan).isEnableTracing()) {
-        tracingManager.setSeriesPathNum(queryId, ((AlignByDeviceDataSet) newDataSet).getPathsNum());
-      }
-
-      queryTimeManager.unRegisterQuery(queryId, false);
-
-      if (plan instanceof QueryPlan && ((QueryPlan) plan).isEnableTracing()) {
-        tracingManager.registerActivity(
-            queryId, TracingConstant.ACTIVITY_REQUEST_COMPLETE, System.currentTimeMillis());
-
-        TSTracingInfo tsTracingInfo = fillRpcReturnTracingInfo(queryId);
-        resp.setTracingInfo(tsTracingInfo);
-      }
-
-      return resp;
+      resp.setOperationType(plan.getOperatorType().toString());
     } catch (Exception e) {
       sessionManager.releaseQueryResourceNoExceptions(queryId);
       throw e;
@@ -722,9 +644,122 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
         SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
       }
     }
+    return resp;
   }
 
-  private TSExecuteStatementResp getListDataSetHeaders(PhysicalPlan plan, QueryDataSet dataSet) {
+  private TSExecuteStatementResp executeQueryPlan(
+      QueryPlan plan, QueryContext context, boolean isJdbcQuery, int fetchSize, String username)
+      throws TException, MetadataException, QueryProcessException, StorageEngineException,
+          SQLException, IOException, InterruptedException, QueryFilterOptimizationException,
+          AuthException {
+    if (plan.isEnableTracing()) {
+      context.setEnableTracing(true);
+      tracingManager.setStartTime(context.getQueryId(), this.startTime);
+      tracingManager.registerActivity(
+          context.getQueryId(),
+          String.format(TracingConstant.ACTIVITY_START_EXECUTE, context.getStatement()),
+          this.startTime);
+      tracingManager.registerActivity(
+          context.getQueryId(), TracingConstant.ACTIVITY_PARSE_SQL, context.getStartTime());
+    }
+
+    TSExecuteStatementResp resp = null;
+    // execute it before createDataSet since it may change the content of query plan
+    if (!(plan instanceof UDFPlan)) {
+      resp = plan.getTSExecuteStatementResp(isJdbcQuery);
+    }
+    // create and cache dataset
+    QueryDataSet newDataSet = createQueryDataSet(context, plan, fetchSize);
+
+    if (plan.isEnableTracing()) {
+      tracingManager.registerActivity(
+          context.getQueryId(),
+          TracingConstant.ACTIVITY_CREATE_DATASET,
+          System.currentTimeMillis());
+    }
+
+    if (newDataSet.getEndPoint() != null && plan.isEnableRedirect()) {
+      // redirect query
+      LOGGER.debug(
+          "need to redirect {} {} to node {}",
+          context.getStatement(),
+          context.getQueryId(),
+          newDataSet.getEndPoint());
+      TSStatus status = new TSStatus();
+      status.setRedirectNode(
+          new EndPoint(newDataSet.getEndPoint().getIp(), newDataSet.getEndPoint().getPort()));
+      status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+      resp.setStatus(status);
+      resp.setQueryId(context.getQueryId());
+      return resp;
+    }
+
+    if (plan instanceof UDFPlan || plan.isGroupByLevel()) {
+      resp = plan.getTSExecuteStatementResp(isJdbcQuery);
+    }
+
+    if (newDataSet instanceof DirectNonAlignDataSet) {
+      resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username));
+    } else {
+      try {
+        TSQueryDataSet tsQueryDataSet = fillRpcReturnData(fetchSize, newDataSet, username);
+        resp.setQueryDataSet(tsQueryDataSet);
+      } catch (RedirectException e) {
+        LOGGER.debug(
+            "need to redirect {} {} to {}",
+            context.getStatement(),
+            context.getQueryId(),
+            e.getEndPoint());
+        if (plan.isEnableRedirect()) {
+          // redirect query
+          TSStatus status = new TSStatus();
+          status.setRedirectNode(e.getEndPoint());
+          status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+          resp.setStatus(status);
+          resp.setQueryId(context.getQueryId());
+          return resp;
+        } else {
+          LOGGER.error(
+              "execute {} error, if session does not support redirect,"
+                  + " should not throw redirection exception.",
+              context.getStatement(),
+              e);
+        }
+      }
+    }
+    queryTimeManager.unRegisterQuery(context.getQueryId(), false);
+
+    if (plan.isEnableTracing()) {
+      tracingManager.registerActivity(
+          context.getQueryId(),
+          TracingConstant.ACTIVITY_REQUEST_COMPLETE,
+          System.currentTimeMillis());
+
+      TSTracingInfo tsTracingInfo = fillRpcReturnTracingInfo(context.getQueryId());
+      resp.setTracingInfo(tsTracingInfo);
+    }
+    return resp;
+  }
+
+  private TSExecuteStatementResp executeShowOrAuthorPlan(
+      PhysicalPlan plan, QueryContext context, int fetchSize, String username)
+      throws QueryProcessException, TException, StorageEngineException, SQLException, IOException,
+          InterruptedException, QueryFilterOptimizationException, MetadataException, AuthException {
+    // create and cache dataset
+    QueryDataSet newDataSet = createQueryDataSet(context, plan, fetchSize);
+    TSExecuteStatementResp resp = getListDataSetResp(plan, newDataSet);
+
+    resp.setQueryDataSet(fillRpcReturnData(fetchSize, newDataSet, username));
+    queryTimeManager.unRegisterQuery(context.getQueryId(), false);
+    return resp;
+  }
+
+  /**
+   * Construct TSExecuteStatementResp and the header of list result set.
+   *
+   * @param plan maybe ShowPlan or AuthorPlan
+   */
+  private TSExecuteStatementResp getListDataSetResp(PhysicalPlan plan, QueryDataSet dataSet) {
     TSExecuteStatementResp resp =
         StaticResps.getNoTimeExecuteResp(
             dataSet.getPaths().stream().map(Path::getFullPath).collect(Collectors.toList()),
@@ -735,21 +770,6 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
     return resp;
   }
 
-  /** get ResultSet schema */
-  private TSExecuteStatementResp getQueryColumnHeaders(
-      PhysicalPlan physicalPlan, String username, boolean isJdbcQuery)
-      throws AuthException, TException, MetadataException {
-    // check permissions
-    if (!checkAuthorization(physicalPlan.getAuthPaths(), physicalPlan, username)) {
-      return RpcUtils.getTSExecuteStatementResp(
-          RpcUtils.getStatus(
-              TSStatusCode.NO_PERMISSION_ERROR,
-              "No permissions for this operation " + physicalPlan.getOperatorType()));
-    }
-
-    return ((QueryPlan) physicalPlan).getTSExecuteStatementResp(isJdbcQuery);
-  }
-
   private TSExecuteStatementResp executeSelectIntoStatement(
       String statement,
       long statementId,