You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/01/03 01:10:05 UTC

[iotdb] branch master updated: [IOTDB-2195] Control the concurrent query execution thread - Part 2 (#4664)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8759fa4  [IOTDB-2195] Control the concurrent query execution thread - Part 2 (#4664)
8759fa4 is described below

commit 8759fa419a8072ebad1a51c02880dd284580b33c
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Mon Jan 3 09:09:18 2022 +0800

    [IOTDB-2195] Control the concurrent query execution thread - Part 2 (#4664)
---
 .../resources/conf/iotdb-engine.properties         |  12 +-
 .../org/apache/iotdb/db/concurrent/ThreadName.java |   1 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  30 ++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  16 ++
 .../db/query/dataset/NonAlignEngineDataSet.java    |   4 +-
 .../dataset/RawQueryDataSetWithoutValueFilter.java |   9 +-
 ...yTaskPoolManager.java => QueryTaskManager.java} |  46 +---
 ...nager.java => RawQueryReadTaskPoolManager.java} |  34 ++-
 .../db/service/thrift/impl/TSServiceImpl.java      | 272 ++++++++++++---------
 9 files changed, 256 insertions(+), 168 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 9d3ad92..13becfd 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -271,9 +271,17 @@ timestamp_precision=ms
 # Datatype: int
 # concurrent_flush_thread=0
 
-# How many threads can concurrently query. When <= 0, use CPU core number.
+# How many threads can concurrently execute query statement. When <= 0, use CPU core number.
 # Datatype: int
-# concurrent_query_thread=8
+# concurrent_query_thread=16
+
+# How many threads can concurrently read data for raw data query. When <= 0, use CPU core number.
+# Datatype: int
+# concurrent_sub_rawQuery_thread=8
+
+# Blocking queue size for read task in raw data query. Must >= 1.
+# Datatype: int
+# raw_query_blocking_queue_capacity=5
 
 # whether take over the memory management by IoTDB rather than JVM when serializing memtable as bytes in memory
 # (i.e., whether use ChunkBufferPool), value true, false
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index 54a04b9..bd4b149 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -46,6 +46,7 @@ public enum ThreadName {
   LOAD_TSFILE("Load-TsFile"),
   TIME_COST_STATISTIC("TIME_COST_STATISTIC"),
   QUERY_SERVICE("Query"),
+  SUB_RAW_QUERY_SERVICE("Sub_RawQuery"),
   INSERTION_SERVICE("MultithreadingInsertionPool"),
   WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
   CONTINUOUS_QUERY_SERVICE("ContinuousQueryTaskPoolManager"),
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5e9ef95..4842bb2 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -251,8 +251,16 @@ public class IoTDBConfig {
   /** How many threads can concurrently flush. When <= 0, use CPU core number. */
   private int concurrentFlushThread = Runtime.getRuntime().availableProcessors();
 
-  /** How many threads can concurrently query. When <= 0, use CPU core number. */
-  private int concurrentQueryThread = 8;
+  /** How many threads can concurrently execute query statement. When <= 0, use CPU core number. */
+  private int concurrentQueryThread = 16;
+
+  /**
+   * How many threads can concurrently read data for raw data query. When <= 0, use CPU core number.
+   */
+  private int concurrentSubRawQueryThread = 8;
+
+  /** Blocking queue size for read task in raw data query. */
+  private int rawQueryBlockingQueueCapacity = 5;
 
   /** How many threads can concurrently evaluate windows. When <= 0, use CPU core number. */
   private int concurrentWindowEvaluationThread = Runtime.getRuntime().availableProcessors();
@@ -1187,10 +1195,26 @@ public class IoTDBConfig {
     return concurrentQueryThread;
   }
 
-  void setConcurrentQueryThread(int concurrentQueryThread) {
+  public void setConcurrentQueryThread(int concurrentQueryThread) {
     this.concurrentQueryThread = concurrentQueryThread;
   }
 
+  public int getConcurrentSubRawQueryThread() {
+    return concurrentSubRawQueryThread;
+  }
+
+  void setConcurrentSubRawQueryThread(int concurrentSubRawQueryThread) {
+    this.concurrentSubRawQueryThread = concurrentSubRawQueryThread;
+  }
+
+  public int getRawQueryBlockingQueueCapacity() {
+    return rawQueryBlockingQueueCapacity;
+  }
+
+  public void setRawQueryBlockingQueueCapacity(int rawQueryBlockingQueueCapacity) {
+    this.rawQueryBlockingQueueCapacity = rawQueryBlockingQueueCapacity;
+  }
+
   public int getConcurrentWindowEvaluationThread() {
     return concurrentWindowEvaluationThread;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index cc878ad..849fddb 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -445,6 +445,22 @@ public class IoTDBDescriptor {
         conf.setConcurrentQueryThread(Runtime.getRuntime().availableProcessors());
       }
 
+      conf.setConcurrentSubRawQueryThread(
+          Integer.parseInt(
+              properties.getProperty(
+                  "concurrent_sub_rawQuery_thread",
+                  Integer.toString(conf.getConcurrentSubRawQueryThread()))));
+
+      if (conf.getConcurrentSubRawQueryThread() <= 0) {
+        conf.setConcurrentSubRawQueryThread(Runtime.getRuntime().availableProcessors());
+      }
+
+      conf.setRawQueryBlockingQueueCapacity(
+          Integer.parseInt(
+              properties.getProperty(
+                  "raw_query_blocking_queue_capacity",
+                  Integer.toString(conf.getRawQueryBlockingQueueCapacity()))));
+
       conf.setmManagerCacheSize(
           Integer.parseInt(
               properties
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
index 2c46dd9..933ce8b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.dataset;
 import org.apache.iotdb.db.concurrent.WrappedRunnable;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.query.control.QueryTimeManager;
-import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
+import org.apache.iotdb.db.query.pool.RawQueryReadTaskPoolManager;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
@@ -239,7 +239,7 @@ public class NonAlignEngineDataSet extends QueryDataSet implements DirectNonAlig
   /** flag that main thread is interrupted or not */
   private volatile boolean interrupted = false;
 
-  private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance();
+  private static final RawQueryReadTaskPoolManager pool = RawQueryReadTaskPoolManager.getInstance();
 
   private static final Logger LOGGER = LoggerFactory.getLogger(NonAlignEngineDataSet.class);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index ef44714..b4f37aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -20,11 +20,12 @@
 package org.apache.iotdb.db.query.dataset;
 
 import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.control.QueryTimeManager;
-import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
+import org.apache.iotdb.db.query.pool.RawQueryReadTaskPoolManager;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
@@ -174,11 +175,13 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
   private int bufferNum;
 
   // capacity for blocking queue
-  private static final int BLOCKING_QUEUE_CAPACITY = 5;
+  private static final int BLOCKING_QUEUE_CAPACITY =
+      IoTDBDescriptor.getInstance().getConfig().getRawQueryBlockingQueueCapacity();
 
   private final long queryId;
 
-  private static final QueryTaskPoolManager TASK_POOL_MANAGER = QueryTaskPoolManager.getInstance();
+  private static final RawQueryReadTaskPoolManager TASK_POOL_MANAGER =
+      RawQueryReadTaskPoolManager.getInstance();
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(RawQueryDataSetWithoutValueFilter.class);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskManager.java
similarity index 60%
copy from server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
copy to server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskManager.java
index 63b04de..f251e28 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskManager.java
@@ -23,52 +23,30 @@ import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.rescon.AbstractPoolManager;
-import org.apache.iotdb.db.service.metrics.Metric;
-import org.apache.iotdb.db.service.metrics.MetricsService;
-import org.apache.iotdb.db.service.metrics.Tag;
-import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ThreadPoolExecutor;
-
-public class QueryTaskPoolManager extends AbstractPoolManager {
+/**
+ * This pool is used to execute all query task send from client, and return TSExecuteStatementResp.
+ * Thread named by Query.
+ *
+ * <p>Execute QueryTask() in TSServiceImpl
+ */
+public class QueryTaskManager extends AbstractPoolManager {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(QueryTaskPoolManager.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(QueryTaskManager.class);
 
-  private QueryTaskPoolManager() {
+  private QueryTaskManager() {
     int threadCnt =
         Math.min(
             Runtime.getRuntime().availableProcessors(),
             IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread());
     pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.QUERY_SERVICE.getName());
-    if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) {
-      MetricsService.getInstance()
-          .getMetricManager()
-          .getOrCreateAutoGauge(
-              Metric.QUEUE.toString(),
-              pool,
-              p -> ((ThreadPoolExecutor) p).getActiveCount(),
-              Tag.NAME.toString(),
-              ThreadName.QUERY_SERVICE.getName(),
-              Tag.STATUS.toString(),
-              "running");
-      MetricsService.getInstance()
-          .getMetricManager()
-          .getOrCreateAutoGauge(
-              Metric.QUEUE.toString(),
-              pool,
-              p -> ((ThreadPoolExecutor) p).getQueue().size(),
-              Tag.NAME.toString(),
-              ThreadName.QUERY_SERVICE.getName(),
-              Tag.STATUS.toString(),
-              "waiting");
-    }
   }
 
-  public static QueryTaskPoolManager getInstance() {
-    return QueryTaskPoolManager.InstanceHolder.instance;
+  public static QueryTaskManager getInstance() {
+    return QueryTaskManager.InstanceHolder.instance;
   }
 
   @Override
@@ -107,6 +85,6 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
       // allowed to do nothing
     }
 
-    private static QueryTaskPoolManager instance = new QueryTaskPoolManager();
+    private static QueryTaskManager instance = new QueryTaskManager();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/query/pool/RawQueryReadTaskPoolManager.java
similarity index 73%
rename from server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
rename to server/src/main/java/org/apache/iotdb/db/query/pool/RawQueryReadTaskPoolManager.java
index 63b04de..20a9044 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/pool/RawQueryReadTaskPoolManager.java
@@ -33,16 +33,23 @@ import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ThreadPoolExecutor;
 
-public class QueryTaskPoolManager extends AbstractPoolManager {
+/**
+ * This thread pool is used to read data for raw data query. Thread named by Sub_Raw_Query.
+ *
+ * <p>Execute ReadTask() in RawQueryReadTaskPoolManager
+ */
+public class RawQueryReadTaskPoolManager extends AbstractPoolManager {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(QueryTaskPoolManager.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(RawQueryReadTaskPoolManager.class);
 
-  private QueryTaskPoolManager() {
+  private RawQueryReadTaskPoolManager() {
     int threadCnt =
         Math.min(
             Runtime.getRuntime().availableProcessors(),
-            IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread());
-    pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.QUERY_SERVICE.getName());
+            IoTDBDescriptor.getInstance().getConfig().getConcurrentSubRawQueryThread());
+    pool =
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            threadCnt, ThreadName.SUB_RAW_QUERY_SERVICE.getName());
     if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) {
       MetricsService.getInstance()
           .getMetricManager()
@@ -51,7 +58,7 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
               pool,
               p -> ((ThreadPoolExecutor) p).getActiveCount(),
               Tag.NAME.toString(),
-              ThreadName.QUERY_SERVICE.getName(),
+              ThreadName.SUB_RAW_QUERY_SERVICE.getName(),
               Tag.STATUS.toString(),
               "running");
       MetricsService.getInstance()
@@ -61,14 +68,14 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
               pool,
               p -> ((ThreadPoolExecutor) p).getQueue().size(),
               Tag.NAME.toString(),
-              ThreadName.QUERY_SERVICE.getName(),
+              ThreadName.SUB_RAW_QUERY_SERVICE.getName(),
               Tag.STATUS.toString(),
               "waiting");
     }
   }
 
-  public static QueryTaskPoolManager getInstance() {
-    return QueryTaskPoolManager.InstanceHolder.instance;
+  public static RawQueryReadTaskPoolManager getInstance() {
+    return RawQueryReadTaskPoolManager.InstanceHolder.instance;
   }
 
   @Override
@@ -78,7 +85,7 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
 
   @Override
   public String getName() {
-    return "query task";
+    return "raw query read task";
   }
 
   @Override
@@ -87,9 +94,10 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
       int threadCnt =
           Math.min(
               Runtime.getRuntime().availableProcessors(),
-              IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread());
+              IoTDBDescriptor.getInstance().getConfig().getConcurrentSubRawQueryThread());
       pool =
-          IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.QUERY_SERVICE.getName());
+          IoTDBThreadPoolFactory.newFixedThreadPool(
+              threadCnt, ThreadName.SUB_RAW_QUERY_SERVICE.getName());
     }
   }
 
@@ -107,6 +115,6 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
       // allowed to do nothing
     }
 
-    private static QueryTaskPoolManager instance = new QueryTaskPoolManager();
+    private static RawQueryReadTaskPoolManager instance = new RawQueryReadTaskPoolManager();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index a6880a7..0d06348 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.tracing.TracingConstant;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
+import org.apache.iotdb.db.query.pool.QueryTaskManager;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.StaticResps;
 import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
@@ -137,6 +138,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
@@ -148,6 +151,81 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.tryCatchQueryExceptio
 /** Thrift RPC implementation at server side. */
 public class TSServiceImpl extends BasicServiceProvider implements TSIService.Iface {
 
+  protected class QueryTask implements Callable<TSExecuteStatementResp> {
+
+    private PhysicalPlan plan;
+    private final long queryStartTime;
+    private final long sessionId;
+    private final String statement;
+    private final long statementId;
+    private final long timeout;
+    private final int fetchSize;
+    private final boolean isJdbcQuery;
+    private final boolean enableRedirectQuery;
+
+    /**
+     * Execute query statement, return TSExecuteStatementResp with dataset.
+     *
+     * @param plan must be a plan for Query: QueryPlan, ShowPlan, and some AuthorPlan
+     */
+    public QueryTask(
+        PhysicalPlan plan,
+        long queryStartTime,
+        long sessionId,
+        String statement,
+        long statementId,
+        long timeout,
+        int fetchSize,
+        boolean isJdbcQuery,
+        boolean enableRedirectQuery) {
+      this.plan = plan;
+      this.queryStartTime = queryStartTime;
+      this.sessionId = sessionId;
+      this.statement = statement;
+      this.statementId = statementId;
+      this.timeout = timeout;
+      this.fetchSize = fetchSize;
+      this.isJdbcQuery = isJdbcQuery;
+      this.enableRedirectQuery = enableRedirectQuery;
+    }
+
+    @Override
+    public TSExecuteStatementResp call() throws Exception {
+      String username = sessionManager.getUsername(sessionId);
+      plan.setLoginUserName(username);
+
+      queryFrequencyRecorder.incrementAndGet();
+      AUDIT_LOGGER.debug("Session {} execute Query: {}", sessionId, statement);
+
+      final long queryId = sessionManager.requestQueryId(statementId, true);
+      QueryContext context =
+          genQueryContext(queryId, plan.isDebug(), queryStartTime, statement, timeout);
+
+      TSExecuteStatementResp resp;
+      try {
+        if (plan instanceof QueryPlan) {
+          QueryPlan queryPlan = (QueryPlan) plan;
+          queryPlan.setEnableRedirect(enableRedirectQuery);
+          resp = executeQueryPlan(queryPlan, context, isJdbcQuery, fetchSize, username);
+        } else {
+          resp = executeShowOrAuthorPlan(plan, context, fetchSize, username);
+        }
+        resp.setQueryId(queryId);
+        resp.setOperationType(plan.getOperatorType().toString());
+      } catch (Exception e) {
+        sessionManager.releaseQueryResourceNoExceptions(queryId);
+        throw e;
+      } finally {
+        addOperationLatency(Operation.EXECUTE_QUERY, queryStartTime);
+        long costTime = System.currentTimeMillis() - queryStartTime;
+        if (costTime >= CONFIG.getSlowQueryThreshold()) {
+          SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
+        }
+      }
+      return resp;
+    }
+  }
+
   // main logger
   private static final Logger LOGGER = LoggerFactory.getLogger(TSServiceImpl.class);
 
@@ -455,24 +533,30 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
       PhysicalPlan physicalPlan =
           processor.parseSQLToPhysicalPlan(statement, sessionManager.getZoneId(req.getSessionId()));
 
-      return physicalPlan.isQuery()
-          ? internalExecuteQueryStatement(
-              statement,
-              req.statementId,
-              physicalPlan,
-              req.fetchSize,
-              startTime,
-              req.timeout,
-              req.getSessionId(),
-              req.isEnableRedirectQuery(),
-              req.isJdbcQuery())
-          : executeUpdateStatement(
-              statement,
-              req.statementId,
-              physicalPlan,
-              req.fetchSize,
-              req.timeout,
-              req.getSessionId());
+      if (physicalPlan.isQuery()) {
+        Future<TSExecuteStatementResp> resp =
+            QueryTaskManager.getInstance()
+                .submit(
+                    new QueryTask(
+                        physicalPlan,
+                        startTime,
+                        req.sessionId,
+                        req.statement,
+                        req.statementId,
+                        req.timeout,
+                        req.fetchSize,
+                        req.jdbcQuery,
+                        req.enableRedirectQuery));
+        return resp.get();
+      } else {
+        return executeUpdateStatement(
+            statement,
+            req.statementId,
+            physicalPlan,
+            req.fetchSize,
+            req.timeout,
+            req.getSessionId());
+      }
     } catch (InterruptedException e) {
       LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
       Thread.currentThread().interrupt();
@@ -496,19 +580,25 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
       PhysicalPlan physicalPlan =
           processor.parseSQLToPhysicalPlan(statement, sessionManager.getZoneId(req.sessionId));
 
-      return physicalPlan.isQuery()
-          ? internalExecuteQueryStatement(
-              statement,
-              req.statementId,
-              physicalPlan,
-              req.fetchSize,
-              startTime,
-              req.timeout,
-              req.getSessionId(),
-              req.isEnableRedirectQuery(),
-              req.isJdbcQuery())
-          : RpcUtils.getTSExecuteStatementResp(
-              TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+      if (physicalPlan.isQuery()) {
+        Future<TSExecuteStatementResp> resp =
+            QueryTaskManager.getInstance()
+                .submit(
+                    new QueryTask(
+                        physicalPlan,
+                        startTime,
+                        req.sessionId,
+                        req.statement,
+                        req.statementId,
+                        req.timeout,
+                        req.fetchSize,
+                        req.jdbcQuery,
+                        req.enableRedirectQuery));
+        return resp.get();
+      } else {
+        return RpcUtils.getTSExecuteStatementResp(
+            TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+      }
     } catch (InterruptedException e) {
       LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
       Thread.currentThread().interrupt();
@@ -532,19 +622,26 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
       long startTime = System.currentTimeMillis();
       PhysicalPlan physicalPlan =
           processor.rawDataQueryReqToPhysicalPlan(req, sessionManager.getZoneId(req.sessionId));
-      return physicalPlan.isQuery()
-          ? internalExecuteQueryStatement(
-              "",
-              req.statementId,
-              physicalPlan,
-              req.fetchSize,
-              startTime,
-              CONFIG.getQueryTimeoutThreshold(),
-              req.sessionId,
-              req.isEnableRedirectQuery(),
-              req.isJdbcQuery())
-          : RpcUtils.getTSExecuteStatementResp(
-              TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+
+      if (physicalPlan.isQuery()) {
+        Future<TSExecuteStatementResp> resp =
+            QueryTaskManager.getInstance()
+                .submit(
+                    new QueryTask(
+                        physicalPlan,
+                        startTime,
+                        req.sessionId,
+                        "",
+                        req.statementId,
+                        CONFIG.getQueryTimeoutThreshold(),
+                        req.fetchSize,
+                        req.isJdbcQuery(),
+                        req.enableRedirectQuery));
+        return resp.get();
+      } else {
+        return RpcUtils.getTSExecuteStatementResp(
+            TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+      }
     } catch (InterruptedException e) {
       LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
       Thread.currentThread().interrupt();
@@ -566,19 +663,26 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
       long startTime = System.currentTimeMillis();
       PhysicalPlan physicalPlan =
           processor.lastDataQueryReqToPhysicalPlan(req, sessionManager.getZoneId(req.sessionId));
-      return physicalPlan.isQuery()
-          ? internalExecuteQueryStatement(
-              "",
-              req.statementId,
-              physicalPlan,
-              req.fetchSize,
-              startTime,
-              CONFIG.getQueryTimeoutThreshold(),
-              req.sessionId,
-              req.isEnableRedirectQuery(),
-              req.isJdbcQuery())
-          : RpcUtils.getTSExecuteStatementResp(
-              TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+
+      if (physicalPlan.isQuery()) {
+        Future<TSExecuteStatementResp> resp =
+            QueryTaskManager.getInstance()
+                .submit(
+                    new QueryTask(
+                        physicalPlan,
+                        startTime,
+                        req.sessionId,
+                        "",
+                        req.statementId,
+                        CONFIG.getQueryTimeoutThreshold(),
+                        req.fetchSize,
+                        req.isJdbcQuery(),
+                        req.enableRedirectQuery));
+        return resp.get();
+      } else {
+        return RpcUtils.getTSExecuteStatementResp(
+            TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+      }
     } catch (InterruptedException e) {
       LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
       Thread.currentThread().interrupt();
@@ -590,60 +694,6 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
     }
   }
 
-  /**
-   * Execute query statement, return TSExecuteStatementResp with dataset.
-   *
-   * @param plan must be a plan for Query: QueryPlan, ShowPlan, and some AuthorPlan
-   */
-  @SuppressWarnings({"squid:S3776", "squid:S1141"}) // Suppress high Cognitive Complexity warning
-  private TSExecuteStatementResp internalExecuteQueryStatement(
-      String statement,
-      long statementId,
-      PhysicalPlan plan,
-      int fetchSize,
-      long queryStartTime,
-      long timeout,
-      long sessionId,
-      boolean enableRedirect,
-      boolean isJdbcQuery)
-      throws QueryProcessException, SQLException, StorageEngineException,
-          QueryFilterOptimizationException, MetadataException, IOException, InterruptedException,
-          TException, AuthException {
-    String username = sessionManager.getUsername(sessionId);
-    plan.setLoginUserName(username);
-
-    queryFrequencyRecorder.incrementAndGet();
-    AUDIT_LOGGER.debug(
-        "Session {} execute Query: {}", sessionManager.getCurrSessionId(), statement);
-
-    final long queryId = sessionManager.requestQueryId(statementId, true);
-    QueryContext context =
-        genQueryContext(queryId, plan.isDebug(), queryStartTime, statement, timeout);
-
-    TSExecuteStatementResp resp;
-    try {
-      if (plan instanceof QueryPlan) {
-        QueryPlan queryPlan = (QueryPlan) plan;
-        queryPlan.setEnableRedirect(enableRedirect);
-        resp = executeQueryPlan(queryPlan, context, isJdbcQuery, fetchSize, username);
-      } else {
-        resp = executeShowOrAuthorPlan(plan, context, fetchSize, username);
-      }
-      resp.setQueryId(queryId);
-      resp.setOperationType(plan.getOperatorType().toString());
-    } catch (Exception e) {
-      sessionManager.releaseQueryResourceNoExceptions(queryId);
-      throw e;
-    } finally {
-      addOperationLatency(Operation.EXECUTE_QUERY, queryStartTime);
-      long costTime = System.currentTimeMillis() - queryStartTime;
-      if (costTime >= CONFIG.getSlowQueryThreshold()) {
-        SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
-      }
-    }
-    return resp;
-  }
-
   private TSExecuteStatementResp executeQueryPlan(
       QueryPlan plan, QueryContext context, boolean isJdbcQuery, int fetchSize, String username)
       throws TException, MetadataException, QueryProcessException, StorageEngineException,