You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/01/04 02:14:37 UTC

[iotdb] branch rel/0.12 updated: [To rel/0.12] [IOTDB-2195] Control the concurrent query execution thread (#4699)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 1c9d42f  [To rel/0.12] [IOTDB-2195] Control the concurrent query execution thread (#4699)
1c9d42f is described below

commit 1c9d42f8520e6a9f307e1d352c3a0b8634217999
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Tue Jan 4 10:14:06 2022 +0800

    [To rel/0.12] [IOTDB-2195] Control the concurrent query execution thread (#4699)
---
 client-cpp/pom.xml                                 |   4 +-
 compile-tools/pom.xml                              |   6 +-
 distribution/pom.xml                               |   2 +-
 example/client-cpp-example/pom.xml                 |   2 +-
 example/udf/pom.xml                                |   2 +-
 grafana/pom.xml                                    |   2 +-
 jdbc/pom.xml                                       |   2 +-
 pom.xml                                            |   8 +-
 .../resources/conf/iotdb-engine.properties         |  13 +-
 .../org/apache/iotdb/db/concurrent/ThreadName.java |   3 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  30 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  16 +
 .../db/query/dataset/NonAlignEngineDataSet.java    |   4 +-
 .../dataset/RawQueryDataSetWithoutValueFilter.java |   9 +-
 ...yTaskPoolManager.java => QueryTaskManager.java} |  18 +-
 ...nager.java => RawQueryReadTaskPoolManager.java} |  30 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 381 ++++++++++++---------
 17 files changed, 321 insertions(+), 211 deletions(-)

diff --git a/client-cpp/pom.xml b/client-cpp/pom.xml
index 014a0f5..79e295f 100644
--- a/client-cpp/pom.xml
+++ b/client-cpp/pom.xml
@@ -98,8 +98,8 @@
                 <cmake.root.dir>${project.parent.basedir}/compile-tools/thrift/target/cmake-${cmake-version}-win64-x64/</cmake.root.dir>
                 <thrift.exec.absolute.path>${project.parent.basedir}/compile-tools/thrift/target/build/compiler/cpp/bin/${cmake.build.type}/thrift.exe</thrift.exec.absolute.path>
                 <iotdb.server.script>start-server.bat</iotdb.server.script>
-                <boost.include.dir />
-                <boost.library.dir />
+                <boost.include.dir/>
+                <boost.library.dir/>
             </properties>
         </profile>
         <profile>
diff --git a/compile-tools/pom.xml b/compile-tools/pom.xml
index 88bb39b..3d82f11 100644
--- a/compile-tools/pom.xml
+++ b/compile-tools/pom.xml
@@ -35,7 +35,7 @@
         <cmake-version>3.17.3</cmake-version>
         <openssl.include.dir>-Dtrue1=true1</openssl.include.dir>
         <bison.executable.dir>-Dtrue1=true1</bison.executable.dir>
-        <cmake.build.type />
+        <cmake.build.type/>
     </properties>
     <modules>
         <module>thrift</module>
@@ -114,8 +114,8 @@
                 <thrift.make.executable>make</thrift.make.executable>
                 <thrift.compiler.executable>thrift.exe</thrift.compiler.executable>
                 <gradlew.executable>gradlew.bat</gradlew.executable>
-                <boost.include.dir />
-                <boost.library.dir />
+                <boost.include.dir/>
+                <boost.library.dir/>
             </properties>
         </profile>
     </profiles>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 8f8c28e..49dceed 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -29,7 +29,7 @@
     </parent>
     <artifactId>iotdb-distribution</artifactId>
     <name>IoTDB Distribution</name>
-    <modules />
+    <modules/>
     <build>
         <plugins>
             <plugin>
diff --git a/example/client-cpp-example/pom.xml b/example/client-cpp-example/pom.xml
index 1cc5a23..c06f132 100644
--- a/example/client-cpp-example/pom.xml
+++ b/example/client-cpp-example/pom.xml
@@ -69,7 +69,7 @@
             <properties>
                 <cmake.generator>Visual Studio 16 2019</cmake.generator>
                 <cmake.root.dir>${project.parent.basedir}/../compile-tools/thrift/target/cmake-${cmake-version}-win64-x64/</cmake.root.dir>
-                <boost.include.dir />
+                <boost.include.dir/>
             </properties>
         </profile>
         <profile>
diff --git a/example/udf/pom.xml b/example/udf/pom.xml
index 131f05d..6d3127c 100644
--- a/example/udf/pom.xml
+++ b/example/udf/pom.xml
@@ -77,7 +77,7 @@
                         <importOrder>
                             <order>org.apache.iotdb,,javax,java,\#</order>
                         </importOrder>
-                        <removeUnusedImports />
+                        <removeUnusedImports/>
                     </java>
                 </configuration>
                 <executions>
diff --git a/grafana/pom.xml b/grafana/pom.xml
index d0c29ec..d4d80ac 100644
--- a/grafana/pom.xml
+++ b/grafana/pom.xml
@@ -170,7 +170,7 @@
                                     <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                         <resource>META-INF/spring.schemas</resource>
                                     </transformer>
-                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                     <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                         <mainClass>${start-class}</mainClass>
                                     </transformer>
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 940678b..84a39f0 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -198,7 +198,7 @@
                                                 </goals>
                                             </pluginExecutionFilter>
                                             <action>
-                                                <ignore />
+                                                <ignore/>
                                             </action>
                                         </pluginExecution>
                                     </pluginExecutions>
diff --git a/pom.xml b/pom.xml
index b7d08a5..6e03cf7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -151,7 +151,7 @@
         <sonar.junit.reportPaths>target/surefire-reports,target/failsafe-reports</sonar.junit.reportPaths>
         <!-- By default, the argLine is empty-->
         <gson.version>2.8.6</gson.version>
-        <argLine />
+        <argLine/>
         <!-- whether enable compiling the cpp client-->
         <client-cpp>false</client-cpp>
         <!-- disable enforcer by default-->
@@ -688,7 +688,7 @@
                             <importOrder>
                                 <order>org.apache.iotdb,,javax,java,\#</order>
                             </importOrder>
-                            <removeUnusedImports />
+                            <removeUnusedImports/>
                         </java>
                         <lineEndings>UNIX</lineEndings>
                     </configuration>
@@ -760,7 +760,7 @@
                         <phase>validate</phase>
                         <configuration>
                             <rules>
-                                <dependencyConvergence />
+                                <dependencyConvergence/>
                             </rules>
                         </configuration>
                         <goals>
@@ -806,7 +806,7 @@
                                 </requireJavaVersion>
                                 <!-- Disabled for now as it breaks the ability to build single modules -->
                                 <!--reactorModuleConvergence/-->
-                                <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies" />
+                                <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies"/>
                             </rules>
                         </configuration>
                     </execution>
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 28a4f65..801e96d 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -249,8 +249,17 @@ timestamp_precision=ms
 # How many threads can concurrently flush. When <= 0, use CPU core number.
 # concurrent_flush_thread=0
 
-# How many threads can concurrently query. When <= 0, use CPU core number.
-# concurrent_query_thread=8
+# How many threads can concurrently execute query statement. When <= 0, use CPU core number.
+# Datatype: int
+# concurrent_query_thread=16
+
+# How many threads can concurrently read data for raw data query. When <= 0, use CPU core number.
+# Datatype: int
+# concurrent_sub_rawQuery_thread=8
+
+# Blocking queue size for read tasks in raw data query. Must be >= 1.
+# Datatype: int
+# raw_query_blocking_queue_capacity=5
 
 # whether take over the memory management by IoTDB rather than JVM when serializing memtable as bytes in memory
 # (i.e., whether use ChunkBufferPool), value true, false
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index 5f83c12..ad1e613 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -43,7 +43,8 @@ public enum ThreadName {
   SYNC_MONITOR("Sync-Monitor"),
   LOAD_TSFILE("Load-TsFile"),
   TIME_COST_STATISTIC("TIME_COST_STATISTIC"),
-  QUERY_SERVICE("Query");
+  QUERY_SERVICE("Query"),
+  SUB_RAW_QUERY_SERVICE("Sub_RawQuery");
 
   private String name;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b880ae5..adca087 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -238,8 +238,16 @@ public class IoTDBConfig {
   /** How many threads can concurrently flush. When <= 0, use CPU core number. */
   private int concurrentFlushThread = Runtime.getRuntime().availableProcessors();
 
-  /** How many threads can concurrently query. When <= 0, use CPU core number. */
-  private int concurrentQueryThread = 8;
+  /** How many threads can concurrently execute query statement. When <= 0, use CPU core number. */
+  private int concurrentQueryThread = 16;
+
+  /**
+   * How many threads can concurrently read data for raw data query. When <= 0, use CPU core number.
+   */
+  private int concurrentSubRawQueryThread = 8;
+
+  /** Blocking queue size for read task in raw data query. */
+  private int rawQueryBlockingQueueCapacity = 5;
 
   /** Is the write mem control for writing enable. */
   private boolean enableMemControl = true;
@@ -1091,10 +1099,26 @@ public class IoTDBConfig {
     return concurrentQueryThread;
   }
 
-  void setConcurrentQueryThread(int concurrentQueryThread) {
+  public void setConcurrentQueryThread(int concurrentQueryThread) {
     this.concurrentQueryThread = concurrentQueryThread;
   }
 
+  public int getConcurrentSubRawQueryThread() {
+    return concurrentSubRawQueryThread;
+  }
+
+  void setConcurrentSubRawQueryThread(int concurrentSubRawQueryThread) {
+    this.concurrentSubRawQueryThread = concurrentSubRawQueryThread;
+  }
+
+  public int getRawQueryBlockingQueueCapacity() {
+    return rawQueryBlockingQueueCapacity;
+  }
+
+  public void setRawQueryBlockingQueueCapacity(int rawQueryBlockingQueueCapacity) {
+    this.rawQueryBlockingQueueCapacity = rawQueryBlockingQueueCapacity;
+  }
+
   public long getSeqTsFileSize() {
     return seqTsFileSize;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index a2c6d3d..66b6fbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -426,6 +426,22 @@ public class IoTDBDescriptor {
         conf.setConcurrentQueryThread(Runtime.getRuntime().availableProcessors());
       }
 
+      conf.setConcurrentSubRawQueryThread(
+          Integer.parseInt(
+              properties.getProperty(
+                  "concurrent_sub_rawQuery_thread",
+                  Integer.toString(conf.getConcurrentSubRawQueryThread()))));
+
+      if (conf.getConcurrentSubRawQueryThread() <= 0) {
+        conf.setConcurrentSubRawQueryThread(Runtime.getRuntime().availableProcessors());
+      }
+
+      conf.setRawQueryBlockingQueueCapacity(
+          Integer.parseInt(
+              properties.getProperty(
+                  "raw_query_blocking_queue_capacity",
+                  Integer.toString(conf.getRawQueryBlockingQueueCapacity()))));
+
       conf.setmManagerCacheSize(
           Integer.parseInt(
               properties
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
index f0f887a..f17a62d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.dataset;
 import org.apache.iotdb.db.concurrent.WrappedRunnable;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.query.control.QueryTimeManager;
-import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
+import org.apache.iotdb.db.query.pool.RawQueryReadTaskPoolManager;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
@@ -239,7 +239,7 @@ public class NonAlignEngineDataSet extends QueryDataSet implements DirectNonAlig
   /** flag that main thread is interrupted or not */
   private volatile boolean interrupted = false;
 
-  private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance();
+  private static final RawQueryReadTaskPoolManager pool = RawQueryReadTaskPoolManager.getInstance();
 
   private static final Logger LOGGER = LoggerFactory.getLogger(NonAlignEngineDataSet.class);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index a5f613d..d739d10 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.db.query.dataset;
 
 import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.control.QueryTimeManager;
-import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
+import org.apache.iotdb.db.query.pool.RawQueryReadTaskPoolManager;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
@@ -165,11 +166,13 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
   protected int[] batchDataLengthList;
 
   // capacity for blocking queue
-  private static final int BLOCKING_QUEUE_CAPACITY = 5;
+  private static final int BLOCKING_QUEUE_CAPACITY =
+      IoTDBDescriptor.getInstance().getConfig().getRawQueryBlockingQueueCapacity();
 
   private final long queryId;
 
-  private static final QueryTaskPoolManager TASK_POOL_MANAGER = QueryTaskPoolManager.getInstance();
+  private static final RawQueryReadTaskPoolManager TASK_POOL_MANAGER =
+      RawQueryReadTaskPoolManager.getInstance();
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(RawQueryDataSetWithoutValueFilter.class);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskManager.java
similarity index 82%
copy from server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
copy to server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskManager.java
index 1dcda50..c68f526 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskManager.java
@@ -27,11 +27,17 @@ import org.apache.iotdb.db.engine.flush.pool.AbstractPoolManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class QueryTaskPoolManager extends AbstractPoolManager {
+/**
+ * This pool executes all query tasks sent from clients and returns TSExecuteStatementResp.
+ * Threads are named after ThreadName.QUERY_SERVICE ("Query").
+ *
+ * <p>Executes the QueryTask instances defined in TSServiceImpl.
+ */
+public class QueryTaskManager extends AbstractPoolManager {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(QueryTaskPoolManager.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(QueryTaskManager.class);
 
-  private QueryTaskPoolManager() {
+  private QueryTaskManager() {
     int threadCnt =
         Math.min(
             Runtime.getRuntime().availableProcessors(),
@@ -39,8 +45,8 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
     pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.QUERY_SERVICE.getName());
   }
 
-  public static QueryTaskPoolManager getInstance() {
-    return QueryTaskPoolManager.InstanceHolder.instance;
+  public static QueryTaskManager getInstance() {
+    return QueryTaskManager.InstanceHolder.instance;
   }
 
   @Override
@@ -79,6 +85,6 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
       // allowed to do nothing
     }
 
-    private static QueryTaskPoolManager instance = new QueryTaskPoolManager();
+    private static QueryTaskManager instance = new QueryTaskManager();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/query/pool/RawQueryReadTaskPoolManager.java
similarity index 67%
rename from server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
rename to server/src/main/java/org/apache/iotdb/db/query/pool/RawQueryReadTaskPoolManager.java
index 1dcda50..dc84652 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/pool/RawQueryReadTaskPoolManager.java
@@ -27,20 +27,27 @@ import org.apache.iotdb.db.engine.flush.pool.AbstractPoolManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class QueryTaskPoolManager extends AbstractPoolManager {
+/**
+ * This thread pool is used to read data for raw data queries. Threads are named Sub_RawQuery.
+ *
+ * <p>Execute ReadTask() in RawQueryReadTaskPoolManager
+ */
+public class RawQueryReadTaskPoolManager extends AbstractPoolManager {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(QueryTaskPoolManager.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(RawQueryReadTaskPoolManager.class);
 
-  private QueryTaskPoolManager() {
+  private RawQueryReadTaskPoolManager() {
     int threadCnt =
         Math.min(
             Runtime.getRuntime().availableProcessors(),
-            IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread());
-    pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.QUERY_SERVICE.getName());
+            IoTDBDescriptor.getInstance().getConfig().getConcurrentSubRawQueryThread());
+    pool =
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            threadCnt, ThreadName.SUB_RAW_QUERY_SERVICE.getName());
   }
 
-  public static QueryTaskPoolManager getInstance() {
-    return QueryTaskPoolManager.InstanceHolder.instance;
+  public static RawQueryReadTaskPoolManager getInstance() {
+    return RawQueryReadTaskPoolManager.InstanceHolder.instance;
   }
 
   @Override
@@ -50,7 +57,7 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
 
   @Override
   public String getName() {
-    return "query task";
+    return "raw query read task";
   }
 
   @Override
@@ -59,9 +66,10 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
       int threadCnt =
           Math.min(
               Runtime.getRuntime().availableProcessors(),
-              IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread());
+              IoTDBDescriptor.getInstance().getConfig().getConcurrentSubRawQueryThread());
       pool =
-          IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.QUERY_SERVICE.getName());
+          IoTDBThreadPoolFactory.newFixedThreadPool(
+              threadCnt, ThreadName.SUB_RAW_QUERY_SERVICE.getName());
     }
   }
 
@@ -79,6 +87,6 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
       // allowed to do nothing
     }
 
-    private static QueryTaskPoolManager instance = new QueryTaskPoolManager();
+    private static RawQueryReadTaskPoolManager instance = new RawQueryReadTaskPoolManager();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 3f7c349..45988da 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -28,7 +28,11 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.cost.statistic.Measurement;
 import org.apache.iotdb.db.cost.statistic.Operation;
-import org.apache.iotdb.db.exception.*;
+import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.IoTDBException;
+import org.apache.iotdb.db.exception.QueryInBatchStatementException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.StorageGroupNotReadyException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
@@ -76,6 +80,7 @@ import org.apache.iotdb.db.query.control.TracingManager;
 import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
+import org.apache.iotdb.db.query.pool.QueryTaskManager;
 import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
@@ -141,6 +146,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -207,6 +214,151 @@ public class TSServiceImpl implements TSIService.Iface {
         TimeUnit.MINUTES);
   }
 
+  /**
+   * Execute query statement, return TSExecuteStatementResp with dataset.
+   *
+   * @param plan must be a plan for Query: QueryPlan, ShowPlan, and some AuthorPlan
+   */
+  protected class QueryTask implements Callable<TSExecuteStatementResp> {
+
+    private PhysicalPlan plan;
+    private final String username;
+    private final String statement;
+    private final long statementId;
+    private final long timeout;
+    private final int fetchSize;
+    private final boolean enableRedirectQuery;
+
+    public QueryTask(
+        PhysicalPlan plan,
+        String username,
+        String statement,
+        long statementId,
+        long timeout,
+        int fetchSize,
+        boolean enableRedirectQuery) {
+      this.plan = plan;
+      this.username = username;
+      this.statement = statement;
+      this.statementId = statementId;
+      this.timeout = timeout;
+      this.fetchSize = fetchSize;
+      this.enableRedirectQuery = enableRedirectQuery;
+    }
+
+    @Override
+    public TSExecuteStatementResp call() throws Exception {
+      queryCount.incrementAndGet();
+      AUDIT_LOGGER.debug(
+          "Session {} execute Query: {}", sessionManager.getCurrSessionId(), statement);
+      long startTime = System.currentTimeMillis();
+      // generate the queryId for the operation
+      long queryId = sessionManager.requestQueryId(statementId, true);
+      try {
+        // register query info to queryTimeManager
+        if (!(plan instanceof ShowQueryProcesslistPlan)) {
+          queryTimeManager.registerQuery(queryId, startTime, statement, timeout);
+        }
+        if (plan instanceof QueryPlan && config.isEnablePerformanceTracing()) {
+          TracingManager tracingManager = TracingManager.getInstance();
+          if (!(plan instanceof AlignByDevicePlan)) {
+            tracingManager.writeQueryInfo(queryId, statement, startTime, plan.getPaths().size());
+          } else {
+            tracingManager.writeQueryInfo(queryId, statement, startTime);
+          }
+        }
+
+        if (plan instanceof AuthorPlan) {
+          plan.setLoginUserName(username);
+        }
+
+        TSExecuteStatementResp resp = null;
+        // execute it before createDataSet since it may change the content of query plan
+        if (plan instanceof QueryPlan && !(plan instanceof UDFPlan)) {
+          resp = getQueryColumnHeaders(plan, username);
+        }
+        if (plan instanceof QueryPlan) {
+          ((QueryPlan) plan).setEnableRedirect(enableRedirectQuery);
+        }
+        // create and cache dataset
+        QueryDataSet newDataSet = createQueryDataSet(queryId, plan, fetchSize);
+
+        if (newDataSet.getEndPoint() != null && enableRedirectQuery) {
+          LOGGER.debug(
+              "need to redirect {} {} to node {}", statement, queryId, newDataSet.getEndPoint());
+          QueryDataSet.EndPoint endPoint = newDataSet.getEndPoint();
+          return redirectQueryToAnotherNode(resp, queryId, endPoint.getIp(), endPoint.getPort());
+        }
+
+        if (plan instanceof ShowPlan || plan instanceof AuthorPlan) {
+          resp = getListDataSetHeaders(newDataSet);
+        } else if (plan instanceof UDFPlan) {
+          resp = getQueryColumnHeaders(plan, username);
+        }
+
+        resp.setOperationType(plan.getOperatorType().toString());
+        if (plan.getOperatorType() == OperatorType.AGGREGATION) {
+          resp.setIgnoreTimeStamp(true);
+        } else if (plan instanceof ShowQueryProcesslistPlan) {
+          resp.setIgnoreTimeStamp(false);
+        }
+
+        if (newDataSet instanceof DirectNonAlignDataSet) {
+          resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username));
+        } else {
+          try {
+            TSQueryDataSet tsQueryDataSet = fillRpcReturnData(fetchSize, newDataSet, username);
+            resp.setQueryDataSet(tsQueryDataSet);
+          } catch (RedirectException e) {
+            LOGGER.debug("need to redirect {} {} to {}", statement, queryId, e.getEndPoint());
+            if (enableRedirectQuery) {
+              EndPoint endPoint = e.getEndPoint();
+              redirectQueryToAnotherNode(resp, queryId, endPoint.ip, endPoint.port);
+            } else {
+              LOGGER.error(
+                  "execute {} error, if session does not support redirect,"
+                      + " should not throw redirection exception.",
+                  statement,
+                  e);
+            }
+          }
+        }
+        resp.setQueryId(queryId);
+
+        if (plan instanceof AlignByDevicePlan && config.isEnablePerformanceTracing()) {
+          TracingManager.getInstance()
+              .writePathsNum(queryId, ((AlignByDeviceDataSet) newDataSet).getPathsNum());
+        }
+
+        if (enableMetric) {
+          long endTime = System.currentTimeMillis();
+          SqlArgument sqlArgument = new SqlArgument(resp, plan, statement, startTime, endTime);
+          synchronized (sqlArgumentList) {
+            sqlArgumentList.add(sqlArgument);
+            if (sqlArgumentList.size() >= MAX_SIZE) {
+              sqlArgumentList.subList(0, DELETE_SIZE).clear();
+            }
+          }
+        }
+
+        // remove query info in QueryTimeManager
+        if (!(plan instanceof ShowQueryProcesslistPlan)) {
+          queryTimeManager.unRegisterQuery(queryId);
+        }
+        return resp;
+      } catch (Exception e) {
+        releaseQueryResourceNoExceptions(queryId);
+        throw e;
+      } finally {
+        Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+        long costTime = System.currentTimeMillis() - startTime;
+        if (costTime >= config.getSlowQueryThreshold()) {
+          SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
+        }
+      }
+    }
+  }
+
   public static List<SqlArgument> getSqlArgumentList() {
     return sqlArgumentList;
   }
@@ -599,16 +751,22 @@ public class TSServiceImpl implements TSIService.Iface {
           processor.parseSQLToPhysicalPlan(
               statement, sessionManager.getZoneId(req.getSessionId()), req.fetchSize);
 
-      return physicalPlan.isQuery()
-          ? internalExecuteQueryStatement(
-              statement,
-              req.statementId,
-              physicalPlan,
-              req.fetchSize,
-              req.timeout,
-              sessionManager.getUsername(req.getSessionId()),
-              req.isEnableRedirectQuery())
-          : executeUpdateStatement(physicalPlan, req.getSessionId());
+      if (physicalPlan.isQuery()) {
+        Future<TSExecuteStatementResp> resp =
+            QueryTaskManager.getInstance()
+                .submit(
+                    new QueryTask(
+                        physicalPlan,
+                        sessionManager.getUsername(req.sessionId),
+                        req.statement,
+                        req.statementId,
+                        req.timeout,
+                        req.fetchSize,
+                        req.enableRedirectQuery));
+        return resp.get();
+      } else {
+        return executeUpdateStatement(physicalPlan, req.getSessionId());
+      }
     } catch (InterruptedException e) {
       LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
       Thread.currentThread().interrupt();
@@ -632,17 +790,23 @@ public class TSServiceImpl implements TSIService.Iface {
           processor.parseSQLToPhysicalPlan(
               statement, sessionManager.getZoneId(req.sessionId), req.fetchSize);
 
-      return physicalPlan.isQuery()
-          ? internalExecuteQueryStatement(
-              statement,
-              req.statementId,
-              physicalPlan,
-              req.fetchSize,
-              req.timeout,
-              sessionManager.getUsername(req.getSessionId()),
-              req.isEnableRedirectQuery())
-          : RpcUtils.getTSExecuteStatementResp(
-              TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+      if (physicalPlan.isQuery()) {
+        Future<TSExecuteStatementResp> resp =
+            QueryTaskManager.getInstance()
+                .submit(
+                    new QueryTask(
+                        physicalPlan,
+                        sessionManager.getUsername(req.sessionId),
+                        req.statement,
+                        req.statementId,
+                        req.timeout,
+                        req.fetchSize,
+                        req.enableRedirectQuery));
+        return resp.get();
+      } else {
+        return RpcUtils.getTSExecuteStatementResp(
+            TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+      }
     } catch (InterruptedException e) {
       LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
       Thread.currentThread().interrupt();
@@ -663,17 +827,23 @@ public class TSServiceImpl implements TSIService.Iface {
 
       PhysicalPlan physicalPlan =
           processor.rawDataQueryReqToPhysicalPlan(req, sessionManager.getZoneId(req.sessionId));
-      return physicalPlan.isQuery()
-          ? internalExecuteQueryStatement(
-              "",
-              req.statementId,
-              physicalPlan,
-              req.fetchSize,
-              config.getQueryTimeoutThreshold(),
-              sessionManager.getUsername(req.sessionId),
-              req.isEnableRedirectQuery())
-          : RpcUtils.getTSExecuteStatementResp(
-              TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+      if (physicalPlan.isQuery()) {
+        Future<TSExecuteStatementResp> resp =
+            QueryTaskManager.getInstance()
+                .submit(
+                    new QueryTask(
+                        physicalPlan,
+                        sessionManager.getUsername(req.sessionId),
+                        "",
+                        req.statementId,
+                        config.getQueryTimeoutThreshold(),
+                        req.fetchSize,
+                        req.enableRedirectQuery));
+        return resp.get();
+      } else {
+        return RpcUtils.getTSExecuteStatementResp(
+            TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+      }
     } catch (InterruptedException e) {
       LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
       Thread.currentThread().interrupt();
@@ -685,142 +855,15 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
-  /**
-   * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByTimePlan, UDFPlan,
-   *     some AuthorPlan
-   */
-  @SuppressWarnings({"squid:S3776", "squid:S1141"}) // Suppress high Cognitive Complexity warning
-  private TSExecuteStatementResp internalExecuteQueryStatement(
-      String statement,
-      long statementId,
-      PhysicalPlan plan,
-      int fetchSize,
-      long timeout,
-      String username,
-      boolean enableRedirect)
-      throws QueryProcessException, SQLException, StorageEngineException,
-          QueryFilterOptimizationException, MetadataException, IOException, InterruptedException,
-          TException, AuthException {
-    queryCount.incrementAndGet();
-    AUDIT_LOGGER.debug(
-        "Session {} execute Query: {}", sessionManager.getCurrSessionId(), statement);
-    long startTime = System.currentTimeMillis();
-    long queryId = -1;
-    try {
-      // generate the queryId for the operation
-      queryId = sessionManager.requestQueryId(statementId, true);
-      // register query info to queryTimeManager
-      if (!(plan instanceof ShowQueryProcesslistPlan)) {
-        queryTimeManager.registerQuery(queryId, startTime, statement, timeout);
-      }
-      if (plan instanceof QueryPlan && config.isEnablePerformanceTracing()) {
-        TracingManager tracingManager = TracingManager.getInstance();
-        if (!(plan instanceof AlignByDevicePlan)) {
-          tracingManager.writeQueryInfo(queryId, statement, startTime, plan.getPaths().size());
-        } else {
-          tracingManager.writeQueryInfo(queryId, statement, startTime);
-        }
-      }
-
-      if (plan instanceof AuthorPlan) {
-        plan.setLoginUserName(username);
-      }
-
-      TSExecuteStatementResp resp = null;
-      // execute it before createDataSet since it may change the content of query plan
-      if (plan instanceof QueryPlan && !(plan instanceof UDFPlan)) {
-        resp = getQueryColumnHeaders(plan, username);
-      }
-      if (plan instanceof QueryPlan) {
-        ((QueryPlan) plan).setEnableRedirect(enableRedirect);
-      }
-      // create and cache dataset
-      QueryDataSet newDataSet = createQueryDataSet(queryId, plan, fetchSize);
-
-      if (newDataSet.getEndPoint() != null && enableRedirect) {
-        // redirect query
-        LOGGER.debug(
-            "need to redirect {} {} to node {}", statement, queryId, newDataSet.getEndPoint());
-        TSStatus status = new TSStatus();
-        status.setRedirectNode(
-            new EndPoint(newDataSet.getEndPoint().getIp(), newDataSet.getEndPoint().getPort()));
-        status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
-        resp.setStatus(status);
-        resp.setQueryId(queryId);
-        return resp;
-      }
-
-      if (plan instanceof ShowPlan || plan instanceof AuthorPlan) {
-        resp = getListDataSetHeaders(newDataSet);
-      } else if (plan instanceof UDFPlan) {
-        resp = getQueryColumnHeaders(plan, username);
-      }
-
-      resp.setOperationType(plan.getOperatorType().toString());
-      if (plan.getOperatorType() == OperatorType.AGGREGATION) {
-        resp.setIgnoreTimeStamp(true);
-      } else if (plan instanceof ShowQueryProcesslistPlan) {
-        resp.setIgnoreTimeStamp(false);
-      }
-
-      if (newDataSet instanceof DirectNonAlignDataSet) {
-        resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username));
-      } else {
-        try {
-          TSQueryDataSet tsQueryDataSet = fillRpcReturnData(fetchSize, newDataSet, username);
-          resp.setQueryDataSet(tsQueryDataSet);
-        } catch (RedirectException e) {
-          LOGGER.debug("need to redirect {} {} to {}", statement, queryId, e.getEndPoint());
-          if (enableRedirect) {
-            // redirect query
-            TSStatus status = new TSStatus();
-            status.setRedirectNode(e.getEndPoint());
-            status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
-            resp.setStatus(status);
-            resp.setQueryId(queryId);
-            return resp;
-          } else {
-            LOGGER.error(
-                "execute {} error, if session does not support redirect,"
-                    + " should not throw redirection exception.",
-                statement,
-                e);
-          }
-        }
-      }
-      resp.setQueryId(queryId);
-
-      if (plan instanceof AlignByDevicePlan && config.isEnablePerformanceTracing()) {
-        TracingManager.getInstance()
-            .writePathsNum(queryId, ((AlignByDeviceDataSet) newDataSet).getPathsNum());
-      }
-
-      if (enableMetric) {
-        long endTime = System.currentTimeMillis();
-        SqlArgument sqlArgument = new SqlArgument(resp, plan, statement, startTime, endTime);
-        synchronized (sqlArgumentList) {
-          sqlArgumentList.add(sqlArgument);
-          if (sqlArgumentList.size() >= MAX_SIZE) {
-            sqlArgumentList.subList(0, DELETE_SIZE).clear();
-          }
-        }
-      }
-
-      // remove query info in QueryTimeManager
-      if (!(plan instanceof ShowQueryProcesslistPlan)) {
-        queryTimeManager.unRegisterQuery(queryId);
-      }
-      return resp;
-    } catch (Exception e) {
-      releaseQueryResourceNoExceptions(queryId);
-      throw e;
-    } finally {
-      Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, startTime);
-      long costTime = System.currentTimeMillis() - startTime;
-      if (costTime >= config.getSlowQueryThreshold()) {
-        SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
-      }
-    }
+  /** Marks resp as NEED_REDIRECTION toward ip:port, records queryId on it, and returns it. */
+  private TSExecuteStatementResp redirectQueryToAnotherNode(
+      TSExecuteStatementResp resp, long queryId, String ip, int port) {
+    TSStatus redirectStatus = new TSStatus();
+    redirectStatus.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+    redirectStatus.setRedirectNode(new EndPoint(ip, port));
+    resp.setQueryId(queryId);
+    resp.setStatus(redirectStatus);
+    return resp;
   }
 
   private TSExecuteStatementResp getListDataSetHeaders(QueryDataSet dataSet) {