You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/07/20 13:23:40 UTC
[iotdb] branch master updated: [IOTDB-3721] Keep each FI's execution timeout consistent with timeout set in session (#6663)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6d4d8c0d73 [IOTDB-3721] Keep each FI's execution timeout consistent with timeout set in session (#6663)
6d4d8c0d73 is described below
commit 6d4d8c0d738f21395b629c59f49ae5a271303613
Author: FangJL <37...@users.noreply.github.com>
AuthorDate: Wed Jul 20 21:23:33 2022 +0800
[IOTDB-3721] Keep each FI's execution timeout consistent with timeout set in session (#6663)
---
.../main/java/org/apache/iotdb/SessionExample.java | 5 ++-
.../iotdb/SyntaxConventionRelatedExample.java | 4 +-
.../iotdb/session/IoTDBSessionComplexIT.java | 2 +-
.../apache/iotdb/session/pool/SessionPoolTest.java | 2 +-
.../apache/iotdb/session/template/TemplateUT.java | 5 ++-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 6 +--
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 +-
.../iotdb/db/mpp/common/MPPQueryContext.java | 36 ++++++++++++++++
.../fragment/FragmentInstanceExecution.java | 5 ++-
.../fragment/FragmentInstanceManager.java | 16 ++++++-
.../db/mpp/execution/schedule/DriverScheduler.java | 9 ++--
.../mpp/execution/schedule/IDriverScheduler.java | 3 +-
.../org/apache/iotdb/db/mpp/plan/Coordinator.java | 50 +++++++++++++++++++++-
.../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 16 ++++++-
.../db/mpp/plan/execution/QueryExecution.java | 7 +++
.../SimpleFragmentParallelPlanner.java | 3 +-
.../distribution/WriteFragmentParallelPlanner.java | 3 +-
.../db/mpp/plan/planner/plan/FragmentInstance.java | 21 ++++++++-
.../mpprest/impl/GrafanaApiServiceImpl.java | 26 +++++++++--
.../protocol/mpprest/impl/RestApiServiceImpl.java | 14 ++++--
.../iotdb/db/protocol/mqtt/MPPPublishHandler.java | 5 ++-
.../service/thrift/impl/ClientRPCServiceImpl.java | 12 ++++--
.../execution/schedule/DriverSchedulerTest.java | 9 ++--
.../db/mpp/plan/StandaloneCoordinatorTest.java | 9 +++-
.../mpp/plan/plan/FragmentInstanceSerdeTest.java | 9 +++-
.../plan/scheduler/StandaloneSchedulerTest.java | 15 ++++---
.../main/java/org/apache/iotdb/session/Config.java | 2 +
.../java/org/apache/iotdb/session/Session.java | 21 ++++++---
.../apache/iotdb/session/SessionConnection.java | 7 ++-
.../org/apache/iotdb/session/pool/SessionPool.java | 5 ++-
thrift/src/main/thrift/client.thrift | 3 ++
31 files changed, 273 insertions(+), 61 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index d779086480..43d93cdf97 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -754,8 +754,9 @@ public class SessionExample {
paths.add(ROOT_SG1_D1_S3);
long startTime = 10L;
long endTime = 200L;
+ long timeOut = 60000;
- try (SessionDataSet dataSet = session.executeRawDataQuery(paths, startTime, endTime)) {
+ try (SessionDataSet dataSet = session.executeRawDataQuery(paths, startTime, endTime, timeOut)) {
System.out.println(dataSet.getColumnNames());
dataSet.setFetchSize(1024);
@@ -770,7 +771,7 @@ public class SessionExample {
paths.add(ROOT_SG1_D1_S1);
paths.add(ROOT_SG1_D1_S2);
paths.add(ROOT_SG1_D1_S3);
- try (SessionDataSet sessionDataSet = session.executeLastDataQuery(paths, 3)) {
+ try (SessionDataSet sessionDataSet = session.executeLastDataQuery(paths, 3, 60000)) {
System.out.println(sessionDataSet.getColumnNames());
sessionDataSet.setFetchSize(1024);
while (sessionDataSet.hasNext()) {
diff --git a/example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java b/example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java
index 362702f76a..f6529e2910 100644
--- a/example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java
@@ -109,8 +109,10 @@ public class SyntaxConventionRelatedExample {
long startTime = 1L;
long endTime = 100L;
+ long timeOut = 60000;
- try (SessionDataSet dataSet1 = session.executeRawDataQuery(paths, startTime, endTime)) {
+ try (SessionDataSet dataSet1 =
+ session.executeRawDataQuery(paths, startTime, endTime, timeOut)) {
System.out.println(dataSet1.getColumnNames());
dataSet1.setFetchSize(1024);
diff --git a/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java b/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
index 3aeb9de5ee..114508878a 100644
--- a/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
+++ b/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
@@ -593,7 +593,7 @@ public class IoTDBSessionComplexIT {
paths.add("root.sg1.d2.s1");
paths.add("root.sg1.d2.s2");
- SessionDataSet sessionDataSet = session.executeRawDataQuery(paths, 450L, 600L);
+ SessionDataSet sessionDataSet = session.executeRawDataQuery(paths, 450L, 600L, 60000);
sessionDataSet.setFetchSize(1024);
int count = 0;
diff --git a/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index 2e8981d578..27ac0c8a69 100644
--- a/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -209,7 +209,7 @@ public class SessionPoolTest {
service.submit(
() -> {
try {
- SessionDataSetWrapper wrapper = pool.executeRawDataQuery(pathList, no, no + 1);
+ SessionDataSetWrapper wrapper = pool.executeRawDataQuery(pathList, no, no + 1, 60000);
if (wrapper.hasNext()) {
Assert.assertEquals(no, wrapper.sessionDataSet.next().getTimestamp());
}
diff --git a/integration/src/test/java/org/apache/iotdb/session/template/TemplateUT.java b/integration/src/test/java/org/apache/iotdb/session/template/TemplateUT.java
index 6ea9dc5b97..5548c461df 100644
--- a/integration/src/test/java/org/apache/iotdb/session/template/TemplateUT.java
+++ b/integration/src/test/java/org/apache/iotdb/session/template/TemplateUT.java
@@ -352,7 +352,7 @@ public class TemplateUT {
Collections.singletonList(12345L));
SessionDataSet res =
- session.executeRawDataQuery(Collections.singletonList("root.sg.v1.*"), 0L, 999L);
+ session.executeRawDataQuery(Collections.singletonList("root.sg.v1.*"), 0L, 999L, 60000);
while (res.hasNext()) {
RowRecord rec = res.next();
// correspond to x, y, append
@@ -366,7 +366,8 @@ public class TemplateUT {
Collections.singletonList(TSDataType.INT64),
Collections.singletonList(12345L));
- res = session.executeRawDataQuery(Collections.singletonList("root.sg.v1.d0.*"), 0L, 999L);
+ res =
+ session.executeRawDataQuery(Collections.singletonList("root.sg.v1.d0.*"), 0L, 999L, 60000);
while (res.hasNext()) {
RowRecord rec = res.next();
// x is not inside template
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 66d48dfa21..3df27bacd3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -516,7 +516,7 @@ public class IoTDBConfig {
private long cacheFileReaderClearPeriod = 100000;
/** the max executing time of query in ms. Unit: millisecond */
- private int queryTimeoutThreshold = 60000;
+ private long queryTimeoutThreshold = 60000;
/** the max time to live of a session in ms. Unit: millisecond */
private int sessionTimeoutThreshold = 0;
@@ -1489,11 +1489,11 @@ public class IoTDBConfig {
this.cacheFileReaderClearPeriod = cacheFileReaderClearPeriod;
}
- public int getQueryTimeoutThreshold() {
+ public long getQueryTimeoutThreshold() {
return queryTimeoutThreshold;
}
- public void setQueryTimeoutThreshold(int queryTimeoutThreshold) {
+ public void setQueryTimeoutThreshold(long queryTimeoutThreshold) {
this.queryTimeoutThreshold = queryTimeoutThreshold;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 2c3a2a6ed0..046cebbdc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -412,9 +412,9 @@ public class IoTDBDescriptor {
conf.setSubCompactionTaskNum(subtaskNum);
conf.setQueryTimeoutThreshold(
- Integer.parseInt(
+ Long.parseLong(
properties.getProperty(
- "query_timeout_threshold", Integer.toString(conf.getQueryTimeoutThreshold()))));
+ "query_timeout_threshold", Long.toString(conf.getQueryTimeoutThreshold()))));
conf.setSessionTimeoutThreshold(
Integer.parseInt(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index 153b1e8598..81e1330eca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -30,6 +30,8 @@ public class MPPQueryContext {
private QueryId queryId;
private SessionInfo session;
private QueryType queryType = QueryType.READ;
+ private long timeOut;
+ private long startTime;
private TEndPoint localDataBlockEndpoint;
private TEndPoint localInternalEndpoint;
@@ -53,6 +55,24 @@ public class MPPQueryContext {
this.resultNodeContext = new ResultNodeContext(queryId);
}
+ public MPPQueryContext(
+ String sql,
+ QueryId queryId,
+ SessionInfo session,
+ TEndPoint localDataBlockEndpoint,
+ TEndPoint localInternalEndpoint,
+ long timeOut,
+ long startTime) {
+ this.sql = sql;
+ this.queryId = queryId;
+ this.session = session;
+ this.localDataBlockEndpoint = localDataBlockEndpoint;
+ this.localInternalEndpoint = localInternalEndpoint;
+ this.resultNodeContext = new ResultNodeContext(queryId);
+ this.timeOut = timeOut;
+ this.startTime = startTime;
+ }
+
public QueryId getQueryId() {
return queryId;
}
@@ -61,6 +81,14 @@ public class MPPQueryContext {
return queryType;
}
+ public long getTimeOut() {
+ return timeOut;
+ }
+
+ public void setTimeOut(long timeOut) {
+ this.timeOut = timeOut;
+ }
+
public void setQueryType(QueryType queryType) {
this.queryType = queryType;
}
@@ -80,4 +108,12 @@ public class MPPQueryContext {
public SessionInfo getSession() {
return session;
}
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index c1fd9568a7..767487fc67 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -51,11 +51,12 @@ public class FragmentInstanceExecution {
FragmentInstanceContext context,
IDriver driver,
FragmentInstanceStateMachine stateMachine,
- CounterStat failedInstances) {
+ CounterStat failedInstances,
+ long timeOut) {
FragmentInstanceExecution execution =
new FragmentInstanceExecution(instanceId, context, driver, stateMachine);
execution.initialize(failedInstances, scheduler);
- scheduler.submitDrivers(instanceId.getQueryId(), ImmutableList.of(driver));
+ scheduler.submitDrivers(instanceId.getQueryId(), ImmutableList.of(driver), timeOut);
return execution;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 51b689eda2..ca164e9e51 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -108,7 +108,13 @@ public class FragmentInstanceManager {
instance.getTimeFilter(),
dataRegion);
return createFragmentInstanceExecution(
- scheduler, instanceId, context, driver, stateMachine, failedInstances);
+ scheduler,
+ instanceId,
+ context,
+ driver,
+ stateMachine,
+ failedInstances,
+ instance.getTimeOut());
} catch (Throwable t) {
logger.error("error when create FragmentInstanceExecution.", t);
stateMachine.failed(t);
@@ -140,7 +146,13 @@ public class FragmentInstanceManager {
SchemaDriver driver =
planner.plan(instance.getFragment().getRoot(), context, schemaRegion);
return createFragmentInstanceExecution(
- scheduler, instanceId, context, driver, stateMachine, failedInstances);
+ scheduler,
+ instanceId,
+ context,
+ driver,
+ stateMachine,
+ failedInstances,
+ instance.getTimeOut());
} catch (Throwable t) {
logger.error("Execute error caused by ", t);
stateMachine.failed(t);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index ce079f4f3c..f3d33b3df2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -70,7 +70,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
IoTDBDescriptor.getInstance().getConfig().getMaxAllowedConcurrentQueries();
private static final int WORKER_THREAD_NUM =
IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread();
- private static final int QUERY_TIMEOUT_MS =
+ private static final long QUERY_TIMEOUT_MS =
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
private final ThreadGroup workerGroups;
private final List<AbstractDriverThread> threads;
@@ -122,10 +122,13 @@ public class DriverScheduler implements IDriverScheduler, IService {
}
@Override
- public void submitDrivers(QueryId queryId, List<IDriver> instances) {
+ public void submitDrivers(QueryId queryId, List<IDriver> instances, long timeOut) {
List<DriverTask> tasks =
instances.stream()
- .map(v -> new DriverTask(v, QUERY_TIMEOUT_MS, DriverTaskStatus.READY))
+ .map(
+ v ->
+ new DriverTask(
+ v, timeOut > 0 ? timeOut : QUERY_TIMEOUT_MS, DriverTaskStatus.READY))
.collect(Collectors.toList());
queryMap
.computeIfAbsent(queryId, v -> Collections.synchronizedSet(new HashSet<>()))
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/IDriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/IDriverScheduler.java
index 704ee4a1a0..7939c25a25 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/IDriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/IDriverScheduler.java
@@ -32,8 +32,9 @@ public interface IDriverScheduler {
*
* @param queryId the queryId these instances belong to.
* @param instances the submitted instances.
+ * @param timeOut the query timeout, in milliseconds
*/
- void submitDrivers(QueryId queryId, List<IDriver> instances);
+ void submitDrivers(QueryId queryId, List<IDriver> instances, long timeOut);
/**
* Abort all the instances in this query.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index c8a44bdff1..3a4b0aaf44 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
@@ -58,6 +59,7 @@ public class Coordinator {
private static final String COORDINATOR_WRITE_EXECUTOR_NAME = "MPPCoordinatorWrite";
private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 10;
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
INTERNAL_SERVICE_CLIENT_MANAGER =
@@ -86,7 +88,11 @@ public class Coordinator {
Statement statement,
MPPQueryContext queryContext,
IPartitionFetcher partitionFetcher,
- ISchemaFetcher schemaFetcher) {
+ ISchemaFetcher schemaFetcher,
+ long timeOut,
+ long startTime) {
+ queryContext.setTimeOut(timeOut);
+ queryContext.setStartTime(startTime);
if (statement instanceof IConfigStatement) {
queryContext.setQueryType(((IConfigStatement) statement).getQueryType());
return new ConfigExecution(queryContext, statement, executor);
@@ -102,6 +108,43 @@ public class Coordinator {
INTERNAL_SERVICE_CLIENT_MANAGER);
}
+ public ExecutionResult execute(
+ Statement statement,
+ long queryId,
+ SessionInfo session,
+ String sql,
+ IPartitionFetcher partitionFetcher,
+ ISchemaFetcher schemaFetcher,
+ long timeOut) {
+ long startTime = System.currentTimeMillis();
+ QueryId globalQueryId = queryIdGenerator.createNextQueryId();
+ try (SetThreadName queryName = new SetThreadName(globalQueryId.getId())) {
+ if (sql != null && sql.length() > 0) {
+ LOGGER.info("start executing sql: {}", sql);
+ }
+ IQueryExecution execution =
+ createQueryExecution(
+ statement,
+ new MPPQueryContext(
+ sql,
+ globalQueryId,
+ session,
+ DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT,
+ DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT),
+ partitionFetcher,
+ schemaFetcher,
+ timeOut,
+ startTime);
+ if (execution.isQuery()) {
+ queryExecutionMap.put(queryId, execution);
+ }
+ execution.start();
+
+ return execution.getStatus();
+ }
+ }
+
+ /** This overload is called by the write method, so it takes no timeout parameter and passes Long.MAX_VALUE as the timeout. */
public ExecutionResult execute(
Statement statement,
long queryId,
@@ -109,6 +152,7 @@ public class Coordinator {
String sql,
IPartitionFetcher partitionFetcher,
ISchemaFetcher schemaFetcher) {
+ long startTime = System.currentTimeMillis();
QueryId globalQueryId = queryIdGenerator.createNextQueryId();
try (SetThreadName queryName = new SetThreadName(globalQueryId.getId())) {
if (sql != null && sql.length() > 0) {
@@ -124,7 +168,9 @@ public class Coordinator {
DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT,
DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT),
partitionFetcher,
- schemaFetcher);
+ schemaFetcher,
+ Long.MAX_VALUE,
+ startTime);
if (execution.isQuery()) {
queryExecutionMap.put(queryId, execution);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 77c6dda7fe..17c7591b27 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -92,7 +92,13 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
try {
ExecutionResult executionResult =
coordinator.execute(
- schemaFetchStatement, queryId, null, "", ClusterPartitionFetcher.getInstance(), this);
+ schemaFetchStatement,
+ queryId,
+ null,
+ "",
+ ClusterPartitionFetcher.getInstance(),
+ this,
+ config.getQueryTimeoutThreshold());
// TODO: (xingtanzjr) throw exception
if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new RuntimeException(
@@ -330,7 +336,13 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
long queryId = SessionManager.getInstance().requestQueryId(false);
ExecutionResult executionResult =
coordinator.execute(
- statement, queryId, null, "", ClusterPartitionFetcher.getInstance(), this);
+ statement,
+ queryId,
+ null,
+ "",
+ ClusterPartitionFetcher.getInstance(),
+ this,
+ config.getQueryTimeoutThreshold());
// TODO: throw exception
int statusCode = executionResult.status.getCode();
if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 9c9d290052..666e7cf683 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.execution.QueryState;
@@ -170,6 +171,12 @@ public class QueryExecution implements IQueryExecution {
stateMachine.transitionToRunning();
return;
}
+ long remainTime = context.getTimeOut() - (System.currentTimeMillis() - context.getStartTime());
+ if (remainTime <= 0) {
+ throw new QueryTimeoutRuntimeException();
+ }
+ context.setTimeOut(remainTime);
+
doLogicalPlan();
doDistributedPlan();
if (context.getQueryType() == QueryType.READ) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 80b92f4f45..854bb15467 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -91,7 +91,8 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
new PlanFragment(fragment.getId(), rootCopy),
fragment.getId().genFragmentInstanceId(),
timeFilter,
- queryContext.getQueryType());
+ queryContext.getQueryType(),
+ queryContext.getTimeOut());
// Get the target region for origin PlanFragment, then its instance will be distributed one
// of them.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
index fb91a0c2fe..4f043a05b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -61,7 +61,8 @@ public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
new PlanFragment(fragment.getId(), split),
fragment.getId().genFragmentInstanceId(),
timeFilter,
- queryContext.getQueryType());
+ queryContext.getQueryType(),
+ queryContext.getTimeOut());
instance.setDataRegionAndHost(split.getRegionReplicaSet());
ret.add(instance);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
index f617ff9f02..bcc54931de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.runtime.SerializationRunTimeException;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
@@ -46,6 +47,8 @@ public class FragmentInstance implements IConsensusRequest {
private final Logger logger = LoggerFactory.getLogger(FragmentInstance.class);
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
private final FragmentInstanceId id;
private final QueryType type;
// The reference of PlanFragment which this instance is generated from
@@ -58,15 +61,22 @@ public class FragmentInstance implements IConsensusRequest {
private Filter timeFilter;
+ private final long timeOut;
+
// We can add some more params for a specific FragmentInstance
// So that we can make different FragmentInstance owns different data range.
public FragmentInstance(
- PlanFragment fragment, FragmentInstanceId id, Filter timeFilter, QueryType type) {
+ PlanFragment fragment,
+ FragmentInstanceId id,
+ Filter timeFilter,
+ QueryType type,
+ long timeOut) {
this.fragment = fragment;
this.timeFilter = timeFilter;
this.id = id;
this.type = type;
+ this.timeOut = timeOut > 0 ? timeOut : config.getQueryTimeoutThreshold();
}
public TRegionReplicaSet getDataRegionId() {
@@ -149,17 +159,19 @@ public class FragmentInstance implements IConsensusRequest {
getRegionReplicaSet() == null ? "Not set" : getRegionReplicaSet().getRegionId()));
ret.append("\n---- Plan Node Tree ----\n");
ret.append(PlanNodeUtil.nodeToString(getFragment().getRoot()));
+ ret.append(String.format("timeOut-%s:", getTimeOut()));
return ret.toString();
}
public static FragmentInstance deserializeFrom(ByteBuffer buffer) {
FragmentInstanceId id = FragmentInstanceId.deserialize(buffer);
PlanFragment planFragment = PlanFragment.deserialize(buffer);
+ long timeOut = ReadWriteIOUtils.readLong(buffer);
boolean hasTimeFilter = ReadWriteIOUtils.readBool(buffer);
Filter timeFilter = hasTimeFilter ? FilterFactory.deserialize(buffer) : null;
QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)];
FragmentInstance fragmentInstance =
- new FragmentInstance(planFragment, id, timeFilter, queryType);
+ new FragmentInstance(planFragment, id, timeFilter, queryType, timeOut);
boolean hasHostDataNode = ReadWriteIOUtils.readBool(buffer);
fragmentInstance.hostDataNode =
hasHostDataNode ? ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null;
@@ -171,6 +183,7 @@ public class FragmentInstance implements IConsensusRequest {
DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
id.serialize(outputStream);
fragment.serialize(outputStream);
+ ReadWriteIOUtils.write(timeOut, outputStream);
ReadWriteIOUtils.write(timeFilter != null, outputStream);
if (timeFilter != null) {
timeFilter.serialize(outputStream);
@@ -208,4 +221,8 @@ public class FragmentInstance implements IConsensusRequest {
public TDataNodeLocation getHostDataNode() {
return hostDataNode;
}
+
+ public long getTimeOut() {
+ return timeOut;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/GrafanaApiServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/GrafanaApiServiceImpl.java
index 3ab3779fc4..9978d558d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/GrafanaApiServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/GrafanaApiServiceImpl.java
@@ -117,7 +117,13 @@ public class GrafanaApiServiceImpl extends GrafanaApiService {
// create and cache dataset
ExecutionResult result =
COORDINATOR.execute(
- statement, queryId, null, sql.getSql(), PARTITION_FETCHER, SCHEMA_FETCHER);
+ statement,
+ queryId,
+ null,
+ sql.getSql(),
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER,
+ config.getQueryTimeoutThreshold());
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return Response.ok()
.entity(
@@ -172,7 +178,14 @@ public class GrafanaApiServiceImpl extends GrafanaApiService {
final long queryId = SESSION_MANAGER.requestQueryId(true);
// create and cache dataset
ExecutionResult result =
- COORDINATOR.execute(statement, queryId, null, sql, PARTITION_FETCHER, SCHEMA_FETCHER);
+ COORDINATOR.execute(
+ statement,
+ queryId,
+ null,
+ sql,
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER,
+ config.getQueryTimeoutThreshold());
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return Response.ok()
.entity(
@@ -221,7 +234,14 @@ public class GrafanaApiServiceImpl extends GrafanaApiService {
final long queryId = SESSION_MANAGER.requestQueryId(true);
// create and cache dataset
ExecutionResult result =
- COORDINATOR.execute(statement, queryId, null, sql, PARTITION_FETCHER, SCHEMA_FETCHER);
+ COORDINATOR.execute(
+ statement,
+ queryId,
+ null,
+ sql,
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER,
+ config.getQueryTimeoutThreshold());
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return Response.ok()
.entity(
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/RestApiServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/RestApiServiceImpl.java
index f0fa2b0c04..34dbe2ffcd 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/RestApiServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/RestApiServiceImpl.java
@@ -97,7 +97,8 @@ public class RestApiServiceImpl extends RestApiService {
null,
sql.getSql(),
PARTITION_FETCHER,
- SCHEMA_FETCHER);
+ SCHEMA_FETCHER,
+ config.getQueryTimeoutThreshold());
return Response.ok()
.entity(
@@ -130,7 +131,13 @@ public class RestApiServiceImpl extends RestApiService {
// create and cache dataset
ExecutionResult result =
COORDINATOR.execute(
- statement, queryId, null, sql.getSql(), PARTITION_FETCHER, SCHEMA_FETCHER);
+ statement,
+ queryId,
+ null,
+ sql.getSql(),
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER,
+ config.getQueryTimeoutThreshold());
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return Response.ok()
.entity(
@@ -174,7 +181,8 @@ public class RestApiServiceImpl extends RestApiService {
null,
"",
PARTITION_FETCHER,
- SCHEMA_FETCHER);
+ SCHEMA_FETCHER,
+ config.getQueryTimeoutThreshold());
return Response.ok()
.entity(
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index 09dad0f03a..67785a6b08 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
@@ -54,6 +55,7 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
private static final Logger LOG = LoggerFactory.getLogger(MPPPublishHandler.class);
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final SessionManager SESSION_MANAGER = SessionManager.getInstance();
private long sessionId;
private final PayloadFormatter payloadFormat;
@@ -146,7 +148,8 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
SESSION_MANAGER.getSessionInfo(sessionId),
"",
partitionFetcher,
- schemaFetcher);
+ schemaFetcher,
+ config.getQueryTimeoutThreshold());
tsStatus = result.status;
}
} catch (Exception e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index e87958510f..77e59defd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -565,7 +565,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
SESSION_MANAGER.getSessionInfo(req.sessionId),
statement,
PARTITION_FETCHER,
- SCHEMA_FETCHER);
+ SCHEMA_FETCHER,
+ req.getTimeout());
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& result.status.code != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
@@ -638,7 +639,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
SESSION_MANAGER.getSessionInfo(req.sessionId),
statement,
PARTITION_FETCHER,
- SCHEMA_FETCHER);
+ SCHEMA_FETCHER,
+ config.getQueryTimeoutThreshold());
addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2);
results.add(result.status);
} catch (Exception e) {
@@ -1135,7 +1137,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
SESSION_MANAGER.getSessionInfo(req.sessionId),
"",
PARTITION_FETCHER,
- SCHEMA_FETCHER);
+ SCHEMA_FETCHER,
+ req.getTimeout());
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new RuntimeException("error code: " + result.status);
@@ -1195,7 +1198,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
SESSION_MANAGER.getSessionInfo(req.sessionId),
"",
PARTITION_FETCHER,
- SCHEMA_FETCHER);
+ SCHEMA_FETCHER,
+ req.getTimeout());
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new RuntimeException("error code: " + result.status);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
index 0d57b138e3..b40339d80a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution.schedule;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
@@ -39,6 +40,8 @@ import java.util.List;
public class DriverSchedulerTest {
private final DriverScheduler manager = DriverScheduler.getInstance();
+ private static final long QUERY_TIMEOUT_MS =
+ IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
@After
public void tearDown() {
@@ -63,7 +66,7 @@ public class DriverSchedulerTest {
IDriver mockDriver2 = Mockito.mock(IDriver.class);
Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
List<IDriver> instances = Arrays.asList(mockDriver1, mockDriver2);
- manager.submitDrivers(queryId, instances);
+ manager.submitDrivers(queryId, instances, QUERY_TIMEOUT_MS);
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
Assert.assertEquals(1, manager.getQueryMap().size());
Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
@@ -83,7 +86,7 @@ public class DriverSchedulerTest {
IDriver mockDriver3 = Mockito.mock(IDriver.class);
FragmentInstanceId instanceId3 = new FragmentInstanceId(fragmentId, "inst-2");
Mockito.when(mockDriver3.getInfo()).thenReturn(instanceId3);
- manager.submitDrivers(queryId, Collections.singletonList(mockDriver3));
+ manager.submitDrivers(queryId, Collections.singletonList(mockDriver3), QUERY_TIMEOUT_MS);
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
Assert.assertEquals(1, manager.getQueryMap().size());
Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
@@ -101,7 +104,7 @@ public class DriverSchedulerTest {
FragmentInstanceId instanceId4 = new FragmentInstanceId(fragmentId2, "inst-0");
IDriver mockDriver4 = Mockito.mock(IDriver.class);
Mockito.when(mockDriver4.getInfo()).thenReturn(instanceId4);
- manager.submitDrivers(queryId2, Collections.singletonList(mockDriver4));
+ manager.submitDrivers(queryId2, Collections.singletonList(mockDriver4), QUERY_TIMEOUT_MS);
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
Assert.assertEquals(2, manager.getQueryMap().size());
Assert.assertTrue(manager.getQueryMap().containsKey(queryId2));
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
index c1e7bc591b..a8978a5ec3 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
@@ -138,7 +138,14 @@ public class StandaloneCoordinatorTest {
private void executeStatement(Statement statement, boolean isDataQuery) {
long queryId = SessionManager.getInstance().requestQueryId(isDataQuery);
ExecutionResult executionResult =
- coordinator.execute(statement, queryId, null, "", partitionFetcher, schemaFetcher);
+ coordinator.execute(
+ statement,
+ queryId,
+ null,
+ "",
+ partitionFetcher,
+ schemaFetcher,
+ conf.getQueryTimeoutThreshold());
try {
int statusCode = executionResult.status.getCode();
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), statusCode);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
index ebcd805c0b..ca5def4f03 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
@@ -47,6 +49,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class FragmentInstanceSerdeTest {
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@Test
public void testSerializeAndDeserializeForTree1() throws IllegalPathException {
@@ -64,7 +67,8 @@ public class FragmentInstanceSerdeTest {
new PlanFragment(planFragmentId, constructPlanNodeTree()),
planFragmentId.genFragmentInstanceId(),
new GroupByFilter(1, 2, 3, 4),
- QueryType.READ);
+ QueryType.READ,
+ config.getQueryTimeoutThreshold());
TRegionReplicaSet regionReplicaSet =
new TRegionReplicaSet(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
@@ -96,7 +100,8 @@ public class FragmentInstanceSerdeTest {
new PlanFragment(planFragmentId, constructPlanNodeTree()),
planFragmentId.genFragmentInstanceId(),
null,
- QueryType.READ);
+ QueryType.READ,
+ config.getQueryTimeoutThreshold());
TRegionReplicaSet regionReplicaSet =
new TRegionReplicaSet(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
index 7a7d562b1c..a5c65def26 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
@@ -134,7 +134,8 @@ public class StandaloneSchedulerTest {
planFragment,
planFragment.getId().genFragmentInstanceId(),
new GroupByFilter(1, 2, 3, 4),
- QueryType.WRITE);
+ QueryType.WRITE,
+ conf.getQueryTimeoutThreshold());
fragmentInstance.setDataRegionAndHost(regionReplicaSet);
configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.wf01.wt01.status"));
@@ -237,7 +238,8 @@ public class StandaloneSchedulerTest {
planFragment,
planFragment.getId().genFragmentInstanceId(),
new GroupByFilter(1, 2, 3, 4),
- QueryType.WRITE);
+ QueryType.WRITE,
+ conf.getQueryTimeoutThreshold());
fragmentInstance.setDataRegionAndHost(regionReplicaSet);
configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.wf01.GPS"));
@@ -350,7 +352,8 @@ public class StandaloneSchedulerTest {
planFragment,
planFragment.getId().genFragmentInstanceId(),
new GroupByFilter(1, 2, 3, 4),
- QueryType.WRITE);
+ QueryType.WRITE,
+ conf.getQueryTimeoutThreshold());
fragmentInstance.setDataRegionAndHost(regionReplicaSet);
configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.d3"));
@@ -401,7 +404,8 @@ public class StandaloneSchedulerTest {
planFragment,
planFragment.getId().genFragmentInstanceId(),
new GroupByFilter(1, 2, 3, 4),
- QueryType.WRITE);
+ QueryType.WRITE,
+ conf.getQueryTimeoutThreshold());
fragmentInstance.setDataRegionAndHost(regionReplicaSet);
configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath(deviceId));
@@ -482,7 +486,8 @@ public class StandaloneSchedulerTest {
planFragment,
planFragment.getId().genFragmentInstanceId(),
new GroupByFilter(1, 2, 3, 4),
- QueryType.WRITE);
+ QueryType.WRITE,
+ conf.getQueryTimeoutThreshold());
fragmentInstance.setDataRegionAndHost(regionReplicaSet);
configNode.getBelongedSchemaRegionIdWithAutoCreate(deviceId);
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/session/src/main/java/org/apache/iotdb/session/Config.java
index 6304fb5a5a..23dd73c5e2 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/Config.java
@@ -37,6 +37,8 @@ public class Config {
public static final int RETRY_NUM = 3;
public static final long RETRY_INTERVAL_MS = 1000;
+ public static final long DEFAULT_QUERY_TIME_OUT = 60000;
+
/** thrift init buffer size, 1KB by default */
public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 854450f85d..df5930187d 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -723,17 +723,18 @@ public class Session {
* @throws StatementExecutionException statement is not right
* @throws IoTDBConnectionException the network is not good
*/
- public SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime)
+ public SessionDataSet executeRawDataQuery(
+ List<String> paths, long startTime, long endTime, long timeOut)
throws StatementExecutionException, IoTDBConnectionException {
try {
- return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
+ return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime, timeOut);
} catch (RedirectException e) {
handleQueryRedirection(e.getEndPoint());
if (enableQueryRedirection) {
logger.debug("redirect query {} to {}", paths, e.getEndPoint());
// retry
try {
- return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
+ return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime, timeOut);
} catch (RedirectException redirectException) {
logger.error("Redirect twice", redirectException);
throw new StatementExecutionException("Redirect twice, please try again.");
@@ -751,16 +752,16 @@ public class Session {
* @param LastTime get the last data, whose timestamp is greater than or equal LastTime e.g.
* 1621326244168
*/
- public SessionDataSet executeLastDataQuery(List<String> paths, long LastTime)
+ public SessionDataSet executeLastDataQuery(List<String> paths, long LastTime, long timeOut)
throws StatementExecutionException, IoTDBConnectionException {
try {
- return defaultSessionConnection.executeLastDataQuery(paths, LastTime);
+ return defaultSessionConnection.executeLastDataQuery(paths, LastTime, timeOut);
} catch (RedirectException e) {
handleQueryRedirection(e.getEndPoint());
if (enableQueryRedirection) {
// retry
try {
- return defaultSessionConnection.executeLastDataQuery(paths, LastTime);
+ return defaultSessionConnection.executeLastDataQuery(paths, LastTime, timeOut);
} catch (RedirectException redirectException) {
logger.error("redirect twice", redirectException);
throw new StatementExecutionException("redirect twice, please try again.");
@@ -779,7 +780,7 @@ public class Session {
public SessionDataSet executeLastDataQuery(List<String> paths)
throws StatementExecutionException, IoTDBConnectionException {
long time = 0L;
- return executeLastDataQuery(paths, time);
+ return executeLastDataQuery(paths, time, Config.DEFAULT_QUERY_TIME_OUT);
}
/**
@@ -2501,6 +2502,7 @@ public class Session {
private int thriftMaxFrameSize = Config.DEFAULT_MAX_FRAME_SIZE;
private boolean enableCacheLeader = Config.DEFAULT_CACHE_LEADER_MODE;
private Version version = Config.DEFAULT_VERSION;
+ private long timeOut = Config.DEFAULT_QUERY_TIME_OUT;
private List<String> nodeUrls = null;
@@ -2559,6 +2561,11 @@ public class Session {
return this;
}
+ public Builder timeOut(long timeOut) {
+ this.timeOut = timeOut;
+ return this;
+ }
+
public Session build() {
if (nodeUrls != null
&& (!Config.DEFAULT_HOST.equals(host) || rpcPort != Config.DEFAULT_PORT)) {
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 3c2ab44b72..38687fea5e 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -391,11 +391,13 @@ public class SessionConnection {
}
}
- protected SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime)
+ protected SessionDataSet executeRawDataQuery(
+ List<String> paths, long startTime, long endTime, long timeOut)
throws StatementExecutionException, IoTDBConnectionException, RedirectException {
TSRawDataQueryReq execReq =
new TSRawDataQueryReq(sessionId, paths, startTime, endTime, statementId);
execReq.setFetchSize(session.fetchSize);
+ execReq.setTimeout(timeOut);
TSExecuteStatementResp execResp;
try {
execReq.setEnableRedirectQuery(enableRedirect);
@@ -429,12 +431,13 @@ public class SessionConnection {
execResp.isIgnoreTimeStamp());
}
- protected SessionDataSet executeLastDataQuery(List<String> paths, long time)
+ protected SessionDataSet executeLastDataQuery(List<String> paths, long time, long timeOut)
throws StatementExecutionException, IoTDBConnectionException, RedirectException {
TSLastDataQueryReq tsLastDataQueryReq =
new TSLastDataQueryReq(sessionId, paths, time, statementId);
tsLastDataQueryReq.setFetchSize(session.fetchSize);
tsLastDataQueryReq.setEnableRedirectQuery(enableRedirect);
+ tsLastDataQueryReq.setTimeout(timeOut);
TSExecuteStatementResp tsExecuteStatementResp;
try {
tsExecuteStatementResp = client.executeLastDataQuery(tsLastDataQueryReq);
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 37249b9971..f46718f9c0 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -2235,12 +2235,13 @@ public class SessionPool {
}
@SuppressWarnings("squid:S2095") // Suppress wrapper not closed warning
- public SessionDataSetWrapper executeRawDataQuery(List<String> paths, long startTime, long endTime)
+ public SessionDataSetWrapper executeRawDataQuery(
+ List<String> paths, long startTime, long endTime, long timeOut)
throws IoTDBConnectionException, StatementExecutionException {
for (int i = 0; i < RETRY; i++) {
Session session = getSession();
try {
- SessionDataSet resp = session.executeRawDataQuery(paths, startTime, endTime);
+ SessionDataSet resp = session.executeRawDataQuery(paths, startTime, endTime, timeOut);
SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
occupy(session);
return wrapper;
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 4a4fab7d05..f2ecbeedea 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -215,6 +215,7 @@ struct TSInsertStringRecordReq {
4: required list<string> values
5: required i64 timestamp
6: optional bool isAligned
+ 7: optional i64 timeout
}
struct TSInsertTabletReq {
@@ -315,6 +316,7 @@ struct TSRawDataQueryReq {
6: required i64 statementId
7: optional bool enableRedirectQuery;
8: optional bool jdbcQuery;
+ 9: optional i64 timeout
}
struct TSLastDataQueryReq {
@@ -325,6 +327,7 @@ struct TSLastDataQueryReq {
5: required i64 statementId
6: optional bool enableRedirectQuery;
7: optional bool jdbcQuery;
+ 8: optional i64 timeout
}
struct TSCreateMultiTimeseriesReq {