You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/07/20 13:23:40 UTC

[iotdb] branch master updated: [IOTDB-3721] Keep each FI's execution timeout consistent with timeout set in session (#6663)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d4d8c0d73 [IOTDB-3721] Keep each FI's execution timeout consistent with timeout set in session (#6663)
6d4d8c0d73 is described below

commit 6d4d8c0d738f21395b629c59f49ae5a271303613
Author: FangJL <37...@users.noreply.github.com>
AuthorDate: Wed Jul 20 21:23:33 2022 +0800

    [IOTDB-3721] Keep each FI's execution timeout consistent with timeout set in session (#6663)
---
 .../main/java/org/apache/iotdb/SessionExample.java |  5 ++-
 .../iotdb/SyntaxConventionRelatedExample.java      |  4 +-
 .../iotdb/session/IoTDBSessionComplexIT.java       |  2 +-
 .../apache/iotdb/session/pool/SessionPoolTest.java |  2 +-
 .../apache/iotdb/session/template/TemplateUT.java  |  5 ++-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  6 +--
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  4 +-
 .../iotdb/db/mpp/common/MPPQueryContext.java       | 36 ++++++++++++++++
 .../fragment/FragmentInstanceExecution.java        |  5 ++-
 .../fragment/FragmentInstanceManager.java          | 16 ++++++-
 .../db/mpp/execution/schedule/DriverScheduler.java |  9 ++--
 .../mpp/execution/schedule/IDriverScheduler.java   |  3 +-
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  | 50 +++++++++++++++++++++-
 .../db/mpp/plan/analyze/ClusterSchemaFetcher.java  | 16 ++++++-
 .../db/mpp/plan/execution/QueryExecution.java      |  7 +++
 .../SimpleFragmentParallelPlanner.java             |  3 +-
 .../distribution/WriteFragmentParallelPlanner.java |  3 +-
 .../db/mpp/plan/planner/plan/FragmentInstance.java | 21 ++++++++-
 .../mpprest/impl/GrafanaApiServiceImpl.java        | 26 +++++++++--
 .../protocol/mpprest/impl/RestApiServiceImpl.java  | 14 ++++--
 .../iotdb/db/protocol/mqtt/MPPPublishHandler.java  |  5 ++-
 .../service/thrift/impl/ClientRPCServiceImpl.java  | 12 ++++--
 .../execution/schedule/DriverSchedulerTest.java    |  9 ++--
 .../db/mpp/plan/StandaloneCoordinatorTest.java     |  9 +++-
 .../mpp/plan/plan/FragmentInstanceSerdeTest.java   |  9 +++-
 .../plan/scheduler/StandaloneSchedulerTest.java    | 15 ++++---
 .../main/java/org/apache/iotdb/session/Config.java |  2 +
 .../java/org/apache/iotdb/session/Session.java     | 21 ++++++---
 .../apache/iotdb/session/SessionConnection.java    |  7 ++-
 .../org/apache/iotdb/session/pool/SessionPool.java |  5 ++-
 thrift/src/main/thrift/client.thrift               |  3 ++
 31 files changed, 273 insertions(+), 61 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index d779086480..43d93cdf97 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -754,8 +754,9 @@ public class SessionExample {
     paths.add(ROOT_SG1_D1_S3);
     long startTime = 10L;
     long endTime = 200L;
+    long timeOut = 60000;
 
-    try (SessionDataSet dataSet = session.executeRawDataQuery(paths, startTime, endTime)) {
+    try (SessionDataSet dataSet = session.executeRawDataQuery(paths, startTime, endTime, timeOut)) {
 
       System.out.println(dataSet.getColumnNames());
       dataSet.setFetchSize(1024);
@@ -770,7 +771,7 @@ public class SessionExample {
     paths.add(ROOT_SG1_D1_S1);
     paths.add(ROOT_SG1_D1_S2);
     paths.add(ROOT_SG1_D1_S3);
-    try (SessionDataSet sessionDataSet = session.executeLastDataQuery(paths, 3)) {
+    try (SessionDataSet sessionDataSet = session.executeLastDataQuery(paths, 3, 60000)) {
       System.out.println(sessionDataSet.getColumnNames());
       sessionDataSet.setFetchSize(1024);
       while (sessionDataSet.hasNext()) {
diff --git a/example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java b/example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java
index 362702f76a..f6529e2910 100644
--- a/example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java
@@ -109,8 +109,10 @@ public class SyntaxConventionRelatedExample {
 
     long startTime = 1L;
     long endTime = 100L;
+    long timeOut = 60000;
 
-    try (SessionDataSet dataSet1 = session.executeRawDataQuery(paths, startTime, endTime)) {
+    try (SessionDataSet dataSet1 =
+        session.executeRawDataQuery(paths, startTime, endTime, timeOut)) {
 
       System.out.println(dataSet1.getColumnNames());
       dataSet1.setFetchSize(1024);
diff --git a/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java b/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
index 3aeb9de5ee..114508878a 100644
--- a/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
+++ b/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
@@ -593,7 +593,7 @@ public class IoTDBSessionComplexIT {
     paths.add("root.sg1.d2.s1");
     paths.add("root.sg1.d2.s2");
 
-    SessionDataSet sessionDataSet = session.executeRawDataQuery(paths, 450L, 600L);
+    SessionDataSet sessionDataSet = session.executeRawDataQuery(paths, 450L, 600L, 60000);
     sessionDataSet.setFetchSize(1024);
 
     int count = 0;
diff --git a/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index 2e8981d578..27ac0c8a69 100644
--- a/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -209,7 +209,7 @@ public class SessionPoolTest {
       service.submit(
           () -> {
             try {
-              SessionDataSetWrapper wrapper = pool.executeRawDataQuery(pathList, no, no + 1);
+              SessionDataSetWrapper wrapper = pool.executeRawDataQuery(pathList, no, no + 1, 60000);
               if (wrapper.hasNext()) {
                 Assert.assertEquals(no, wrapper.sessionDataSet.next().getTimestamp());
               }
diff --git a/integration/src/test/java/org/apache/iotdb/session/template/TemplateUT.java b/integration/src/test/java/org/apache/iotdb/session/template/TemplateUT.java
index 6ea9dc5b97..5548c461df 100644
--- a/integration/src/test/java/org/apache/iotdb/session/template/TemplateUT.java
+++ b/integration/src/test/java/org/apache/iotdb/session/template/TemplateUT.java
@@ -352,7 +352,7 @@ public class TemplateUT {
         Collections.singletonList(12345L));
 
     SessionDataSet res =
-        session.executeRawDataQuery(Collections.singletonList("root.sg.v1.*"), 0L, 999L);
+        session.executeRawDataQuery(Collections.singletonList("root.sg.v1.*"), 0L, 999L, 60000);
     while (res.hasNext()) {
       RowRecord rec = res.next();
       // correspond to x, y, append
@@ -366,7 +366,8 @@ public class TemplateUT {
         Collections.singletonList(TSDataType.INT64),
         Collections.singletonList(12345L));
 
-    res = session.executeRawDataQuery(Collections.singletonList("root.sg.v1.d0.*"), 0L, 999L);
+    res =
+        session.executeRawDataQuery(Collections.singletonList("root.sg.v1.d0.*"), 0L, 999L, 60000);
     while (res.hasNext()) {
       RowRecord rec = res.next();
       // x is not inside template
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 66d48dfa21..3df27bacd3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -516,7 +516,7 @@ public class IoTDBConfig {
   private long cacheFileReaderClearPeriod = 100000;
 
   /** the max executing time of query in ms. Unit: millisecond */
-  private int queryTimeoutThreshold = 60000;
+  private long queryTimeoutThreshold = 60000;
 
   /** the max time to live of a session in ms. Unit: millisecond */
   private int sessionTimeoutThreshold = 0;
@@ -1489,11 +1489,11 @@ public class IoTDBConfig {
     this.cacheFileReaderClearPeriod = cacheFileReaderClearPeriod;
   }
 
-  public int getQueryTimeoutThreshold() {
+  public long getQueryTimeoutThreshold() {
     return queryTimeoutThreshold;
   }
 
-  public void setQueryTimeoutThreshold(int queryTimeoutThreshold) {
+  public void setQueryTimeoutThreshold(long queryTimeoutThreshold) {
     this.queryTimeoutThreshold = queryTimeoutThreshold;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 2c3a2a6ed0..046cebbdc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -412,9 +412,9 @@ public class IoTDBDescriptor {
       conf.setSubCompactionTaskNum(subtaskNum);
 
       conf.setQueryTimeoutThreshold(
-          Integer.parseInt(
+          Long.parseLong(
               properties.getProperty(
-                  "query_timeout_threshold", Integer.toString(conf.getQueryTimeoutThreshold()))));
+                  "query_timeout_threshold", Long.toString(conf.getQueryTimeoutThreshold()))));
 
       conf.setSessionTimeoutThreshold(
           Integer.parseInt(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index 153b1e8598..81e1330eca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -30,6 +30,8 @@ public class MPPQueryContext {
   private QueryId queryId;
   private SessionInfo session;
   private QueryType queryType = QueryType.READ;
+  private long timeOut;
+  private long startTime;
 
   private TEndPoint localDataBlockEndpoint;
   private TEndPoint localInternalEndpoint;
@@ -53,6 +55,24 @@ public class MPPQueryContext {
     this.resultNodeContext = new ResultNodeContext(queryId);
   }
 
+  public MPPQueryContext(
+      String sql,
+      QueryId queryId,
+      SessionInfo session,
+      TEndPoint localDataBlockEndpoint,
+      TEndPoint localInternalEndpoint,
+      long timeOut,
+      long startTime) {
+    this.sql = sql;
+    this.queryId = queryId;
+    this.session = session;
+    this.localDataBlockEndpoint = localDataBlockEndpoint;
+    this.localInternalEndpoint = localInternalEndpoint;
+    this.resultNodeContext = new ResultNodeContext(queryId);
+    this.timeOut = timeOut;
+    this.startTime = startTime;
+  }
+
   public QueryId getQueryId() {
     return queryId;
   }
@@ -61,6 +81,14 @@ public class MPPQueryContext {
     return queryType;
   }
 
+  public long getTimeOut() {
+    return timeOut;
+  }
+
+  public void setTimeOut(long timeOut) {
+    this.timeOut = timeOut;
+  }
+
   public void setQueryType(QueryType queryType) {
     this.queryType = queryType;
   }
@@ -80,4 +108,12 @@ public class MPPQueryContext {
   public SessionInfo getSession() {
     return session;
   }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index c1fd9568a7..767487fc67 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -51,11 +51,12 @@ public class FragmentInstanceExecution {
       FragmentInstanceContext context,
       IDriver driver,
       FragmentInstanceStateMachine stateMachine,
-      CounterStat failedInstances) {
+      CounterStat failedInstances,
+      long timeOut) {
     FragmentInstanceExecution execution =
         new FragmentInstanceExecution(instanceId, context, driver, stateMachine);
     execution.initialize(failedInstances, scheduler);
-    scheduler.submitDrivers(instanceId.getQueryId(), ImmutableList.of(driver));
+    scheduler.submitDrivers(instanceId.getQueryId(), ImmutableList.of(driver), timeOut);
     return execution;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 51b689eda2..ca164e9e51 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -108,7 +108,13 @@ public class FragmentInstanceManager {
                           instance.getTimeFilter(),
                           dataRegion);
                   return createFragmentInstanceExecution(
-                      scheduler, instanceId, context, driver, stateMachine, failedInstances);
+                      scheduler,
+                      instanceId,
+                      context,
+                      driver,
+                      stateMachine,
+                      failedInstances,
+                      instance.getTimeOut());
                 } catch (Throwable t) {
                   logger.error("error when create FragmentInstanceExecution.", t);
                   stateMachine.failed(t);
@@ -140,7 +146,13 @@ public class FragmentInstanceManager {
                 SchemaDriver driver =
                     planner.plan(instance.getFragment().getRoot(), context, schemaRegion);
                 return createFragmentInstanceExecution(
-                    scheduler, instanceId, context, driver, stateMachine, failedInstances);
+                    scheduler,
+                    instanceId,
+                    context,
+                    driver,
+                    stateMachine,
+                    failedInstances,
+                    instance.getTimeOut());
               } catch (Throwable t) {
                 logger.error("Execute error caused by ", t);
                 stateMachine.failed(t);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index ce079f4f3c..f3d33b3df2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -70,7 +70,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
       IoTDBDescriptor.getInstance().getConfig().getMaxAllowedConcurrentQueries();
   private static final int WORKER_THREAD_NUM =
       IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread();
-  private static final int QUERY_TIMEOUT_MS =
+  private static final long QUERY_TIMEOUT_MS =
       IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
   private final ThreadGroup workerGroups;
   private final List<AbstractDriverThread> threads;
@@ -122,10 +122,13 @@ public class DriverScheduler implements IDriverScheduler, IService {
   }
 
   @Override
-  public void submitDrivers(QueryId queryId, List<IDriver> instances) {
+  public void submitDrivers(QueryId queryId, List<IDriver> instances, long timeOut) {
     List<DriverTask> tasks =
         instances.stream()
-            .map(v -> new DriverTask(v, QUERY_TIMEOUT_MS, DriverTaskStatus.READY))
+            .map(
+                v ->
+                    new DriverTask(
+                        v, timeOut > 0 ? timeOut : QUERY_TIMEOUT_MS, DriverTaskStatus.READY))
             .collect(Collectors.toList());
     queryMap
         .computeIfAbsent(queryId, v -> Collections.synchronizedSet(new HashSet<>()))
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/IDriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/IDriverScheduler.java
index 704ee4a1a0..7939c25a25 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/IDriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/IDriverScheduler.java
@@ -32,8 +32,9 @@ public interface IDriverScheduler {
    *
    * @param queryId the queryId these instances belong to.
    * @param instances the submitted instances.
+   * @param timeOut the query timeout in milliseconds
    */
-  void submitDrivers(QueryId queryId, List<IDriver> instances);
+  void submitDrivers(QueryId queryId, List<IDriver> instances, long timeOut);
 
   /**
    * Abort all the instances in this query.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index c8a44bdff1..3a4b0aaf44 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
@@ -58,6 +59,7 @@ public class Coordinator {
   private static final String COORDINATOR_WRITE_EXECUTOR_NAME = "MPPCoordinatorWrite";
   private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
   private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 10;
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
       INTERNAL_SERVICE_CLIENT_MANAGER =
@@ -86,7 +88,11 @@ public class Coordinator {
       Statement statement,
       MPPQueryContext queryContext,
       IPartitionFetcher partitionFetcher,
-      ISchemaFetcher schemaFetcher) {
+      ISchemaFetcher schemaFetcher,
+      long timeOut,
+      long startTime) {
+    queryContext.setTimeOut(timeOut);
+    queryContext.setStartTime(startTime);
     if (statement instanceof IConfigStatement) {
       queryContext.setQueryType(((IConfigStatement) statement).getQueryType());
       return new ConfigExecution(queryContext, statement, executor);
@@ -102,6 +108,43 @@ public class Coordinator {
         INTERNAL_SERVICE_CLIENT_MANAGER);
   }
 
+  public ExecutionResult execute(
+      Statement statement,
+      long queryId,
+      SessionInfo session,
+      String sql,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher,
+      long timeOut) {
+    long startTime = System.currentTimeMillis();
+    QueryId globalQueryId = queryIdGenerator.createNextQueryId();
+    try (SetThreadName queryName = new SetThreadName(globalQueryId.getId())) {
+      if (sql != null && sql.length() > 0) {
+        LOGGER.info("start executing sql: {}", sql);
+      }
+      IQueryExecution execution =
+          createQueryExecution(
+              statement,
+              new MPPQueryContext(
+                  sql,
+                  globalQueryId,
+                  session,
+                  DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT,
+                  DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT),
+              partitionFetcher,
+              schemaFetcher,
+              timeOut,
+              startTime);
+      if (execution.isQuery()) {
+        queryExecutionMap.put(queryId, execution);
+      }
+      execution.start();
+
+      return execution.getStatus();
+    }
+  }
+
+  /** Called by the write method, so it takes no timeout and passes Long.MAX_VALUE instead. */
   public ExecutionResult execute(
       Statement statement,
       long queryId,
@@ -109,6 +152,7 @@ public class Coordinator {
       String sql,
       IPartitionFetcher partitionFetcher,
       ISchemaFetcher schemaFetcher) {
+    long startTime = System.currentTimeMillis();
     QueryId globalQueryId = queryIdGenerator.createNextQueryId();
     try (SetThreadName queryName = new SetThreadName(globalQueryId.getId())) {
       if (sql != null && sql.length() > 0) {
@@ -124,7 +168,9 @@ public class Coordinator {
                   DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT,
                   DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT),
               partitionFetcher,
-              schemaFetcher);
+              schemaFetcher,
+              Long.MAX_VALUE,
+              startTime);
       if (execution.isQuery()) {
         queryExecutionMap.put(queryId, execution);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 77c6dda7fe..17c7591b27 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -92,7 +92,13 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     try {
       ExecutionResult executionResult =
           coordinator.execute(
-              schemaFetchStatement, queryId, null, "", ClusterPartitionFetcher.getInstance(), this);
+              schemaFetchStatement,
+              queryId,
+              null,
+              "",
+              ClusterPartitionFetcher.getInstance(),
+              this,
+              config.getQueryTimeoutThreshold());
       // TODO: (xingtanzjr) throw exception
       if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         throw new RuntimeException(
@@ -330,7 +336,13 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     long queryId = SessionManager.getInstance().requestQueryId(false);
     ExecutionResult executionResult =
         coordinator.execute(
-            statement, queryId, null, "", ClusterPartitionFetcher.getInstance(), this);
+            statement,
+            queryId,
+            null,
+            "",
+            ClusterPartitionFetcher.getInstance(),
+            this,
+            config.getQueryTimeoutThreshold());
     // TODO: throw exception
     int statusCode = executionResult.status.getCode();
     if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 9c9d290052..666e7cf683 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.execution.QueryState;
@@ -170,6 +171,12 @@ public class QueryExecution implements IQueryExecution {
       stateMachine.transitionToRunning();
       return;
     }
+    long remainTime = context.getTimeOut() - (System.currentTimeMillis() - context.getStartTime());
+    if (remainTime <= 0) {
+      throw new QueryTimeoutRuntimeException();
+    }
+    context.setTimeOut(remainTime);
+
     doLogicalPlan();
     doDistributedPlan();
     if (context.getQueryType() == QueryType.READ) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 80b92f4f45..854bb15467 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -91,7 +91,8 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
             new PlanFragment(fragment.getId(), rootCopy),
             fragment.getId().genFragmentInstanceId(),
             timeFilter,
-            queryContext.getQueryType());
+            queryContext.getQueryType(),
+            queryContext.getTimeOut());
 
     // Get the target region for origin PlanFragment, then its instance will be distributed one
     // of them.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
index fb91a0c2fe..4f043a05b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -61,7 +61,8 @@ public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
               new PlanFragment(fragment.getId(), split),
               fragment.getId().genFragmentInstanceId(),
               timeFilter,
-              queryContext.getQueryType());
+              queryContext.getQueryType(),
+              queryContext.getTimeOut());
       instance.setDataRegionAndHost(split.getRegionReplicaSet());
       ret.add(instance);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
index f617ff9f02..bcc54931de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.runtime.SerializationRunTimeException;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
@@ -46,6 +47,8 @@ public class FragmentInstance implements IConsensusRequest {
 
   private final Logger logger = LoggerFactory.getLogger(FragmentInstance.class);
 
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
   private final FragmentInstanceId id;
   private final QueryType type;
   // The reference of PlanFragment which this instance is generated from
@@ -58,15 +61,22 @@ public class FragmentInstance implements IConsensusRequest {
 
   private Filter timeFilter;
 
+  private final long timeOut;
+
   // We can add some more params for a specific FragmentInstance
   // So that we can make different FragmentInstance owns different data range.
 
   public FragmentInstance(
-      PlanFragment fragment, FragmentInstanceId id, Filter timeFilter, QueryType type) {
+      PlanFragment fragment,
+      FragmentInstanceId id,
+      Filter timeFilter,
+      QueryType type,
+      long timeOut) {
     this.fragment = fragment;
     this.timeFilter = timeFilter;
     this.id = id;
     this.type = type;
+    this.timeOut = timeOut > 0 ? timeOut : config.getQueryTimeoutThreshold();
   }
 
   public TRegionReplicaSet getDataRegionId() {
@@ -149,17 +159,19 @@ public class FragmentInstance implements IConsensusRequest {
             getRegionReplicaSet() == null ? "Not set" : getRegionReplicaSet().getRegionId()));
     ret.append("\n---- Plan Node Tree ----\n");
     ret.append(PlanNodeUtil.nodeToString(getFragment().getRoot()));
+    ret.append(String.format("timeOut-%s:", getTimeOut()));
     return ret.toString();
   }
 
   public static FragmentInstance deserializeFrom(ByteBuffer buffer) {
     FragmentInstanceId id = FragmentInstanceId.deserialize(buffer);
     PlanFragment planFragment = PlanFragment.deserialize(buffer);
+    long timeOut = ReadWriteIOUtils.readLong(buffer);
     boolean hasTimeFilter = ReadWriteIOUtils.readBool(buffer);
     Filter timeFilter = hasTimeFilter ? FilterFactory.deserialize(buffer) : null;
     QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)];
     FragmentInstance fragmentInstance =
-        new FragmentInstance(planFragment, id, timeFilter, queryType);
+        new FragmentInstance(planFragment, id, timeFilter, queryType, timeOut);
     boolean hasHostDataNode = ReadWriteIOUtils.readBool(buffer);
     fragmentInstance.hostDataNode =
         hasHostDataNode ? ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null;
@@ -171,6 +183,7 @@ public class FragmentInstance implements IConsensusRequest {
         DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
       id.serialize(outputStream);
       fragment.serialize(outputStream);
+      ReadWriteIOUtils.write(timeOut, outputStream);
       ReadWriteIOUtils.write(timeFilter != null, outputStream);
       if (timeFilter != null) {
         timeFilter.serialize(outputStream);
@@ -208,4 +221,8 @@ public class FragmentInstance implements IConsensusRequest {
   public TDataNodeLocation getHostDataNode() {
     return hostDataNode;
   }
+
+  public long getTimeOut() {
+    return timeOut;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/GrafanaApiServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/GrafanaApiServiceImpl.java
index 3ab3779fc4..9978d558d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/GrafanaApiServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/GrafanaApiServiceImpl.java
@@ -117,7 +117,13 @@ public class GrafanaApiServiceImpl extends GrafanaApiService {
       // create and cache dataset
       ExecutionResult result =
           COORDINATOR.execute(
-              statement, queryId, null, sql.getSql(), PARTITION_FETCHER, SCHEMA_FETCHER);
+              statement,
+              queryId,
+              null,
+              sql.getSql(),
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER,
+              config.getQueryTimeoutThreshold());
       if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return Response.ok()
             .entity(
@@ -172,7 +178,14 @@ public class GrafanaApiServiceImpl extends GrafanaApiService {
       final long queryId = SESSION_MANAGER.requestQueryId(true);
       // create and cache dataset
       ExecutionResult result =
-          COORDINATOR.execute(statement, queryId, null, sql, PARTITION_FETCHER, SCHEMA_FETCHER);
+          COORDINATOR.execute(
+              statement,
+              queryId,
+              null,
+              sql,
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER,
+              config.getQueryTimeoutThreshold());
       if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return Response.ok()
             .entity(
@@ -221,7 +234,14 @@ public class GrafanaApiServiceImpl extends GrafanaApiService {
         final long queryId = SESSION_MANAGER.requestQueryId(true);
         // create and cache dataset
         ExecutionResult result =
-            COORDINATOR.execute(statement, queryId, null, sql, PARTITION_FETCHER, SCHEMA_FETCHER);
+            COORDINATOR.execute(
+                statement,
+                queryId,
+                null,
+                sql,
+                PARTITION_FETCHER,
+                SCHEMA_FETCHER,
+                config.getQueryTimeoutThreshold());
         if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           return Response.ok()
               .entity(
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/RestApiServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/RestApiServiceImpl.java
index f0fa2b0c04..34dbe2ffcd 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/RestApiServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/RestApiServiceImpl.java
@@ -97,7 +97,8 @@ public class RestApiServiceImpl extends RestApiService {
               null,
               sql.getSql(),
               PARTITION_FETCHER,
-              SCHEMA_FETCHER);
+              SCHEMA_FETCHER,
+              config.getQueryTimeoutThreshold());
 
       return Response.ok()
           .entity(
@@ -130,7 +131,13 @@ public class RestApiServiceImpl extends RestApiService {
       // create and cache dataset
       ExecutionResult result =
           COORDINATOR.execute(
-              statement, queryId, null, sql.getSql(), PARTITION_FETCHER, SCHEMA_FETCHER);
+              statement,
+              queryId,
+              null,
+              sql.getSql(),
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER,
+              config.getQueryTimeoutThreshold());
       if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return Response.ok()
             .entity(
@@ -174,7 +181,8 @@ public class RestApiServiceImpl extends RestApiService {
               null,
               "",
               PARTITION_FETCHER,
-              SCHEMA_FETCHER);
+              SCHEMA_FETCHER,
+              config.getQueryTimeoutThreshold());
 
       return Response.ok()
           .entity(
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index 09dad0f03a..67785a6b08 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
@@ -54,6 +55,7 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(MPPPublishHandler.class);
 
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private final SessionManager SESSION_MANAGER = SessionManager.getInstance();
   private long sessionId;
   private final PayloadFormatter payloadFormat;
@@ -146,7 +148,8 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
                       SESSION_MANAGER.getSessionInfo(sessionId),
                       "",
                       partitionFetcher,
-                      schemaFetcher);
+                      schemaFetcher,
+                      config.getQueryTimeoutThreshold());
           tsStatus = result.status;
         }
       } catch (Exception e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index e87958510f..77e59defd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -565,7 +565,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
               SESSION_MANAGER.getSessionInfo(req.sessionId),
               statement,
               PARTITION_FETCHER,
-              SCHEMA_FETCHER);
+              SCHEMA_FETCHER,
+              req.getTimeout());
 
       if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
           && result.status.code != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
@@ -638,7 +639,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
                 SESSION_MANAGER.getSessionInfo(req.sessionId),
                 statement,
                 PARTITION_FETCHER,
-                SCHEMA_FETCHER);
+                SCHEMA_FETCHER,
+                config.getQueryTimeoutThreshold());
         addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2);
         results.add(result.status);
       } catch (Exception e) {
@@ -1135,7 +1137,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
               SESSION_MANAGER.getSessionInfo(req.sessionId),
               "",
               PARTITION_FETCHER,
-              SCHEMA_FETCHER);
+              SCHEMA_FETCHER,
+              req.getTimeout());
 
       if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         throw new RuntimeException("error code: " + result.status);
@@ -1195,7 +1198,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
               SESSION_MANAGER.getSessionInfo(req.sessionId),
               "",
               PARTITION_FETCHER,
-              SCHEMA_FETCHER);
+              SCHEMA_FETCHER,
+              req.getTimeout());
 
       if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         throw new RuntimeException("error code: " + result.status);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
index 0d57b138e3..b40339d80a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.schedule;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
@@ -39,6 +40,8 @@ import java.util.List;
 public class DriverSchedulerTest {
 
   private final DriverScheduler manager = DriverScheduler.getInstance();
+  private static final long QUERY_TIMEOUT_MS =
+      IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
 
   @After
   public void tearDown() {
@@ -63,7 +66,7 @@ public class DriverSchedulerTest {
     IDriver mockDriver2 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
     List<IDriver> instances = Arrays.asList(mockDriver1, mockDriver2);
-    manager.submitDrivers(queryId, instances);
+    manager.submitDrivers(queryId, instances, QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
     Assert.assertEquals(1, manager.getQueryMap().size());
     Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
@@ -83,7 +86,7 @@ public class DriverSchedulerTest {
     IDriver mockDriver3 = Mockito.mock(IDriver.class);
     FragmentInstanceId instanceId3 = new FragmentInstanceId(fragmentId, "inst-2");
     Mockito.when(mockDriver3.getInfo()).thenReturn(instanceId3);
-    manager.submitDrivers(queryId, Collections.singletonList(mockDriver3));
+    manager.submitDrivers(queryId, Collections.singletonList(mockDriver3), QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
     Assert.assertEquals(1, manager.getQueryMap().size());
     Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
@@ -101,7 +104,7 @@ public class DriverSchedulerTest {
     FragmentInstanceId instanceId4 = new FragmentInstanceId(fragmentId2, "inst-0");
     IDriver mockDriver4 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver4.getInfo()).thenReturn(instanceId4);
-    manager.submitDrivers(queryId2, Collections.singletonList(mockDriver4));
+    manager.submitDrivers(queryId2, Collections.singletonList(mockDriver4), QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
     Assert.assertEquals(2, manager.getQueryMap().size());
     Assert.assertTrue(manager.getQueryMap().containsKey(queryId2));
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
index c1e7bc591b..a8978a5ec3 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
@@ -138,7 +138,14 @@ public class StandaloneCoordinatorTest {
   private void executeStatement(Statement statement, boolean isDataQuery) {
     long queryId = SessionManager.getInstance().requestQueryId(isDataQuery);
     ExecutionResult executionResult =
-        coordinator.execute(statement, queryId, null, "", partitionFetcher, schemaFetcher);
+        coordinator.execute(
+            statement,
+            queryId,
+            null,
+            "",
+            partitionFetcher,
+            schemaFetcher,
+            conf.getQueryTimeoutThreshold());
     try {
       int statusCode = executionResult.status.getCode();
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), statusCode);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
index ebcd805c0b..ca5def4f03 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
@@ -47,6 +49,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 public class FragmentInstanceSerdeTest {
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   @Test
   public void testSerializeAndDeserializeForTree1() throws IllegalPathException {
@@ -64,7 +67,8 @@ public class FragmentInstanceSerdeTest {
             new PlanFragment(planFragmentId, constructPlanNodeTree()),
             planFragmentId.genFragmentInstanceId(),
             new GroupByFilter(1, 2, 3, 4),
-            QueryType.READ);
+            QueryType.READ,
+            config.getQueryTimeoutThreshold());
     TRegionReplicaSet regionReplicaSet =
         new TRegionReplicaSet(
             new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
@@ -96,7 +100,8 @@ public class FragmentInstanceSerdeTest {
             new PlanFragment(planFragmentId, constructPlanNodeTree()),
             planFragmentId.genFragmentInstanceId(),
             null,
-            QueryType.READ);
+            QueryType.READ,
+            config.getQueryTimeoutThreshold());
     TRegionReplicaSet regionReplicaSet =
         new TRegionReplicaSet(
             new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
index 7a7d562b1c..a5c65def26 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
@@ -134,7 +134,8 @@ public class StandaloneSchedulerTest {
             planFragment,
             planFragment.getId().genFragmentInstanceId(),
             new GroupByFilter(1, 2, 3, 4),
-            QueryType.WRITE);
+            QueryType.WRITE,
+            conf.getQueryTimeoutThreshold());
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.wf01.wt01.status"));
@@ -237,7 +238,8 @@ public class StandaloneSchedulerTest {
             planFragment,
             planFragment.getId().genFragmentInstanceId(),
             new GroupByFilter(1, 2, 3, 4),
-            QueryType.WRITE);
+            QueryType.WRITE,
+            conf.getQueryTimeoutThreshold());
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.wf01.GPS"));
@@ -350,7 +352,8 @@ public class StandaloneSchedulerTest {
             planFragment,
             planFragment.getId().genFragmentInstanceId(),
             new GroupByFilter(1, 2, 3, 4),
-            QueryType.WRITE);
+            QueryType.WRITE,
+            conf.getQueryTimeoutThreshold());
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.d3"));
@@ -401,7 +404,8 @@ public class StandaloneSchedulerTest {
             planFragment,
             planFragment.getId().genFragmentInstanceId(),
             new GroupByFilter(1, 2, 3, 4),
-            QueryType.WRITE);
+            QueryType.WRITE,
+            conf.getQueryTimeoutThreshold());
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath(deviceId));
@@ -482,7 +486,8 @@ public class StandaloneSchedulerTest {
             planFragment,
             planFragment.getId().genFragmentInstanceId(),
             new GroupByFilter(1, 2, 3, 4),
-            QueryType.WRITE);
+            QueryType.WRITE,
+            conf.getQueryTimeoutThreshold());
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(deviceId);
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/session/src/main/java/org/apache/iotdb/session/Config.java
index 6304fb5a5a..23dd73c5e2 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/Config.java
@@ -37,6 +37,8 @@ public class Config {
   public static final int RETRY_NUM = 3;
   public static final long RETRY_INTERVAL_MS = 1000;
 
+  public static final long DEFAULT_QUERY_TIME_OUT = 60000;
+
   /** thrift init buffer size, 1KB by default */
   public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
 
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 854450f85d..df5930187d 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -723,17 +723,18 @@ public class Session {
    * @throws StatementExecutionException statement is not right
    * @throws IoTDBConnectionException the network is not good
    */
-  public SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime)
+  public SessionDataSet executeRawDataQuery(
+      List<String> paths, long startTime, long endTime, long timeOut)
       throws StatementExecutionException, IoTDBConnectionException {
     try {
-      return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
+      return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime, timeOut);
     } catch (RedirectException e) {
       handleQueryRedirection(e.getEndPoint());
       if (enableQueryRedirection) {
         logger.debug("redirect query {} to {}", paths, e.getEndPoint());
         // retry
         try {
-          return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
+          return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime, timeOut);
         } catch (RedirectException redirectException) {
           logger.error("Redirect twice", redirectException);
           throw new StatementExecutionException("Redirect twice, please try again.");
@@ -751,16 +752,16 @@ public class Session {
    * @param LastTime get the last data, whose timestamp is greater than or equal LastTime e.g.
    *     1621326244168
    */
-  public SessionDataSet executeLastDataQuery(List<String> paths, long LastTime)
+  public SessionDataSet executeLastDataQuery(List<String> paths, long LastTime, long timeOut)
       throws StatementExecutionException, IoTDBConnectionException {
     try {
-      return defaultSessionConnection.executeLastDataQuery(paths, LastTime);
+      return defaultSessionConnection.executeLastDataQuery(paths, LastTime, timeOut);
     } catch (RedirectException e) {
       handleQueryRedirection(e.getEndPoint());
       if (enableQueryRedirection) {
         // retry
         try {
-          return defaultSessionConnection.executeLastDataQuery(paths, LastTime);
+          return defaultSessionConnection.executeLastDataQuery(paths, LastTime, timeOut);
         } catch (RedirectException redirectException) {
           logger.error("redirect twice", redirectException);
           throw new StatementExecutionException("redirect twice, please try again.");
@@ -779,7 +780,7 @@ public class Session {
   public SessionDataSet executeLastDataQuery(List<String> paths)
       throws StatementExecutionException, IoTDBConnectionException {
     long time = 0L;
-    return executeLastDataQuery(paths, time);
+    return executeLastDataQuery(paths, time, Config.DEFAULT_QUERY_TIME_OUT);
   }
 
   /**
@@ -2501,6 +2502,7 @@ public class Session {
     private int thriftMaxFrameSize = Config.DEFAULT_MAX_FRAME_SIZE;
     private boolean enableCacheLeader = Config.DEFAULT_CACHE_LEADER_MODE;
     private Version version = Config.DEFAULT_VERSION;
+    private long timeOut = Config.DEFAULT_QUERY_TIME_OUT;
 
     private List<String> nodeUrls = null;
 
@@ -2559,6 +2561,11 @@ public class Session {
       return this;
     }
 
+    public Builder timeOut(long timeOut) {
+      this.timeOut = timeOut;
+      return this;
+    }
+
     public Session build() {
       if (nodeUrls != null
           && (!Config.DEFAULT_HOST.equals(host) || rpcPort != Config.DEFAULT_PORT)) {
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 3c2ab44b72..38687fea5e 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -391,11 +391,13 @@ public class SessionConnection {
     }
   }
 
-  protected SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime)
+  protected SessionDataSet executeRawDataQuery(
+      List<String> paths, long startTime, long endTime, long timeOut)
       throws StatementExecutionException, IoTDBConnectionException, RedirectException {
     TSRawDataQueryReq execReq =
         new TSRawDataQueryReq(sessionId, paths, startTime, endTime, statementId);
     execReq.setFetchSize(session.fetchSize);
+    execReq.setTimeout(timeOut);
     TSExecuteStatementResp execResp;
     try {
       execReq.setEnableRedirectQuery(enableRedirect);
@@ -429,12 +431,13 @@ public class SessionConnection {
         execResp.isIgnoreTimeStamp());
   }
 
-  protected SessionDataSet executeLastDataQuery(List<String> paths, long time)
+  protected SessionDataSet executeLastDataQuery(List<String> paths, long time, long timeOut)
       throws StatementExecutionException, IoTDBConnectionException, RedirectException {
     TSLastDataQueryReq tsLastDataQueryReq =
         new TSLastDataQueryReq(sessionId, paths, time, statementId);
     tsLastDataQueryReq.setFetchSize(session.fetchSize);
     tsLastDataQueryReq.setEnableRedirectQuery(enableRedirect);
+    tsLastDataQueryReq.setTimeout(timeOut);
     TSExecuteStatementResp tsExecuteStatementResp;
     try {
       tsExecuteStatementResp = client.executeLastDataQuery(tsLastDataQueryReq);
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 37249b9971..f46718f9c0 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -2235,12 +2235,13 @@ public class SessionPool {
   }
 
   @SuppressWarnings("squid:S2095") // Suppress wrapper not closed warning
-  public SessionDataSetWrapper executeRawDataQuery(List<String> paths, long startTime, long endTime)
+  public SessionDataSetWrapper executeRawDataQuery(
+      List<String> paths, long startTime, long endTime, long timeOut)
       throws IoTDBConnectionException, StatementExecutionException {
     for (int i = 0; i < RETRY; i++) {
       Session session = getSession();
       try {
-        SessionDataSet resp = session.executeRawDataQuery(paths, startTime, endTime);
+        SessionDataSet resp = session.executeRawDataQuery(paths, startTime, endTime, timeOut);
         SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
         occupy(session);
         return wrapper;
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 4a4fab7d05..f2ecbeedea 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -215,6 +215,7 @@ struct TSInsertStringRecordReq {
   4: required list<string> values
   5: required i64 timestamp
   6: optional bool isAligned
+  7: optional i64 timeout
 }
 
 struct TSInsertTabletReq {
@@ -315,6 +316,7 @@ struct TSRawDataQueryReq {
   6: required i64 statementId
   7: optional bool enableRedirectQuery;
   8: optional bool jdbcQuery;
+  9: optional i64 timeout
 }
 
 struct TSLastDataQueryReq {
@@ -325,6 +327,7 @@ struct TSLastDataQueryReq {
   5: required i64 statementId
   6: optional bool enableRedirectQuery;
   7: optional bool jdbcQuery;
+  8: optional i64 timeout
 }
 
 struct TSCreateMultiTimeseriesReq {