You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/01/03 07:43:34 UTC

[iotdb] branch 0.12controlQueryThread created (now 53c1503)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch 0.12controlQueryThread
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 53c1503  wrap internalExecuteQueryStatement with QueryTask()

This branch includes the following new commits:

     new 5834195  abstract redirect method
     new 53c1503  wrap internalExecuteQueryStatement with QueryTask()

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 02/02: wrap internalExecuteQueryStatement with QueryTask()

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch 0.12controlQueryThread
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 53c15030ee2f8d48a978fae1834603602d611553
Author: Alima777 <wx...@gmail.com>
AuthorDate: Mon Jan 3 15:42:00 2022 +0800

    wrap internalExecuteQueryStatement with QueryTask()
---
 .../org/apache/iotdb/db/concurrent/ThreadName.java |   3 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  19 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  10 +
 .../db/query/dataset/NonAlignEngineDataSet.java    |   4 +-
 .../dataset/RawQueryDataSetWithoutValueFilter.java |   5 +-
 ...yTaskPoolManager.java => QueryTaskManager.java} |  18 +-
 ...nager.java => RawQueryReadTaskPoolManager.java} |  30 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 357 ++++++++++++---------
 8 files changed, 262 insertions(+), 184 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index 5f83c12..ad1e613 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -43,7 +43,8 @@ public enum ThreadName {
   SYNC_MONITOR("Sync-Monitor"),
   LOAD_TSFILE("Load-TsFile"),
   TIME_COST_STATISTIC("TIME_COST_STATISTIC"),
-  QUERY_SERVICE("Query");
+  QUERY_SERVICE("Query"),
+  SUB_RAW_QUERY_SERVICE("Sub_RawQuery");
 
   private String name;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b880ae5..05699c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -238,8 +238,13 @@ public class IoTDBConfig {
   /** How many threads can concurrently flush. When <= 0, use CPU core number. */
   private int concurrentFlushThread = Runtime.getRuntime().availableProcessors();
 
-  /** How many threads can concurrently query. When <= 0, use CPU core number. */
-  private int concurrentQueryThread = 8;
+  /** How many threads can concurrently execute query statement. When <= 0, use CPU core number. */
+  private int concurrentQueryThread = 16;
+
+  /**
+   * How many threads can concurrently read data for raw data query. When <= 0, use CPU core number.
+   */
+  private int concurrentSubRawQueryThread = 8;
 
   /** Is the write mem control for writing enable. */
   private boolean enableMemControl = true;
@@ -1091,10 +1096,18 @@ public class IoTDBConfig {
     return concurrentQueryThread;
   }
 
-  void setConcurrentQueryThread(int concurrentQueryThread) {
+  public void setConcurrentQueryThread(int concurrentQueryThread) {
     this.concurrentQueryThread = concurrentQueryThread;
   }
 
+  public int getConcurrentSubRawQueryThread() {
+    return concurrentSubRawQueryThread;
+  }
+
+  void setConcurrentSubRawQueryThread(int concurrentSubRawQueryThread) {
+    this.concurrentSubRawQueryThread = concurrentSubRawQueryThread;
+  }
+
   public long getSeqTsFileSize() {
     return seqTsFileSize;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index a2c6d3d..10825e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -426,6 +426,16 @@ public class IoTDBDescriptor {
         conf.setConcurrentQueryThread(Runtime.getRuntime().availableProcessors());
       }
 
+      conf.setConcurrentSubRawQueryThread(
+          Integer.parseInt(
+              properties.getProperty(
+                  "concurrent_sub_rawQuery_thread",
+                  Integer.toString(conf.getConcurrentSubRawQueryThread()))));
+
+      if (conf.getConcurrentSubRawQueryThread() <= 0) {
+        conf.setConcurrentSubRawQueryThread(Runtime.getRuntime().availableProcessors());
+      }
+
       conf.setmManagerCacheSize(
           Integer.parseInt(
               properties
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
index f0f887a..f17a62d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.dataset;
 import org.apache.iotdb.db.concurrent.WrappedRunnable;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.query.control.QueryTimeManager;
-import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
+import org.apache.iotdb.db.query.pool.RawQueryReadTaskPoolManager;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
@@ -239,7 +239,7 @@ public class NonAlignEngineDataSet extends QueryDataSet implements DirectNonAlig
   /** flag that main thread is interrupted or not */
   private volatile boolean interrupted = false;
 
-  private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance();
+  private static final RawQueryReadTaskPoolManager pool = RawQueryReadTaskPoolManager.getInstance();
 
   private static final Logger LOGGER = LoggerFactory.getLogger(NonAlignEngineDataSet.class);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index a5f613d..1df8c4f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.dataset;
 import org.apache.iotdb.db.concurrent.WrappedRunnable;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.control.QueryTimeManager;
-import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
+import org.apache.iotdb.db.query.pool.RawQueryReadTaskPoolManager;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
@@ -169,7 +169,8 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
 
   private final long queryId;
 
-  private static final QueryTaskPoolManager TASK_POOL_MANAGER = QueryTaskPoolManager.getInstance();
+  private static final RawQueryReadTaskPoolManager TASK_POOL_MANAGER =
+      RawQueryReadTaskPoolManager.getInstance();
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(RawQueryDataSetWithoutValueFilter.class);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskManager.java
similarity index 82%
copy from server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
copy to server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskManager.java
index 1dcda50..c68f526 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskManager.java
@@ -27,11 +27,17 @@ import org.apache.iotdb.db.engine.flush.pool.AbstractPoolManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class QueryTaskPoolManager extends AbstractPoolManager {
+/**
+ * This pool executes all query tasks sent from clients and returns a TSExecuteStatementResp.
+ * Its threads are named "Query".
+ *
+ * <p>It executes the QueryTask defined in TSServiceImpl.
+ */
+public class QueryTaskManager extends AbstractPoolManager {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(QueryTaskPoolManager.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(QueryTaskManager.class);
 
-  private QueryTaskPoolManager() {
+  private QueryTaskManager() {
     int threadCnt =
         Math.min(
             Runtime.getRuntime().availableProcessors(),
@@ -39,8 +45,8 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
     pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.QUERY_SERVICE.getName());
   }
 
-  public static QueryTaskPoolManager getInstance() {
-    return QueryTaskPoolManager.InstanceHolder.instance;
+  public static QueryTaskManager getInstance() {
+    return QueryTaskManager.InstanceHolder.instance;
   }
 
   @Override
@@ -79,6 +85,6 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
       // allowed to do nothing
     }
 
-    private static QueryTaskPoolManager instance = new QueryTaskPoolManager();
+    private static QueryTaskManager instance = new QueryTaskManager();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/query/pool/RawQueryReadTaskPoolManager.java
similarity index 67%
rename from server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
rename to server/src/main/java/org/apache/iotdb/db/query/pool/RawQueryReadTaskPoolManager.java
index 1dcda50..dc84652 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/pool/RawQueryReadTaskPoolManager.java
@@ -27,20 +27,27 @@ import org.apache.iotdb.db.engine.flush.pool.AbstractPoolManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class QueryTaskPoolManager extends AbstractPoolManager {
+/**
+ * This thread pool is used to read data for raw data queries. Threads are named "Sub_RawQuery".
+ *
+ * <p>Execute ReadTask() in RawQueryReadTaskPoolManager
+ */
+public class RawQueryReadTaskPoolManager extends AbstractPoolManager {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(QueryTaskPoolManager.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(RawQueryReadTaskPoolManager.class);
 
-  private QueryTaskPoolManager() {
+  private RawQueryReadTaskPoolManager() {
     int threadCnt =
         Math.min(
             Runtime.getRuntime().availableProcessors(),
-            IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread());
-    pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.QUERY_SERVICE.getName());
+            IoTDBDescriptor.getInstance().getConfig().getConcurrentSubRawQueryThread());
+    pool =
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            threadCnt, ThreadName.SUB_RAW_QUERY_SERVICE.getName());
   }
 
-  public static QueryTaskPoolManager getInstance() {
-    return QueryTaskPoolManager.InstanceHolder.instance;
+  public static RawQueryReadTaskPoolManager getInstance() {
+    return RawQueryReadTaskPoolManager.InstanceHolder.instance;
   }
 
   @Override
@@ -50,7 +57,7 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
 
   @Override
   public String getName() {
-    return "query task";
+    return "raw query read task";
   }
 
   @Override
@@ -59,9 +66,10 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
       int threadCnt =
           Math.min(
               Runtime.getRuntime().availableProcessors(),
-              IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread());
+              IoTDBDescriptor.getInstance().getConfig().getConcurrentSubRawQueryThread());
       pool =
-          IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.QUERY_SERVICE.getName());
+          IoTDBThreadPoolFactory.newFixedThreadPool(
+              threadCnt, ThreadName.SUB_RAW_QUERY_SERVICE.getName());
     }
   }
 
@@ -79,6 +87,6 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
       // allowed to do nothing
     }
 
-    private static QueryTaskPoolManager instance = new QueryTaskPoolManager();
+    private static RawQueryReadTaskPoolManager instance = new RawQueryReadTaskPoolManager();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 3fd9bee..45988da 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -80,6 +80,7 @@ import org.apache.iotdb.db.query.control.TracingManager;
 import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
+import org.apache.iotdb.db.query.pool.QueryTaskManager;
 import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
@@ -145,6 +146,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -211,6 +214,151 @@ public class TSServiceImpl implements TSIService.Iface {
         TimeUnit.MINUTES);
   }
 
+  /**
+   * Execute query statement, return TSExecuteStatementResp with dataset.
+   *
+   * @param plan must be a plan for Query: QueryPlan, ShowPlan, and some AuthorPlan
+   */
+  protected class QueryTask implements Callable<TSExecuteStatementResp> {
+
+    private PhysicalPlan plan;
+    private final String username;
+    private final String statement;
+    private final long statementId;
+    private final long timeout;
+    private final int fetchSize;
+    private final boolean enableRedirectQuery;
+
+    public QueryTask(
+        PhysicalPlan plan,
+        String username,
+        String statement,
+        long statementId,
+        long timeout,
+        int fetchSize,
+        boolean enableRedirectQuery) {
+      this.plan = plan;
+      this.username = username;
+      this.statement = statement;
+      this.statementId = statementId;
+      this.timeout = timeout;
+      this.fetchSize = fetchSize;
+      this.enableRedirectQuery = enableRedirectQuery;
+    }
+
+    @Override
+    public TSExecuteStatementResp call() throws Exception {
+      queryCount.incrementAndGet();
+      AUDIT_LOGGER.debug(
+          "Session {} execute Query: {}", sessionManager.getCurrSessionId(), statement);
+      long startTime = System.currentTimeMillis();
+      // generate the queryId for the operation
+      long queryId = sessionManager.requestQueryId(statementId, true);
+      try {
+        // register query info to queryTimeManager
+        if (!(plan instanceof ShowQueryProcesslistPlan)) {
+          queryTimeManager.registerQuery(queryId, startTime, statement, timeout);
+        }
+        if (plan instanceof QueryPlan && config.isEnablePerformanceTracing()) {
+          TracingManager tracingManager = TracingManager.getInstance();
+          if (!(plan instanceof AlignByDevicePlan)) {
+            tracingManager.writeQueryInfo(queryId, statement, startTime, plan.getPaths().size());
+          } else {
+            tracingManager.writeQueryInfo(queryId, statement, startTime);
+          }
+        }
+
+        if (plan instanceof AuthorPlan) {
+          plan.setLoginUserName(username);
+        }
+
+        TSExecuteStatementResp resp = null;
+        // execute it before createDataSet since it may change the content of query plan
+        if (plan instanceof QueryPlan && !(plan instanceof UDFPlan)) {
+          resp = getQueryColumnHeaders(plan, username);
+        }
+        if (plan instanceof QueryPlan) {
+          ((QueryPlan) plan).setEnableRedirect(enableRedirectQuery);
+        }
+        // create and cache dataset
+        QueryDataSet newDataSet = createQueryDataSet(queryId, plan, fetchSize);
+
+        if (newDataSet.getEndPoint() != null && enableRedirectQuery) {
+          LOGGER.debug(
+              "need to redirect {} {} to node {}", statement, queryId, newDataSet.getEndPoint());
+          QueryDataSet.EndPoint endPoint = newDataSet.getEndPoint();
+          return redirectQueryToAnotherNode(resp, queryId, endPoint.getIp(), endPoint.getPort());
+        }
+
+        if (plan instanceof ShowPlan || plan instanceof AuthorPlan) {
+          resp = getListDataSetHeaders(newDataSet);
+        } else if (plan instanceof UDFPlan) {
+          resp = getQueryColumnHeaders(plan, username);
+        }
+
+        resp.setOperationType(plan.getOperatorType().toString());
+        if (plan.getOperatorType() == OperatorType.AGGREGATION) {
+          resp.setIgnoreTimeStamp(true);
+        } else if (plan instanceof ShowQueryProcesslistPlan) {
+          resp.setIgnoreTimeStamp(false);
+        }
+
+        if (newDataSet instanceof DirectNonAlignDataSet) {
+          resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username));
+        } else {
+          try {
+            TSQueryDataSet tsQueryDataSet = fillRpcReturnData(fetchSize, newDataSet, username);
+            resp.setQueryDataSet(tsQueryDataSet);
+          } catch (RedirectException e) {
+            LOGGER.debug("need to redirect {} {} to {}", statement, queryId, e.getEndPoint());
+            if (enableRedirectQuery) {
+              EndPoint endPoint = e.getEndPoint();
+              redirectQueryToAnotherNode(resp, queryId, endPoint.ip, endPoint.port);
+            } else {
+              LOGGER.error(
+                  "execute {} error, if session does not support redirect,"
+                      + " should not throw redirection exception.",
+                  statement,
+                  e);
+            }
+          }
+        }
+        resp.setQueryId(queryId);
+
+        if (plan instanceof AlignByDevicePlan && config.isEnablePerformanceTracing()) {
+          TracingManager.getInstance()
+              .writePathsNum(queryId, ((AlignByDeviceDataSet) newDataSet).getPathsNum());
+        }
+
+        if (enableMetric) {
+          long endTime = System.currentTimeMillis();
+          SqlArgument sqlArgument = new SqlArgument(resp, plan, statement, startTime, endTime);
+          synchronized (sqlArgumentList) {
+            sqlArgumentList.add(sqlArgument);
+            if (sqlArgumentList.size() >= MAX_SIZE) {
+              sqlArgumentList.subList(0, DELETE_SIZE).clear();
+            }
+          }
+        }
+
+        // remove query info in QueryTimeManager
+        if (!(plan instanceof ShowQueryProcesslistPlan)) {
+          queryTimeManager.unRegisterQuery(queryId);
+        }
+        return resp;
+      } catch (Exception e) {
+        releaseQueryResourceNoExceptions(queryId);
+        throw e;
+      } finally {
+        Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+        long costTime = System.currentTimeMillis() - startTime;
+        if (costTime >= config.getSlowQueryThreshold()) {
+          SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
+        }
+      }
+    }
+  }
+
   public static List<SqlArgument> getSqlArgumentList() {
     return sqlArgumentList;
   }
@@ -603,16 +751,22 @@ public class TSServiceImpl implements TSIService.Iface {
           processor.parseSQLToPhysicalPlan(
               statement, sessionManager.getZoneId(req.getSessionId()), req.fetchSize);
 
-      return physicalPlan.isQuery()
-          ? internalExecuteQueryStatement(
-              statement,
-              req.statementId,
-              physicalPlan,
-              req.fetchSize,
-              req.timeout,
-              sessionManager.getUsername(req.getSessionId()),
-              req.isEnableRedirectQuery())
-          : executeUpdateStatement(physicalPlan, req.getSessionId());
+      if (physicalPlan.isQuery()) {
+        Future<TSExecuteStatementResp> resp =
+            QueryTaskManager.getInstance()
+                .submit(
+                    new QueryTask(
+                        physicalPlan,
+                        sessionManager.getUsername(req.sessionId),
+                        req.statement,
+                        req.statementId,
+                        req.timeout,
+                        req.fetchSize,
+                        req.enableRedirectQuery));
+        return resp.get();
+      } else {
+        return executeUpdateStatement(physicalPlan, req.getSessionId());
+      }
     } catch (InterruptedException e) {
       LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
       Thread.currentThread().interrupt();
@@ -636,17 +790,23 @@ public class TSServiceImpl implements TSIService.Iface {
           processor.parseSQLToPhysicalPlan(
               statement, sessionManager.getZoneId(req.sessionId), req.fetchSize);
 
-      return physicalPlan.isQuery()
-          ? internalExecuteQueryStatement(
-              statement,
-              req.statementId,
-              physicalPlan,
-              req.fetchSize,
-              req.timeout,
-              sessionManager.getUsername(req.getSessionId()),
-              req.isEnableRedirectQuery())
-          : RpcUtils.getTSExecuteStatementResp(
-              TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+      if (physicalPlan.isQuery()) {
+        Future<TSExecuteStatementResp> resp =
+            QueryTaskManager.getInstance()
+                .submit(
+                    new QueryTask(
+                        physicalPlan,
+                        sessionManager.getUsername(req.sessionId),
+                        req.statement,
+                        req.statementId,
+                        req.timeout,
+                        req.fetchSize,
+                        req.enableRedirectQuery));
+        return resp.get();
+      } else {
+        return RpcUtils.getTSExecuteStatementResp(
+            TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+      }
     } catch (InterruptedException e) {
       LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
       Thread.currentThread().interrupt();
@@ -667,17 +827,23 @@ public class TSServiceImpl implements TSIService.Iface {
 
       PhysicalPlan physicalPlan =
           processor.rawDataQueryReqToPhysicalPlan(req, sessionManager.getZoneId(req.sessionId));
-      return physicalPlan.isQuery()
-          ? internalExecuteQueryStatement(
-              "",
-              req.statementId,
-              physicalPlan,
-              req.fetchSize,
-              config.getQueryTimeoutThreshold(),
-              sessionManager.getUsername(req.sessionId),
-              req.isEnableRedirectQuery())
-          : RpcUtils.getTSExecuteStatementResp(
-              TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+      if (physicalPlan.isQuery()) {
+        Future<TSExecuteStatementResp> resp =
+            QueryTaskManager.getInstance()
+                .submit(
+                    new QueryTask(
+                        physicalPlan,
+                        sessionManager.getUsername(req.sessionId),
+                        "",
+                        req.statementId,
+                        config.getQueryTimeoutThreshold(),
+                        req.fetchSize,
+                        req.enableRedirectQuery));
+        return resp.get();
+      } else {
+        return RpcUtils.getTSExecuteStatementResp(
+            TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+      }
     } catch (InterruptedException e) {
       LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
       Thread.currentThread().interrupt();
@@ -689,133 +855,6 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
-  /**
-   * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByTimePlan, UDFPlan,
-   *     some AuthorPlan
-   */
-  @SuppressWarnings({"squid:S3776", "squid:S1141"}) // Suppress high Cognitive Complexity warning
-  private TSExecuteStatementResp internalExecuteQueryStatement(
-      String statement,
-      long statementId,
-      PhysicalPlan plan,
-      int fetchSize,
-      long timeout,
-      String username,
-      boolean enableRedirect)
-      throws QueryProcessException, SQLException, StorageEngineException,
-          QueryFilterOptimizationException, MetadataException, IOException, InterruptedException,
-          TException, AuthException {
-    queryCount.incrementAndGet();
-    AUDIT_LOGGER.debug(
-        "Session {} execute Query: {}", sessionManager.getCurrSessionId(), statement);
-    long startTime = System.currentTimeMillis();
-    long queryId = -1;
-    try {
-      // generate the queryId for the operation
-      queryId = sessionManager.requestQueryId(statementId, true);
-      // register query info to queryTimeManager
-      if (!(plan instanceof ShowQueryProcesslistPlan)) {
-        queryTimeManager.registerQuery(queryId, startTime, statement, timeout);
-      }
-      if (plan instanceof QueryPlan && config.isEnablePerformanceTracing()) {
-        TracingManager tracingManager = TracingManager.getInstance();
-        if (!(plan instanceof AlignByDevicePlan)) {
-          tracingManager.writeQueryInfo(queryId, statement, startTime, plan.getPaths().size());
-        } else {
-          tracingManager.writeQueryInfo(queryId, statement, startTime);
-        }
-      }
-
-      if (plan instanceof AuthorPlan) {
-        plan.setLoginUserName(username);
-      }
-
-      TSExecuteStatementResp resp = null;
-      // execute it before createDataSet since it may change the content of query plan
-      if (plan instanceof QueryPlan && !(plan instanceof UDFPlan)) {
-        resp = getQueryColumnHeaders(plan, username);
-      }
-      if (plan instanceof QueryPlan) {
-        ((QueryPlan) plan).setEnableRedirect(enableRedirect);
-      }
-      // create and cache dataset
-      QueryDataSet newDataSet = createQueryDataSet(queryId, plan, fetchSize);
-
-      if (newDataSet.getEndPoint() != null && enableRedirect) {
-        LOGGER.debug(
-            "need to redirect {} {} to node {}", statement, queryId, newDataSet.getEndPoint());
-        QueryDataSet.EndPoint endPoint = newDataSet.getEndPoint();
-        return redirectQueryToAnotherNode(resp, queryId, endPoint.getIp(), endPoint.getPort());
-      }
-
-      if (plan instanceof ShowPlan || plan instanceof AuthorPlan) {
-        resp = getListDataSetHeaders(newDataSet);
-      } else if (plan instanceof UDFPlan) {
-        resp = getQueryColumnHeaders(plan, username);
-      }
-
-      resp.setOperationType(plan.getOperatorType().toString());
-      if (plan.getOperatorType() == OperatorType.AGGREGATION) {
-        resp.setIgnoreTimeStamp(true);
-      } else if (plan instanceof ShowQueryProcesslistPlan) {
-        resp.setIgnoreTimeStamp(false);
-      }
-
-      if (newDataSet instanceof DirectNonAlignDataSet) {
-        resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username));
-      } else {
-        try {
-          TSQueryDataSet tsQueryDataSet = fillRpcReturnData(fetchSize, newDataSet, username);
-          resp.setQueryDataSet(tsQueryDataSet);
-        } catch (RedirectException e) {
-          LOGGER.debug("need to redirect {} {} to {}", statement, queryId, e.getEndPoint());
-          if (enableRedirect) {
-            EndPoint endPoint = e.getEndPoint();
-            redirectQueryToAnotherNode(resp, queryId, endPoint.ip, endPoint.port);
-          } else {
-            LOGGER.error(
-                "execute {} error, if session does not support redirect,"
-                    + " should not throw redirection exception.",
-                statement,
-                e);
-          }
-        }
-      }
-      resp.setQueryId(queryId);
-
-      if (plan instanceof AlignByDevicePlan && config.isEnablePerformanceTracing()) {
-        TracingManager.getInstance()
-            .writePathsNum(queryId, ((AlignByDeviceDataSet) newDataSet).getPathsNum());
-      }
-
-      if (enableMetric) {
-        long endTime = System.currentTimeMillis();
-        SqlArgument sqlArgument = new SqlArgument(resp, plan, statement, startTime, endTime);
-        synchronized (sqlArgumentList) {
-          sqlArgumentList.add(sqlArgument);
-          if (sqlArgumentList.size() >= MAX_SIZE) {
-            sqlArgumentList.subList(0, DELETE_SIZE).clear();
-          }
-        }
-      }
-
-      // remove query info in QueryTimeManager
-      if (!(plan instanceof ShowQueryProcesslistPlan)) {
-        queryTimeManager.unRegisterQuery(queryId);
-      }
-      return resp;
-    } catch (Exception e) {
-      releaseQueryResourceNoExceptions(queryId);
-      throw e;
-    } finally {
-      Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, startTime);
-      long costTime = System.currentTimeMillis() - startTime;
-      if (costTime >= config.getSlowQueryThreshold()) {
-        SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
-      }
-    }
-  }
-
   /** Redirect query */
   private TSExecuteStatementResp redirectQueryToAnotherNode(
       TSExecuteStatementResp resp, long queryId, String ip, int port) {

[iotdb] 01/02: abstract redirect method

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch 0.12controlQueryThread
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5834195f4a2075e4b3f8b8496ce5f8bf9783a839
Author: Alima777 <wx...@gmail.com>
AuthorDate: Mon Jan 3 15:21:56 2022 +0800

    abstract redirect method
---
 client-cpp/pom.xml                                 |  4 +--
 compile-tools/pom.xml                              |  6 ++--
 distribution/pom.xml                               |  2 +-
 example/client-cpp-example/pom.xml                 |  2 +-
 example/udf/pom.xml                                |  2 +-
 grafana/pom.xml                                    |  2 +-
 jdbc/pom.xml                                       |  2 +-
 pom.xml                                            |  8 ++---
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 36 ++++++++++++----------
 9 files changed, 34 insertions(+), 30 deletions(-)

diff --git a/client-cpp/pom.xml b/client-cpp/pom.xml
index 014a0f5..79e295f 100644
--- a/client-cpp/pom.xml
+++ b/client-cpp/pom.xml
@@ -98,8 +98,8 @@
                 <cmake.root.dir>${project.parent.basedir}/compile-tools/thrift/target/cmake-${cmake-version}-win64-x64/</cmake.root.dir>
                 <thrift.exec.absolute.path>${project.parent.basedir}/compile-tools/thrift/target/build/compiler/cpp/bin/${cmake.build.type}/thrift.exe</thrift.exec.absolute.path>
                 <iotdb.server.script>start-server.bat</iotdb.server.script>
-                <boost.include.dir />
-                <boost.library.dir />
+                <boost.include.dir/>
+                <boost.library.dir/>
             </properties>
         </profile>
         <profile>
diff --git a/compile-tools/pom.xml b/compile-tools/pom.xml
index 88bb39b..3d82f11 100644
--- a/compile-tools/pom.xml
+++ b/compile-tools/pom.xml
@@ -35,7 +35,7 @@
         <cmake-version>3.17.3</cmake-version>
         <openssl.include.dir>-Dtrue1=true1</openssl.include.dir>
         <bison.executable.dir>-Dtrue1=true1</bison.executable.dir>
-        <cmake.build.type />
+        <cmake.build.type/>
     </properties>
     <modules>
         <module>thrift</module>
@@ -114,8 +114,8 @@
                 <thrift.make.executable>make</thrift.make.executable>
                 <thrift.compiler.executable>thrift.exe</thrift.compiler.executable>
                 <gradlew.executable>gradlew.bat</gradlew.executable>
-                <boost.include.dir />
-                <boost.library.dir />
+                <boost.include.dir/>
+                <boost.library.dir/>
             </properties>
         </profile>
     </profiles>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 8f8c28e..49dceed 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -29,7 +29,7 @@
     </parent>
     <artifactId>iotdb-distribution</artifactId>
     <name>IoTDB Distribution</name>
-    <modules />
+    <modules/>
     <build>
         <plugins>
             <plugin>
diff --git a/example/client-cpp-example/pom.xml b/example/client-cpp-example/pom.xml
index 1cc5a23..c06f132 100644
--- a/example/client-cpp-example/pom.xml
+++ b/example/client-cpp-example/pom.xml
@@ -69,7 +69,7 @@
             <properties>
                 <cmake.generator>Visual Studio 16 2019</cmake.generator>
                 <cmake.root.dir>${project.parent.basedir}/../compile-tools/thrift/target/cmake-${cmake-version}-win64-x64/</cmake.root.dir>
-                <boost.include.dir />
+                <boost.include.dir/>
             </properties>
         </profile>
         <profile>
diff --git a/example/udf/pom.xml b/example/udf/pom.xml
index 131f05d..6d3127c 100644
--- a/example/udf/pom.xml
+++ b/example/udf/pom.xml
@@ -77,7 +77,7 @@
                         <importOrder>
                             <order>org.apache.iotdb,,javax,java,\#</order>
                         </importOrder>
-                        <removeUnusedImports />
+                        <removeUnusedImports/>
                     </java>
                 </configuration>
                 <executions>
diff --git a/grafana/pom.xml b/grafana/pom.xml
index d0c29ec..d4d80ac 100644
--- a/grafana/pom.xml
+++ b/grafana/pom.xml
@@ -170,7 +170,7 @@
                                     <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                         <resource>META-INF/spring.schemas</resource>
                                     </transformer>
-                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                     <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                         <mainClass>${start-class}</mainClass>
                                     </transformer>
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 940678b..84a39f0 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -198,7 +198,7 @@
                                                 </goals>
                                             </pluginExecutionFilter>
                                             <action>
-                                                <ignore />
+                                                <ignore/>
                                             </action>
                                         </pluginExecution>
                                     </pluginExecutions>
diff --git a/pom.xml b/pom.xml
index b7d08a5..6e03cf7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -151,7 +151,7 @@
         <sonar.junit.reportPaths>target/surefire-reports,target/failsafe-reports</sonar.junit.reportPaths>
         <!-- By default, the argLine is empty-->
         <gson.version>2.8.6</gson.version>
-        <argLine />
+        <argLine/>
         <!-- whether enable compiling the cpp client-->
         <client-cpp>false</client-cpp>
         <!-- disable enforcer by default-->
@@ -688,7 +688,7 @@
                             <importOrder>
                                 <order>org.apache.iotdb,,javax,java,\#</order>
                             </importOrder>
-                            <removeUnusedImports />
+                            <removeUnusedImports/>
                         </java>
                         <lineEndings>UNIX</lineEndings>
                     </configuration>
@@ -760,7 +760,7 @@
                         <phase>validate</phase>
                         <configuration>
                             <rules>
-                                <dependencyConvergence />
+                                <dependencyConvergence/>
                             </rules>
                         </configuration>
                         <goals>
@@ -806,7 +806,7 @@
                                 </requireJavaVersion>
                                 <!-- Disabled for now as it breaks the ability to build single modules -->
                                 <!--reactorModuleConvergence/-->
-                                <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies" />
+                                <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies"/>
                             </rules>
                         </configuration>
                     </execution>
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 3f7c349..3fd9bee 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -28,7 +28,11 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.cost.statistic.Measurement;
 import org.apache.iotdb.db.cost.statistic.Operation;
-import org.apache.iotdb.db.exception.*;
+import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.IoTDBException;
+import org.apache.iotdb.db.exception.QueryInBatchStatementException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.StorageGroupNotReadyException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
@@ -738,16 +742,10 @@ public class TSServiceImpl implements TSIService.Iface {
       QueryDataSet newDataSet = createQueryDataSet(queryId, plan, fetchSize);
 
       if (newDataSet.getEndPoint() != null && enableRedirect) {
-        // redirect query
         LOGGER.debug(
             "need to redirect {} {} to node {}", statement, queryId, newDataSet.getEndPoint());
-        TSStatus status = new TSStatus();
-        status.setRedirectNode(
-            new EndPoint(newDataSet.getEndPoint().getIp(), newDataSet.getEndPoint().getPort()));
-        status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
-        resp.setStatus(status);
-        resp.setQueryId(queryId);
-        return resp;
+        QueryDataSet.EndPoint endPoint = newDataSet.getEndPoint();
+        return redirectQueryToAnotherNode(resp, queryId, endPoint.getIp(), endPoint.getPort());
       }
 
       if (plan instanceof ShowPlan || plan instanceof AuthorPlan) {
@@ -772,13 +770,8 @@ public class TSServiceImpl implements TSIService.Iface {
         } catch (RedirectException e) {
           LOGGER.debug("need to redirect {} {} to {}", statement, queryId, e.getEndPoint());
           if (enableRedirect) {
-            // redirect query
-            TSStatus status = new TSStatus();
-            status.setRedirectNode(e.getEndPoint());
-            status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
-            resp.setStatus(status);
-            resp.setQueryId(queryId);
-            return resp;
+            EndPoint endPoint = e.getEndPoint();
+            return redirectQueryToAnotherNode(resp, queryId, endPoint.ip, endPoint.port);
           } else {
             LOGGER.error(
                 "execute {} error, if session does not support redirect,"
@@ -823,6 +816,17 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
+  /** Redirect query */
+  private TSExecuteStatementResp redirectQueryToAnotherNode(
+      TSExecuteStatementResp resp, long queryId, String ip, int port) {
+    TSStatus status = new TSStatus();
+    status.setRedirectNode(new EndPoint(ip, port));
+    status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+    resp.setStatus(status);
+    resp.setQueryId(queryId);
+    return resp;
+  }
+
   private TSExecuteStatementResp getListDataSetHeaders(QueryDataSet dataSet) {
     return StaticResps.getNoTimeExecuteResp(
         dataSet.getPaths().stream().map(Path::getFullPath).collect(Collectors.toList()),