You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/05/30 08:31:35 UTC

[iotdb] branch CorrectTimeout created (now c2a9b93f555)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch CorrectTimeout
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at c2a9b93f555 [IOTDB-5939] Correct Flusing Task Timeout Detect Thread's timeout

This branch includes the following new commits:

     new c2a9b93f555 [IOTDB-5939] Correct Flusing Task Timeout Detect Thread's timeout

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-5939] Correct Flusing Task Timeout Detect Thread's timeout

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch CorrectTimeout
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c2a9b93f555bd5fa0d7b13d0242736bd86cb6f96
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue May 30 16:31:22 2023 +0800

    [IOTDB-5939] Correct Flusing Task Timeout Detect Thread's timeout
---
 .../mpp/execution/fragment/FragmentInstanceExecution.java  | 14 +++++++++++---
 .../db/mpp/execution/fragment/FragmentInstanceManager.java | 11 ++++++-----
 2 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index b68ec33f096..4cb85b0ae93 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -53,6 +53,8 @@ public class FragmentInstanceExecution {
 
   private final FragmentInstanceStateMachine stateMachine;
 
+  private final long timeoutInMs;
+
   private long lastHeartbeat;
 
   public static FragmentInstanceExecution createFragmentInstanceExecution(
@@ -66,9 +68,9 @@ public class FragmentInstanceExecution {
       long timeOut)
       throws CpuNotEnoughException, MemoryNotEnoughException {
     FragmentInstanceExecution execution =
-        new FragmentInstanceExecution(instanceId, context, drivers, sinkHandle, stateMachine);
+        new FragmentInstanceExecution(
+            instanceId, context, drivers, sinkHandle, stateMachine, timeOut);
     execution.initialize(failedInstances, scheduler);
-    LOGGER.debug("timeout is {}ms.", timeOut);
     scheduler.submitDrivers(instanceId.getQueryId(), drivers, timeOut, context.getSessionInfo());
     return execution;
   }
@@ -78,12 +80,14 @@ public class FragmentInstanceExecution {
       FragmentInstanceContext context,
       List<IDriver> drivers,
       ISink sink,
-      FragmentInstanceStateMachine stateMachine) {
+      FragmentInstanceStateMachine stateMachine,
+      long timeoutInMs) {
     this.instanceId = instanceId;
     this.context = context;
     this.drivers = drivers;
     this.sink = sink;
     this.stateMachine = stateMachine;
+    this.timeoutInMs = timeoutInMs;
   }
 
   public void recordHeartbeat() {
@@ -110,6 +114,10 @@ public class FragmentInstanceExecution {
     return context.getStartTime();
   }
 
+  public long getTimeoutInMs() {
+    return timeoutInMs;
+  }
+
   public FragmentInstanceStateMachine getStateMachine() {
     return stateMachine;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 1cf2718e9fe..b0f816f21e0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -70,9 +70,6 @@ public class FragmentInstanceManager {
   // record failed instances count
   private final CounterStat failedInstances = new CounterStat();
 
-  private static final long QUERY_TIMEOUT_MS =
-      IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
-
   private final ExecutorService intoOperationExecutor;
 
   private static final QueryExecutionMetricSet QUERY_EXECUTION_METRIC_SET =
@@ -314,8 +311,12 @@ public class FragmentInstanceManager {
     instanceExecution.forEach(
         (key, execution) -> {
           if (execution.getStateMachine().getState() == FragmentInstanceState.FLUSHING
-              && (now - execution.getStartTime()) > QUERY_TIMEOUT_MS) {
-            execution.getStateMachine().failed(new TimeoutException());
+              && (now - execution.getStartTime()) > execution.getTimeoutInMs()) {
+            execution
+                .getStateMachine()
+                .failed(
+                    new TimeoutException(
+                        "Query has executed more than " + execution.getTimeoutInMs() + "ms"));
           }
         });
   }