You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/01/03 01:10:05 UTC
[iotdb] branch master updated: [IOTDB-2195] Control the concurrent query execution thread - Part 2 (#4664)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8759fa4 [IOTDB-2195] Control the concurrent query execution thread - Part 2 (#4664)
8759fa4 is described below
commit 8759fa419a8072ebad1a51c02880dd284580b33c
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Mon Jan 3 09:09:18 2022 +0800
[IOTDB-2195] Control the concurrent query execution thread - Part 2 (#4664)
---
.../resources/conf/iotdb-engine.properties | 12 +-
.../org/apache/iotdb/db/concurrent/ThreadName.java | 1 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 30 ++-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 16 ++
.../db/query/dataset/NonAlignEngineDataSet.java | 4 +-
.../dataset/RawQueryDataSetWithoutValueFilter.java | 9 +-
...yTaskPoolManager.java => QueryTaskManager.java} | 46 +---
...nager.java => RawQueryReadTaskPoolManager.java} | 34 ++-
.../db/service/thrift/impl/TSServiceImpl.java | 272 ++++++++++++---------
9 files changed, 256 insertions(+), 168 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 9d3ad92..13becfd 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -271,9 +271,17 @@ timestamp_precision=ms
# Datatype: int
# concurrent_flush_thread=0
-# How many threads can concurrently query. When <= 0, use CPU core number.
+# How many threads can concurrently execute query statement. When <= 0, use CPU core number.
# Datatype: int
-# concurrent_query_thread=8
+# concurrent_query_thread=16
+
+# How many threads can concurrently read data for raw data query. When <= 0, use CPU core number.
+# Datatype: int
+# concurrent_sub_rawQuery_thread=8
+
+# Blocking queue size for read tasks in raw data query. Must be >= 1.
+# Datatype: int
+# raw_query_blocking_queue_capacity=5
# whether take over the memory management by IoTDB rather than JVM when serializing memtable as bytes in memory
# (i.e., whether use ChunkBufferPool), value true, false
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index 54a04b9..bd4b149 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -46,6 +46,7 @@ public enum ThreadName {
LOAD_TSFILE("Load-TsFile"),
TIME_COST_STATISTIC("TIME_COST_STATISTIC"),
QUERY_SERVICE("Query"),
+ SUB_RAW_QUERY_SERVICE("Sub_RawQuery"),
INSERTION_SERVICE("MultithreadingInsertionPool"),
WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
CONTINUOUS_QUERY_SERVICE("ContinuousQueryTaskPoolManager"),
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5e9ef95..4842bb2 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -251,8 +251,16 @@ public class IoTDBConfig {
/** How many threads can concurrently flush. When <= 0, use CPU core number. */
private int concurrentFlushThread = Runtime.getRuntime().availableProcessors();
- /** How many threads can concurrently query. When <= 0, use CPU core number. */
- private int concurrentQueryThread = 8;
+ /** How many threads can concurrently execute query statement. When <= 0, use CPU core number. */
+ private int concurrentQueryThread = 16;
+
+ /**
+ * How many threads can concurrently read data for raw data query. When <= 0, use CPU core number.
+ */
+ private int concurrentSubRawQueryThread = 8;
+
+ /** Blocking queue size for read tasks in raw data query. */
+ private int rawQueryBlockingQueueCapacity = 5;
/** How many threads can concurrently evaluate windows. When <= 0, use CPU core number. */
private int concurrentWindowEvaluationThread = Runtime.getRuntime().availableProcessors();
@@ -1187,10 +1195,26 @@ public class IoTDBConfig {
return concurrentQueryThread;
}
- void setConcurrentQueryThread(int concurrentQueryThread) {
+ public void setConcurrentQueryThread(int concurrentQueryThread) {
this.concurrentQueryThread = concurrentQueryThread;
}
+ public int getConcurrentSubRawQueryThread() {
+ return concurrentSubRawQueryThread;
+ }
+
+ void setConcurrentSubRawQueryThread(int concurrentSubRawQueryThread) {
+ this.concurrentSubRawQueryThread = concurrentSubRawQueryThread;
+ }
+
+ public int getRawQueryBlockingQueueCapacity() {
+ return rawQueryBlockingQueueCapacity;
+ }
+
+ public void setRawQueryBlockingQueueCapacity(int rawQueryBlockingQueueCapacity) {
+ this.rawQueryBlockingQueueCapacity = rawQueryBlockingQueueCapacity;
+ }
+
public int getConcurrentWindowEvaluationThread() {
return concurrentWindowEvaluationThread;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index cc878ad..849fddb 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -445,6 +445,22 @@ public class IoTDBDescriptor {
conf.setConcurrentQueryThread(Runtime.getRuntime().availableProcessors());
}
+ conf.setConcurrentSubRawQueryThread(
+ Integer.parseInt(
+ properties.getProperty(
+ "concurrent_sub_rawQuery_thread",
+ Integer.toString(conf.getConcurrentSubRawQueryThread()))));
+
+ if (conf.getConcurrentSubRawQueryThread() <= 0) {
+ conf.setConcurrentSubRawQueryThread(Runtime.getRuntime().availableProcessors());
+ }
+
+ conf.setRawQueryBlockingQueueCapacity(
+ Integer.parseInt(
+ properties.getProperty(
+ "raw_query_blocking_queue_capacity",
+ Integer.toString(conf.getRawQueryBlockingQueueCapacity()))));
+
conf.setmManagerCacheSize(
Integer.parseInt(
properties
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
index 2c46dd9..933ce8b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.dataset;
import org.apache.iotdb.db.concurrent.WrappedRunnable;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.control.QueryTimeManager;
-import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
+import org.apache.iotdb.db.query.pool.RawQueryReadTaskPoolManager;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
@@ -239,7 +239,7 @@ public class NonAlignEngineDataSet extends QueryDataSet implements DirectNonAlig
/** flag that main thread is interrupted or not */
private volatile boolean interrupted = false;
- private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance();
+ private static final RawQueryReadTaskPoolManager pool = RawQueryReadTaskPoolManager.getInstance();
private static final Logger LOGGER = LoggerFactory.getLogger(NonAlignEngineDataSet.class);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index ef44714..b4f37aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -20,11 +20,12 @@
package org.apache.iotdb.db.query.dataset;
import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.control.QueryTimeManager;
-import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
+import org.apache.iotdb.db.query.pool.RawQueryReadTaskPoolManager;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
@@ -174,11 +175,13 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
private int bufferNum;
// capacity for blocking queue
- private static final int BLOCKING_QUEUE_CAPACITY = 5;
+ private static final int BLOCKING_QUEUE_CAPACITY =
+ IoTDBDescriptor.getInstance().getConfig().getRawQueryBlockingQueueCapacity();
private final long queryId;
- private static final QueryTaskPoolManager TASK_POOL_MANAGER = QueryTaskPoolManager.getInstance();
+ private static final RawQueryReadTaskPoolManager TASK_POOL_MANAGER =
+ RawQueryReadTaskPoolManager.getInstance();
private static final Logger LOGGER =
LoggerFactory.getLogger(RawQueryDataSetWithoutValueFilter.class);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskManager.java
similarity index 60%
copy from server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
copy to server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskManager.java
index 63b04de..f251e28 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskManager.java
@@ -23,52 +23,30 @@ import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.rescon.AbstractPoolManager;
-import org.apache.iotdb.db.service.metrics.Metric;
-import org.apache.iotdb.db.service.metrics.MetricsService;
-import org.apache.iotdb.db.service.metrics.Tag;
-import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-
-public class QueryTaskPoolManager extends AbstractPoolManager {
+/**
+ * This pool is used to execute all query tasks sent from clients and return TSExecuteStatementResp.
+ * Threads are named Query.
+ *
+ * <p>Executes the QueryTask defined in TSServiceImpl.
+ */
+public class QueryTaskManager extends AbstractPoolManager {
- private static final Logger LOGGER = LoggerFactory.getLogger(QueryTaskPoolManager.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(QueryTaskManager.class);
- private QueryTaskPoolManager() {
+ private QueryTaskManager() {
int threadCnt =
Math.min(
Runtime.getRuntime().availableProcessors(),
IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread());
pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.QUERY_SERVICE.getName());
- if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) {
- MetricsService.getInstance()
- .getMetricManager()
- .getOrCreateAutoGauge(
- Metric.QUEUE.toString(),
- pool,
- p -> ((ThreadPoolExecutor) p).getActiveCount(),
- Tag.NAME.toString(),
- ThreadName.QUERY_SERVICE.getName(),
- Tag.STATUS.toString(),
- "running");
- MetricsService.getInstance()
- .getMetricManager()
- .getOrCreateAutoGauge(
- Metric.QUEUE.toString(),
- pool,
- p -> ((ThreadPoolExecutor) p).getQueue().size(),
- Tag.NAME.toString(),
- ThreadName.QUERY_SERVICE.getName(),
- Tag.STATUS.toString(),
- "waiting");
- }
}
- public static QueryTaskPoolManager getInstance() {
- return QueryTaskPoolManager.InstanceHolder.instance;
+ public static QueryTaskManager getInstance() {
+ return QueryTaskManager.InstanceHolder.instance;
}
@Override
@@ -107,6 +85,6 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
// allowed to do nothing
}
- private static QueryTaskPoolManager instance = new QueryTaskPoolManager();
+ private static QueryTaskManager instance = new QueryTaskManager();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/query/pool/RawQueryReadTaskPoolManager.java
similarity index 73%
rename from server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
rename to server/src/main/java/org/apache/iotdb/db/query/pool/RawQueryReadTaskPoolManager.java
index 63b04de..20a9044 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/pool/RawQueryReadTaskPoolManager.java
@@ -33,16 +33,23 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.ThreadPoolExecutor;
-public class QueryTaskPoolManager extends AbstractPoolManager {
+/**
+ * This thread pool is used to read data for raw data queries. Threads are named Sub_RawQuery.
+ *
+ * <p>Runs the ReadTask instances submitted to RawQueryReadTaskPoolManager.
+ */
+public class RawQueryReadTaskPoolManager extends AbstractPoolManager {
- private static final Logger LOGGER = LoggerFactory.getLogger(QueryTaskPoolManager.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(RawQueryReadTaskPoolManager.class);
- private QueryTaskPoolManager() {
+ private RawQueryReadTaskPoolManager() {
int threadCnt =
Math.min(
Runtime.getRuntime().availableProcessors(),
- IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread());
- pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.QUERY_SERVICE.getName());
+ IoTDBDescriptor.getInstance().getConfig().getConcurrentSubRawQueryThread());
+ pool =
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ threadCnt, ThreadName.SUB_RAW_QUERY_SERVICE.getName());
if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) {
MetricsService.getInstance()
.getMetricManager()
@@ -51,7 +58,7 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
pool,
p -> ((ThreadPoolExecutor) p).getActiveCount(),
Tag.NAME.toString(),
- ThreadName.QUERY_SERVICE.getName(),
+ ThreadName.SUB_RAW_QUERY_SERVICE.getName(),
Tag.STATUS.toString(),
"running");
MetricsService.getInstance()
@@ -61,14 +68,14 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
pool,
p -> ((ThreadPoolExecutor) p).getQueue().size(),
Tag.NAME.toString(),
- ThreadName.QUERY_SERVICE.getName(),
+ ThreadName.SUB_RAW_QUERY_SERVICE.getName(),
Tag.STATUS.toString(),
"waiting");
}
}
- public static QueryTaskPoolManager getInstance() {
- return QueryTaskPoolManager.InstanceHolder.instance;
+ public static RawQueryReadTaskPoolManager getInstance() {
+ return RawQueryReadTaskPoolManager.InstanceHolder.instance;
}
@Override
@@ -78,7 +85,7 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
@Override
public String getName() {
- return "query task";
+ return "raw query read task";
}
@Override
@@ -87,9 +94,10 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
int threadCnt =
Math.min(
Runtime.getRuntime().availableProcessors(),
- IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread());
+ IoTDBDescriptor.getInstance().getConfig().getConcurrentSubRawQueryThread());
pool =
- IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.QUERY_SERVICE.getName());
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ threadCnt, ThreadName.SUB_RAW_QUERY_SERVICE.getName());
}
}
@@ -107,6 +115,6 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
// allowed to do nothing
}
- private static QueryTaskPoolManager instance = new QueryTaskPoolManager();
+ private static RawQueryReadTaskPoolManager instance = new RawQueryReadTaskPoolManager();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index a6880a7..0d06348 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.tracing.TracingConstant;
import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
+import org.apache.iotdb.db.query.pool.QueryTaskManager;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.StaticResps;
import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
@@ -137,6 +138,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
import java.util.stream.Collectors;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
@@ -148,6 +151,81 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.tryCatchQueryExceptio
/** Thrift RPC implementation at server side. */
public class TSServiceImpl extends BasicServiceProvider implements TSIService.Iface {
+ protected class QueryTask implements Callable<TSExecuteStatementResp> {
+
+ private PhysicalPlan plan;
+ private final long queryStartTime;
+ private final long sessionId;
+ private final String statement;
+ private final long statementId;
+ private final long timeout;
+ private final int fetchSize;
+ private final boolean isJdbcQuery;
+ private final boolean enableRedirectQuery;
+
+ /**
+ * Creates a task that executes a query statement and returns a TSExecuteStatementResp with dataset.
+ *
+ * @param plan must be a plan for Query: QueryPlan, ShowPlan, and some AuthorPlan
+ */
+ public QueryTask(
+ PhysicalPlan plan,
+ long queryStartTime,
+ long sessionId,
+ String statement,
+ long statementId,
+ long timeout,
+ int fetchSize,
+ boolean isJdbcQuery,
+ boolean enableRedirectQuery) {
+ this.plan = plan;
+ this.queryStartTime = queryStartTime;
+ this.sessionId = sessionId;
+ this.statement = statement;
+ this.statementId = statementId;
+ this.timeout = timeout;
+ this.fetchSize = fetchSize;
+ this.isJdbcQuery = isJdbcQuery;
+ this.enableRedirectQuery = enableRedirectQuery;
+ }
+
+ @Override
+ public TSExecuteStatementResp call() throws Exception {
+ String username = sessionManager.getUsername(sessionId);
+ plan.setLoginUserName(username);
+
+ queryFrequencyRecorder.incrementAndGet();
+ AUDIT_LOGGER.debug("Session {} execute Query: {}", sessionId, statement);
+
+ final long queryId = sessionManager.requestQueryId(statementId, true);
+ QueryContext context =
+ genQueryContext(queryId, plan.isDebug(), queryStartTime, statement, timeout);
+
+ TSExecuteStatementResp resp;
+ try {
+ if (plan instanceof QueryPlan) {
+ QueryPlan queryPlan = (QueryPlan) plan;
+ queryPlan.setEnableRedirect(enableRedirectQuery);
+ resp = executeQueryPlan(queryPlan, context, isJdbcQuery, fetchSize, username);
+ } else {
+ resp = executeShowOrAuthorPlan(plan, context, fetchSize, username);
+ }
+ resp.setQueryId(queryId);
+ resp.setOperationType(plan.getOperatorType().toString());
+ } catch (Exception e) {
+ sessionManager.releaseQueryResourceNoExceptions(queryId);
+ throw e;
+ } finally {
+ addOperationLatency(Operation.EXECUTE_QUERY, queryStartTime);
+ long costTime = System.currentTimeMillis() - queryStartTime;
+ if (costTime >= CONFIG.getSlowQueryThreshold()) {
+ SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
+ }
+ }
+ return resp;
+ }
+ }
+
// main logger
private static final Logger LOGGER = LoggerFactory.getLogger(TSServiceImpl.class);
@@ -455,24 +533,30 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
PhysicalPlan physicalPlan =
processor.parseSQLToPhysicalPlan(statement, sessionManager.getZoneId(req.getSessionId()));
- return physicalPlan.isQuery()
- ? internalExecuteQueryStatement(
- statement,
- req.statementId,
- physicalPlan,
- req.fetchSize,
- startTime,
- req.timeout,
- req.getSessionId(),
- req.isEnableRedirectQuery(),
- req.isJdbcQuery())
- : executeUpdateStatement(
- statement,
- req.statementId,
- physicalPlan,
- req.fetchSize,
- req.timeout,
- req.getSessionId());
+ if (physicalPlan.isQuery()) {
+ Future<TSExecuteStatementResp> resp =
+ QueryTaskManager.getInstance()
+ .submit(
+ new QueryTask(
+ physicalPlan,
+ startTime,
+ req.sessionId,
+ req.statement,
+ req.statementId,
+ req.timeout,
+ req.fetchSize,
+ req.jdbcQuery,
+ req.enableRedirectQuery));
+ return resp.get();
+ } else {
+ return executeUpdateStatement(
+ statement,
+ req.statementId,
+ physicalPlan,
+ req.fetchSize,
+ req.timeout,
+ req.getSessionId());
+ }
} catch (InterruptedException e) {
LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
Thread.currentThread().interrupt();
@@ -496,19 +580,25 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
PhysicalPlan physicalPlan =
processor.parseSQLToPhysicalPlan(statement, sessionManager.getZoneId(req.sessionId));
- return physicalPlan.isQuery()
- ? internalExecuteQueryStatement(
- statement,
- req.statementId,
- physicalPlan,
- req.fetchSize,
- startTime,
- req.timeout,
- req.getSessionId(),
- req.isEnableRedirectQuery(),
- req.isJdbcQuery())
- : RpcUtils.getTSExecuteStatementResp(
- TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+ if (physicalPlan.isQuery()) {
+ Future<TSExecuteStatementResp> resp =
+ QueryTaskManager.getInstance()
+ .submit(
+ new QueryTask(
+ physicalPlan,
+ startTime,
+ req.sessionId,
+ req.statement,
+ req.statementId,
+ req.timeout,
+ req.fetchSize,
+ req.jdbcQuery,
+ req.enableRedirectQuery));
+ return resp.get();
+ } else {
+ return RpcUtils.getTSExecuteStatementResp(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+ }
} catch (InterruptedException e) {
LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
Thread.currentThread().interrupt();
@@ -532,19 +622,26 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
long startTime = System.currentTimeMillis();
PhysicalPlan physicalPlan =
processor.rawDataQueryReqToPhysicalPlan(req, sessionManager.getZoneId(req.sessionId));
- return physicalPlan.isQuery()
- ? internalExecuteQueryStatement(
- "",
- req.statementId,
- physicalPlan,
- req.fetchSize,
- startTime,
- CONFIG.getQueryTimeoutThreshold(),
- req.sessionId,
- req.isEnableRedirectQuery(),
- req.isJdbcQuery())
- : RpcUtils.getTSExecuteStatementResp(
- TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+
+ if (physicalPlan.isQuery()) {
+ Future<TSExecuteStatementResp> resp =
+ QueryTaskManager.getInstance()
+ .submit(
+ new QueryTask(
+ physicalPlan,
+ startTime,
+ req.sessionId,
+ "",
+ req.statementId,
+ CONFIG.getQueryTimeoutThreshold(),
+ req.fetchSize,
+ req.isJdbcQuery(),
+ req.enableRedirectQuery));
+ return resp.get();
+ } else {
+ return RpcUtils.getTSExecuteStatementResp(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+ }
} catch (InterruptedException e) {
LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
Thread.currentThread().interrupt();
@@ -566,19 +663,26 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
long startTime = System.currentTimeMillis();
PhysicalPlan physicalPlan =
processor.lastDataQueryReqToPhysicalPlan(req, sessionManager.getZoneId(req.sessionId));
- return physicalPlan.isQuery()
- ? internalExecuteQueryStatement(
- "",
- req.statementId,
- physicalPlan,
- req.fetchSize,
- startTime,
- CONFIG.getQueryTimeoutThreshold(),
- req.sessionId,
- req.isEnableRedirectQuery(),
- req.isJdbcQuery())
- : RpcUtils.getTSExecuteStatementResp(
- TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+
+ if (physicalPlan.isQuery()) {
+ Future<TSExecuteStatementResp> resp =
+ QueryTaskManager.getInstance()
+ .submit(
+ new QueryTask(
+ physicalPlan,
+ startTime,
+ req.sessionId,
+ "",
+ req.statementId,
+ CONFIG.getQueryTimeoutThreshold(),
+ req.fetchSize,
+ req.isJdbcQuery(),
+ req.enableRedirectQuery));
+ return resp.get();
+ } else {
+ return RpcUtils.getTSExecuteStatementResp(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+ }
} catch (InterruptedException e) {
LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
Thread.currentThread().interrupt();
@@ -590,60 +694,6 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
}
}
- /**
- * Execute query statement, return TSExecuteStatementResp with dataset.
- *
- * @param plan must be a plan for Query: QueryPlan, ShowPlan, and some AuthorPlan
- */
- @SuppressWarnings({"squid:S3776", "squid:S1141"}) // Suppress high Cognitive Complexity warning
- private TSExecuteStatementResp internalExecuteQueryStatement(
- String statement,
- long statementId,
- PhysicalPlan plan,
- int fetchSize,
- long queryStartTime,
- long timeout,
- long sessionId,
- boolean enableRedirect,
- boolean isJdbcQuery)
- throws QueryProcessException, SQLException, StorageEngineException,
- QueryFilterOptimizationException, MetadataException, IOException, InterruptedException,
- TException, AuthException {
- String username = sessionManager.getUsername(sessionId);
- plan.setLoginUserName(username);
-
- queryFrequencyRecorder.incrementAndGet();
- AUDIT_LOGGER.debug(
- "Session {} execute Query: {}", sessionManager.getCurrSessionId(), statement);
-
- final long queryId = sessionManager.requestQueryId(statementId, true);
- QueryContext context =
- genQueryContext(queryId, plan.isDebug(), queryStartTime, statement, timeout);
-
- TSExecuteStatementResp resp;
- try {
- if (plan instanceof QueryPlan) {
- QueryPlan queryPlan = (QueryPlan) plan;
- queryPlan.setEnableRedirect(enableRedirect);
- resp = executeQueryPlan(queryPlan, context, isJdbcQuery, fetchSize, username);
- } else {
- resp = executeShowOrAuthorPlan(plan, context, fetchSize, username);
- }
- resp.setQueryId(queryId);
- resp.setOperationType(plan.getOperatorType().toString());
- } catch (Exception e) {
- sessionManager.releaseQueryResourceNoExceptions(queryId);
- throw e;
- } finally {
- addOperationLatency(Operation.EXECUTE_QUERY, queryStartTime);
- long costTime = System.currentTimeMillis() - queryStartTime;
- if (costTime >= CONFIG.getSlowQueryThreshold()) {
- SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
- }
- }
- return resp;
- }
-
private TSExecuteStatementResp executeQueryPlan(
QueryPlan plan, QueryContext context, boolean isJdbcQuery, int fetchSize, String username)
throws TException, MetadataException, QueryProcessException, StorageEngineException,