You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/01/04 02:14:37 UTC
[iotdb] branch rel/0.12 updated: [To rel/0.12] [IOTDB-2195] Control the concurrent query execution thread (#4699)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 1c9d42f [To rel/0.12] [IOTDB-2195] Control the concurrent query execution thread (#4699)
1c9d42f is described below
commit 1c9d42f8520e6a9f307e1d352c3a0b8634217999
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Tue Jan 4 10:14:06 2022 +0800
[To rel/0.12] [IOTDB-2195] Control the concurrent query execution thread (#4699)
---
client-cpp/pom.xml | 4 +-
compile-tools/pom.xml | 6 +-
distribution/pom.xml | 2 +-
example/client-cpp-example/pom.xml | 2 +-
example/udf/pom.xml | 2 +-
grafana/pom.xml | 2 +-
jdbc/pom.xml | 2 +-
pom.xml | 8 +-
.../resources/conf/iotdb-engine.properties | 13 +-
.../org/apache/iotdb/db/concurrent/ThreadName.java | 3 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 30 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 16 +
.../db/query/dataset/NonAlignEngineDataSet.java | 4 +-
.../dataset/RawQueryDataSetWithoutValueFilter.java | 9 +-
...yTaskPoolManager.java => QueryTaskManager.java} | 18 +-
...nager.java => RawQueryReadTaskPoolManager.java} | 30 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 381 ++++++++++++---------
17 files changed, 321 insertions(+), 211 deletions(-)
diff --git a/client-cpp/pom.xml b/client-cpp/pom.xml
index 014a0f5..79e295f 100644
--- a/client-cpp/pom.xml
+++ b/client-cpp/pom.xml
@@ -98,8 +98,8 @@
<cmake.root.dir>${project.parent.basedir}/compile-tools/thrift/target/cmake-${cmake-version}-win64-x64/</cmake.root.dir>
<thrift.exec.absolute.path>${project.parent.basedir}/compile-tools/thrift/target/build/compiler/cpp/bin/${cmake.build.type}/thrift.exe</thrift.exec.absolute.path>
<iotdb.server.script>start-server.bat</iotdb.server.script>
- <boost.include.dir />
- <boost.library.dir />
+ <boost.include.dir/>
+ <boost.library.dir/>
</properties>
</profile>
<profile>
diff --git a/compile-tools/pom.xml b/compile-tools/pom.xml
index 88bb39b..3d82f11 100644
--- a/compile-tools/pom.xml
+++ b/compile-tools/pom.xml
@@ -35,7 +35,7 @@
<cmake-version>3.17.3</cmake-version>
<openssl.include.dir>-Dtrue1=true1</openssl.include.dir>
<bison.executable.dir>-Dtrue1=true1</bison.executable.dir>
- <cmake.build.type />
+ <cmake.build.type/>
</properties>
<modules>
<module>thrift</module>
@@ -114,8 +114,8 @@
<thrift.make.executable>make</thrift.make.executable>
<thrift.compiler.executable>thrift.exe</thrift.compiler.executable>
<gradlew.executable>gradlew.bat</gradlew.executable>
- <boost.include.dir />
- <boost.library.dir />
+ <boost.include.dir/>
+ <boost.library.dir/>
</properties>
</profile>
</profiles>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 8f8c28e..49dceed 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -29,7 +29,7 @@
</parent>
<artifactId>iotdb-distribution</artifactId>
<name>IoTDB Distribution</name>
- <modules />
+ <modules/>
<build>
<plugins>
<plugin>
diff --git a/example/client-cpp-example/pom.xml b/example/client-cpp-example/pom.xml
index 1cc5a23..c06f132 100644
--- a/example/client-cpp-example/pom.xml
+++ b/example/client-cpp-example/pom.xml
@@ -69,7 +69,7 @@
<properties>
<cmake.generator>Visual Studio 16 2019</cmake.generator>
<cmake.root.dir>${project.parent.basedir}/../compile-tools/thrift/target/cmake-${cmake-version}-win64-x64/</cmake.root.dir>
- <boost.include.dir />
+ <boost.include.dir/>
</properties>
</profile>
<profile>
diff --git a/example/udf/pom.xml b/example/udf/pom.xml
index 131f05d..6d3127c 100644
--- a/example/udf/pom.xml
+++ b/example/udf/pom.xml
@@ -77,7 +77,7 @@
<importOrder>
<order>org.apache.iotdb,,javax,java,\#</order>
</importOrder>
- <removeUnusedImports />
+ <removeUnusedImports/>
</java>
</configuration>
<executions>
diff --git a/grafana/pom.xml b/grafana/pom.xml
index d0c29ec..d4d80ac 100644
--- a/grafana/pom.xml
+++ b/grafana/pom.xml
@@ -170,7 +170,7 @@
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>${start-class}</mainClass>
</transformer>
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 940678b..84a39f0 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -198,7 +198,7 @@
</goals>
</pluginExecutionFilter>
<action>
- <ignore />
+ <ignore/>
</action>
</pluginExecution>
</pluginExecutions>
diff --git a/pom.xml b/pom.xml
index b7d08a5..6e03cf7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -151,7 +151,7 @@
<sonar.junit.reportPaths>target/surefire-reports,target/failsafe-reports</sonar.junit.reportPaths>
<!-- By default, the argLine is empty-->
<gson.version>2.8.6</gson.version>
- <argLine />
+ <argLine/>
<!-- whether enable compiling the cpp client-->
<client-cpp>false</client-cpp>
<!-- disable enforcer by default-->
@@ -688,7 +688,7 @@
<importOrder>
<order>org.apache.iotdb,,javax,java,\#</order>
</importOrder>
- <removeUnusedImports />
+ <removeUnusedImports/>
</java>
<lineEndings>UNIX</lineEndings>
</configuration>
@@ -760,7 +760,7 @@
<phase>validate</phase>
<configuration>
<rules>
- <dependencyConvergence />
+ <dependencyConvergence/>
</rules>
</configuration>
<goals>
@@ -806,7 +806,7 @@
</requireJavaVersion>
<!-- Disabled for now as it breaks the ability to build single modules -->
<!--reactorModuleConvergence/-->
- <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies" />
+ <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies"/>
</rules>
</configuration>
</execution>
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 28a4f65..801e96d 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -249,8 +249,17 @@ timestamp_precision=ms
# How many threads can concurrently flush. When <= 0, use CPU core number.
# concurrent_flush_thread=0
-# How many threads can concurrently query. When <= 0, use CPU core number.
-# concurrent_query_thread=8
+# How many threads can concurrently execute query statement. When <= 0, use CPU core number.
+# Datatype: int
+# concurrent_query_thread=16
+
+# How many threads can concurrently read data for raw data query. When <= 0, use CPU core number.
+# Datatype: int
+# concurrent_sub_rawQuery_thread=8
+
+# Blocking queue size for read task in raw data query. Must >= 1.
+# Datatype: int
+# raw_query_blocking_queue_capacity=5
# whether take over the memory management by IoTDB rather than JVM when serializing memtable as bytes in memory
# (i.e., whether use ChunkBufferPool), value true, false
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index 5f83c12..ad1e613 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -43,7 +43,8 @@ public enum ThreadName {
SYNC_MONITOR("Sync-Monitor"),
LOAD_TSFILE("Load-TsFile"),
TIME_COST_STATISTIC("TIME_COST_STATISTIC"),
- QUERY_SERVICE("Query");
+ QUERY_SERVICE("Query"),
+ SUB_RAW_QUERY_SERVICE("Sub_RawQuery");
private String name;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b880ae5..adca087 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -238,8 +238,16 @@ public class IoTDBConfig {
/** How many threads can concurrently flush. When <= 0, use CPU core number. */
private int concurrentFlushThread = Runtime.getRuntime().availableProcessors();
- /** How many threads can concurrently query. When <= 0, use CPU core number. */
- private int concurrentQueryThread = 8;
+ /** How many threads can concurrently execute query statement. When <= 0, use CPU core number. */
+ private int concurrentQueryThread = 16;
+
+ /**
+ * How many threads can concurrently read data for raw data query. When <= 0, use CPU core number.
+ */
+ private int concurrentSubRawQueryThread = 8;
+
+ /** Blocking queue size for read task in raw data query. */
+ private int rawQueryBlockingQueueCapacity = 5;
/** Is the write mem control for writing enable. */
private boolean enableMemControl = true;
@@ -1091,10 +1099,26 @@ public class IoTDBConfig {
return concurrentQueryThread;
}
- void setConcurrentQueryThread(int concurrentQueryThread) {
+ public void setConcurrentQueryThread(int concurrentQueryThread) {
this.concurrentQueryThread = concurrentQueryThread;
}
+ public int getConcurrentSubRawQueryThread() {
+ return concurrentSubRawQueryThread;
+ }
+
+ void setConcurrentSubRawQueryThread(int concurrentSubRawQueryThread) {
+ this.concurrentSubRawQueryThread = concurrentSubRawQueryThread;
+ }
+
+ public int getRawQueryBlockingQueueCapacity() {
+ return rawQueryBlockingQueueCapacity;
+ }
+
+ public void setRawQueryBlockingQueueCapacity(int rawQueryBlockingQueueCapacity) {
+ this.rawQueryBlockingQueueCapacity = rawQueryBlockingQueueCapacity;
+ }
+
public long getSeqTsFileSize() {
return seqTsFileSize;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index a2c6d3d..66b6fbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -426,6 +426,22 @@ public class IoTDBDescriptor {
conf.setConcurrentQueryThread(Runtime.getRuntime().availableProcessors());
}
+ conf.setConcurrentSubRawQueryThread(
+ Integer.parseInt(
+ properties.getProperty(
+ "concurrent_sub_rawQuery_thread",
+ Integer.toString(conf.getConcurrentSubRawQueryThread()))));
+
+ if (conf.getConcurrentSubRawQueryThread() <= 0) {
+ conf.setConcurrentSubRawQueryThread(Runtime.getRuntime().availableProcessors());
+ }
+
+ conf.setRawQueryBlockingQueueCapacity(
+ Integer.parseInt(
+ properties.getProperty(
+ "raw_query_blocking_queue_capacity",
+ Integer.toString(conf.getRawQueryBlockingQueueCapacity()))));
+
conf.setmManagerCacheSize(
Integer.parseInt(
properties
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
index f0f887a..f17a62d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.dataset;
import org.apache.iotdb.db.concurrent.WrappedRunnable;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.query.control.QueryTimeManager;
-import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
+import org.apache.iotdb.db.query.pool.RawQueryReadTaskPoolManager;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
@@ -239,7 +239,7 @@ public class NonAlignEngineDataSet extends QueryDataSet implements DirectNonAlig
/** flag that main thread is interrupted or not */
private volatile boolean interrupted = false;
- private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance();
+ private static final RawQueryReadTaskPoolManager pool = RawQueryReadTaskPoolManager.getInstance();
private static final Logger LOGGER = LoggerFactory.getLogger(NonAlignEngineDataSet.class);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index a5f613d..d739d10 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -20,9 +20,10 @@
package org.apache.iotdb.db.query.dataset;
import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.control.QueryTimeManager;
-import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
+import org.apache.iotdb.db.query.pool.RawQueryReadTaskPoolManager;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
@@ -165,11 +166,13 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
protected int[] batchDataLengthList;
// capacity for blocking queue
- private static final int BLOCKING_QUEUE_CAPACITY = 5;
+ private static final int BLOCKING_QUEUE_CAPACITY =
+ IoTDBDescriptor.getInstance().getConfig().getRawQueryBlockingQueueCapacity();
private final long queryId;
- private static final QueryTaskPoolManager TASK_POOL_MANAGER = QueryTaskPoolManager.getInstance();
+ private static final RawQueryReadTaskPoolManager TASK_POOL_MANAGER =
+ RawQueryReadTaskPoolManager.getInstance();
private static final Logger LOGGER =
LoggerFactory.getLogger(RawQueryDataSetWithoutValueFilter.class);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskManager.java
similarity index 82%
copy from server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
copy to server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskManager.java
index 1dcda50..c68f526 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskManager.java
@@ -27,11 +27,17 @@ import org.apache.iotdb.db.engine.flush.pool.AbstractPoolManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class QueryTaskPoolManager extends AbstractPoolManager {
+/**
+ * This pool is used to execute all query task send from client, and return TSExecuteStatementResp.
+ * Thread named by Query.
+ *
+ * <p>Execute QueryTask() in TSServiceImpl
+ */
+public class QueryTaskManager extends AbstractPoolManager {
- private static final Logger LOGGER = LoggerFactory.getLogger(QueryTaskPoolManager.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(QueryTaskManager.class);
- private QueryTaskPoolManager() {
+ private QueryTaskManager() {
int threadCnt =
Math.min(
Runtime.getRuntime().availableProcessors(),
@@ -39,8 +45,8 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.QUERY_SERVICE.getName());
}
- public static QueryTaskPoolManager getInstance() {
- return QueryTaskPoolManager.InstanceHolder.instance;
+ public static QueryTaskManager getInstance() {
+ return QueryTaskManager.InstanceHolder.instance;
}
@Override
@@ -79,6 +85,6 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
// allowed to do nothing
}
- private static QueryTaskPoolManager instance = new QueryTaskPoolManager();
+ private static QueryTaskManager instance = new QueryTaskManager();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/query/pool/RawQueryReadTaskPoolManager.java
similarity index 67%
rename from server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
rename to server/src/main/java/org/apache/iotdb/db/query/pool/RawQueryReadTaskPoolManager.java
index 1dcda50..dc84652 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/pool/RawQueryReadTaskPoolManager.java
@@ -27,20 +27,27 @@ import org.apache.iotdb.db.engine.flush.pool.AbstractPoolManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class QueryTaskPoolManager extends AbstractPoolManager {
+/**
+ * This thread pool is used to read data for raw data query. Thread named by Sub_Raw_Query.
+ *
+ * <p>Execute ReadTask() in RawQueryReadTaskPoolManager
+ */
+public class RawQueryReadTaskPoolManager extends AbstractPoolManager {
- private static final Logger LOGGER = LoggerFactory.getLogger(QueryTaskPoolManager.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(RawQueryReadTaskPoolManager.class);
- private QueryTaskPoolManager() {
+ private RawQueryReadTaskPoolManager() {
int threadCnt =
Math.min(
Runtime.getRuntime().availableProcessors(),
- IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread());
- pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.QUERY_SERVICE.getName());
+ IoTDBDescriptor.getInstance().getConfig().getConcurrentSubRawQueryThread());
+ pool =
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ threadCnt, ThreadName.SUB_RAW_QUERY_SERVICE.getName());
}
- public static QueryTaskPoolManager getInstance() {
- return QueryTaskPoolManager.InstanceHolder.instance;
+ public static RawQueryReadTaskPoolManager getInstance() {
+ return RawQueryReadTaskPoolManager.InstanceHolder.instance;
}
@Override
@@ -50,7 +57,7 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
@Override
public String getName() {
- return "query task";
+ return "raw query read task";
}
@Override
@@ -59,9 +66,10 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
int threadCnt =
Math.min(
Runtime.getRuntime().availableProcessors(),
- IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread());
+ IoTDBDescriptor.getInstance().getConfig().getConcurrentSubRawQueryThread());
pool =
- IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.QUERY_SERVICE.getName());
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ threadCnt, ThreadName.SUB_RAW_QUERY_SERVICE.getName());
}
}
@@ -79,6 +87,6 @@ public class QueryTaskPoolManager extends AbstractPoolManager {
// allowed to do nothing
}
- private static QueryTaskPoolManager instance = new QueryTaskPoolManager();
+ private static RawQueryReadTaskPoolManager instance = new RawQueryReadTaskPoolManager();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 3f7c349..45988da 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -28,7 +28,11 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.cost.statistic.Measurement;
import org.apache.iotdb.db.cost.statistic.Operation;
-import org.apache.iotdb.db.exception.*;
+import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.IoTDBException;
+import org.apache.iotdb.db.exception.QueryInBatchStatementException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.StorageGroupNotReadyException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
@@ -76,6 +80,7 @@ import org.apache.iotdb.db.query.control.TracingManager;
import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
+import org.apache.iotdb.db.query.pool.QueryTaskManager;
import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
@@ -141,6 +146,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -207,6 +214,151 @@ public class TSServiceImpl implements TSIService.Iface {
TimeUnit.MINUTES);
}
+ /**
+ * Execute query statement, return TSExecuteStatementResp with dataset.
+ *
+ * @param plan must be a plan for Query: QueryPlan, ShowPlan, and some AuthorPlan
+ */
+ protected class QueryTask implements Callable<TSExecuteStatementResp> {
+
+ private PhysicalPlan plan;
+ private final String username;
+ private final String statement;
+ private final long statementId;
+ private final long timeout;
+ private final int fetchSize;
+ private final boolean enableRedirectQuery;
+
+ public QueryTask(
+ PhysicalPlan plan,
+ String username,
+ String statement,
+ long statementId,
+ long timeout,
+ int fetchSize,
+ boolean enableRedirectQuery) {
+ this.plan = plan;
+ this.username = username;
+ this.statement = statement;
+ this.statementId = statementId;
+ this.timeout = timeout;
+ this.fetchSize = fetchSize;
+ this.enableRedirectQuery = enableRedirectQuery;
+ }
+
+ @Override
+ public TSExecuteStatementResp call() throws Exception {
+ queryCount.incrementAndGet();
+ AUDIT_LOGGER.debug(
+ "Session {} execute Query: {}", sessionManager.getCurrSessionId(), statement);
+ long startTime = System.currentTimeMillis();
+ // generate the queryId for the operation
+ long queryId = sessionManager.requestQueryId(statementId, true);
+ try {
+ // register query info to queryTimeManager
+ if (!(plan instanceof ShowQueryProcesslistPlan)) {
+ queryTimeManager.registerQuery(queryId, startTime, statement, timeout);
+ }
+ if (plan instanceof QueryPlan && config.isEnablePerformanceTracing()) {
+ TracingManager tracingManager = TracingManager.getInstance();
+ if (!(plan instanceof AlignByDevicePlan)) {
+ tracingManager.writeQueryInfo(queryId, statement, startTime, plan.getPaths().size());
+ } else {
+ tracingManager.writeQueryInfo(queryId, statement, startTime);
+ }
+ }
+
+ if (plan instanceof AuthorPlan) {
+ plan.setLoginUserName(username);
+ }
+
+ TSExecuteStatementResp resp = null;
+ // execute it before createDataSet since it may change the content of query plan
+ if (plan instanceof QueryPlan && !(plan instanceof UDFPlan)) {
+ resp = getQueryColumnHeaders(plan, username);
+ }
+ if (plan instanceof QueryPlan) {
+ ((QueryPlan) plan).setEnableRedirect(enableRedirectQuery);
+ }
+ // create and cache dataset
+ QueryDataSet newDataSet = createQueryDataSet(queryId, plan, fetchSize);
+
+ if (newDataSet.getEndPoint() != null && enableRedirectQuery) {
+ LOGGER.debug(
+ "need to redirect {} {} to node {}", statement, queryId, newDataSet.getEndPoint());
+ QueryDataSet.EndPoint endPoint = newDataSet.getEndPoint();
+ return redirectQueryToAnotherNode(resp, queryId, endPoint.getIp(), endPoint.getPort());
+ }
+
+ if (plan instanceof ShowPlan || plan instanceof AuthorPlan) {
+ resp = getListDataSetHeaders(newDataSet);
+ } else if (plan instanceof UDFPlan) {
+ resp = getQueryColumnHeaders(plan, username);
+ }
+
+ resp.setOperationType(plan.getOperatorType().toString());
+ if (plan.getOperatorType() == OperatorType.AGGREGATION) {
+ resp.setIgnoreTimeStamp(true);
+ } else if (plan instanceof ShowQueryProcesslistPlan) {
+ resp.setIgnoreTimeStamp(false);
+ }
+
+ if (newDataSet instanceof DirectNonAlignDataSet) {
+ resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username));
+ } else {
+ try {
+ TSQueryDataSet tsQueryDataSet = fillRpcReturnData(fetchSize, newDataSet, username);
+ resp.setQueryDataSet(tsQueryDataSet);
+ } catch (RedirectException e) {
+ LOGGER.debug("need to redirect {} {} to {}", statement, queryId, e.getEndPoint());
+ if (enableRedirectQuery) {
+ EndPoint endPoint = e.getEndPoint();
+ redirectQueryToAnotherNode(resp, queryId, endPoint.ip, endPoint.port);
+ } else {
+ LOGGER.error(
+ "execute {} error, if session does not support redirect,"
+ + " should not throw redirection exception.",
+ statement,
+ e);
+ }
+ }
+ }
+ resp.setQueryId(queryId);
+
+ if (plan instanceof AlignByDevicePlan && config.isEnablePerformanceTracing()) {
+ TracingManager.getInstance()
+ .writePathsNum(queryId, ((AlignByDeviceDataSet) newDataSet).getPathsNum());
+ }
+
+ if (enableMetric) {
+ long endTime = System.currentTimeMillis();
+ SqlArgument sqlArgument = new SqlArgument(resp, plan, statement, startTime, endTime);
+ synchronized (sqlArgumentList) {
+ sqlArgumentList.add(sqlArgument);
+ if (sqlArgumentList.size() >= MAX_SIZE) {
+ sqlArgumentList.subList(0, DELETE_SIZE).clear();
+ }
+ }
+ }
+
+ // remove query info in QueryTimeManager
+ if (!(plan instanceof ShowQueryProcesslistPlan)) {
+ queryTimeManager.unRegisterQuery(queryId);
+ }
+ return resp;
+ } catch (Exception e) {
+ releaseQueryResourceNoExceptions(queryId);
+ throw e;
+ } finally {
+ Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+ long costTime = System.currentTimeMillis() - startTime;
+ if (costTime >= config.getSlowQueryThreshold()) {
+ SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
+ }
+ }
+ }
+ }
+
public static List<SqlArgument> getSqlArgumentList() {
return sqlArgumentList;
}
@@ -599,16 +751,22 @@ public class TSServiceImpl implements TSIService.Iface {
processor.parseSQLToPhysicalPlan(
statement, sessionManager.getZoneId(req.getSessionId()), req.fetchSize);
- return physicalPlan.isQuery()
- ? internalExecuteQueryStatement(
- statement,
- req.statementId,
- physicalPlan,
- req.fetchSize,
- req.timeout,
- sessionManager.getUsername(req.getSessionId()),
- req.isEnableRedirectQuery())
- : executeUpdateStatement(physicalPlan, req.getSessionId());
+ if (physicalPlan.isQuery()) {
+ Future<TSExecuteStatementResp> resp =
+ QueryTaskManager.getInstance()
+ .submit(
+ new QueryTask(
+ physicalPlan,
+ sessionManager.getUsername(req.sessionId),
+ req.statement,
+ req.statementId,
+ req.timeout,
+ req.fetchSize,
+ req.enableRedirectQuery));
+ return resp.get();
+ } else {
+ return executeUpdateStatement(physicalPlan, req.getSessionId());
+ }
} catch (InterruptedException e) {
LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
Thread.currentThread().interrupt();
@@ -632,17 +790,23 @@ public class TSServiceImpl implements TSIService.Iface {
processor.parseSQLToPhysicalPlan(
statement, sessionManager.getZoneId(req.sessionId), req.fetchSize);
- return physicalPlan.isQuery()
- ? internalExecuteQueryStatement(
- statement,
- req.statementId,
- physicalPlan,
- req.fetchSize,
- req.timeout,
- sessionManager.getUsername(req.getSessionId()),
- req.isEnableRedirectQuery())
- : RpcUtils.getTSExecuteStatementResp(
- TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+ if (physicalPlan.isQuery()) {
+ Future<TSExecuteStatementResp> resp =
+ QueryTaskManager.getInstance()
+ .submit(
+ new QueryTask(
+ physicalPlan,
+ sessionManager.getUsername(req.sessionId),
+ req.statement,
+ req.statementId,
+ req.timeout,
+ req.fetchSize,
+ req.enableRedirectQuery));
+ return resp.get();
+ } else {
+ return RpcUtils.getTSExecuteStatementResp(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+ }
} catch (InterruptedException e) {
LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
Thread.currentThread().interrupt();
@@ -663,17 +827,23 @@ public class TSServiceImpl implements TSIService.Iface {
PhysicalPlan physicalPlan =
processor.rawDataQueryReqToPhysicalPlan(req, sessionManager.getZoneId(req.sessionId));
- return physicalPlan.isQuery()
- ? internalExecuteQueryStatement(
- "",
- req.statementId,
- physicalPlan,
- req.fetchSize,
- config.getQueryTimeoutThreshold(),
- sessionManager.getUsername(req.sessionId),
- req.isEnableRedirectQuery())
- : RpcUtils.getTSExecuteStatementResp(
- TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+ if (physicalPlan.isQuery()) {
+ Future<TSExecuteStatementResp> resp =
+ QueryTaskManager.getInstance()
+ .submit(
+ new QueryTask(
+ physicalPlan,
+ sessionManager.getUsername(req.sessionId),
+ "",
+ req.statementId,
+ config.getQueryTimeoutThreshold(),
+ req.fetchSize,
+ req.enableRedirectQuery));
+ return resp.get();
+ } else {
+ return RpcUtils.getTSExecuteStatementResp(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+ }
} catch (InterruptedException e) {
LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
Thread.currentThread().interrupt();
@@ -685,142 +855,15 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
- /**
- * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByTimePlan, UDFPlan,
- * some AuthorPlan
- */
- @SuppressWarnings({"squid:S3776", "squid:S1141"}) // Suppress high Cognitive Complexity warning
- private TSExecuteStatementResp internalExecuteQueryStatement(
- String statement,
- long statementId,
- PhysicalPlan plan,
- int fetchSize,
- long timeout,
- String username,
- boolean enableRedirect)
- throws QueryProcessException, SQLException, StorageEngineException,
- QueryFilterOptimizationException, MetadataException, IOException, InterruptedException,
- TException, AuthException {
- queryCount.incrementAndGet();
- AUDIT_LOGGER.debug(
- "Session {} execute Query: {}", sessionManager.getCurrSessionId(), statement);
- long startTime = System.currentTimeMillis();
- long queryId = -1;
- try {
- // generate the queryId for the operation
- queryId = sessionManager.requestQueryId(statementId, true);
- // register query info to queryTimeManager
- if (!(plan instanceof ShowQueryProcesslistPlan)) {
- queryTimeManager.registerQuery(queryId, startTime, statement, timeout);
- }
- if (plan instanceof QueryPlan && config.isEnablePerformanceTracing()) {
- TracingManager tracingManager = TracingManager.getInstance();
- if (!(plan instanceof AlignByDevicePlan)) {
- tracingManager.writeQueryInfo(queryId, statement, startTime, plan.getPaths().size());
- } else {
- tracingManager.writeQueryInfo(queryId, statement, startTime);
- }
- }
-
- if (plan instanceof AuthorPlan) {
- plan.setLoginUserName(username);
- }
-
- TSExecuteStatementResp resp = null;
- // execute it before createDataSet since it may change the content of query plan
- if (plan instanceof QueryPlan && !(plan instanceof UDFPlan)) {
- resp = getQueryColumnHeaders(plan, username);
- }
- if (plan instanceof QueryPlan) {
- ((QueryPlan) plan).setEnableRedirect(enableRedirect);
- }
- // create and cache dataset
- QueryDataSet newDataSet = createQueryDataSet(queryId, plan, fetchSize);
-
- if (newDataSet.getEndPoint() != null && enableRedirect) {
- // redirect query
- LOGGER.debug(
- "need to redirect {} {} to node {}", statement, queryId, newDataSet.getEndPoint());
- TSStatus status = new TSStatus();
- status.setRedirectNode(
- new EndPoint(newDataSet.getEndPoint().getIp(), newDataSet.getEndPoint().getPort()));
- status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
- resp.setStatus(status);
- resp.setQueryId(queryId);
- return resp;
- }
-
- if (plan instanceof ShowPlan || plan instanceof AuthorPlan) {
- resp = getListDataSetHeaders(newDataSet);
- } else if (plan instanceof UDFPlan) {
- resp = getQueryColumnHeaders(plan, username);
- }
-
- resp.setOperationType(plan.getOperatorType().toString());
- if (plan.getOperatorType() == OperatorType.AGGREGATION) {
- resp.setIgnoreTimeStamp(true);
- } else if (plan instanceof ShowQueryProcesslistPlan) {
- resp.setIgnoreTimeStamp(false);
- }
-
- if (newDataSet instanceof DirectNonAlignDataSet) {
- resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username));
- } else {
- try {
- TSQueryDataSet tsQueryDataSet = fillRpcReturnData(fetchSize, newDataSet, username);
- resp.setQueryDataSet(tsQueryDataSet);
- } catch (RedirectException e) {
- LOGGER.debug("need to redirect {} {} to {}", statement, queryId, e.getEndPoint());
- if (enableRedirect) {
- // redirect query
- TSStatus status = new TSStatus();
- status.setRedirectNode(e.getEndPoint());
- status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
- resp.setStatus(status);
- resp.setQueryId(queryId);
- return resp;
- } else {
- LOGGER.error(
- "execute {} error, if session does not support redirect,"
- + " should not throw redirection exception.",
- statement,
- e);
- }
- }
- }
- resp.setQueryId(queryId);
-
- if (plan instanceof AlignByDevicePlan && config.isEnablePerformanceTracing()) {
- TracingManager.getInstance()
- .writePathsNum(queryId, ((AlignByDeviceDataSet) newDataSet).getPathsNum());
- }
-
- if (enableMetric) {
- long endTime = System.currentTimeMillis();
- SqlArgument sqlArgument = new SqlArgument(resp, plan, statement, startTime, endTime);
- synchronized (sqlArgumentList) {
- sqlArgumentList.add(sqlArgument);
- if (sqlArgumentList.size() >= MAX_SIZE) {
- sqlArgumentList.subList(0, DELETE_SIZE).clear();
- }
- }
- }
-
- // remove query info in QueryTimeManager
- if (!(plan instanceof ShowQueryProcesslistPlan)) {
- queryTimeManager.unRegisterQuery(queryId);
- }
- return resp;
- } catch (Exception e) {
- releaseQueryResourceNoExceptions(queryId);
- throw e;
- } finally {
- Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, startTime);
- long costTime = System.currentTimeMillis() - startTime;
- if (costTime >= config.getSlowQueryThreshold()) {
- SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
- }
- }
+ /** Redirect query */
+ private TSExecuteStatementResp redirectQueryToAnotherNode(
+ TSExecuteStatementResp resp, long queryId, String ip, int port) {
+ TSStatus status = new TSStatus();
+ status.setRedirectNode(new EndPoint(ip, port));
+ status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+ resp.setStatus(status);
+ resp.setQueryId(queryId);
+ return resp;
}
private TSExecuteStatementResp getListDataSetHeaders(QueryDataSet dataSet) {