You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/09 08:49:01 UTC

[iotdb] branch ty/Timeout created (now 208c2d6d90)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch ty/Timeout
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 208c2d6d90 Make sure FI won't be aborted by timeout checker by mistake

This branch includes the following new commits:

     new 208c2d6d90 Make sure FI won't be aborted by timeout checker by mistake

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Make sure FI won't be aborted by timeout checker by mistake

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/Timeout
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 208c2d6d90e2f827169362877fe26d4afbf7c7fc
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Sun Oct 9 16:48:43 2022 +0800

    Make sure FI won't be aborted by timeout checker by mistake
---
 .../fragment/FragmentInstanceExecution.java        |  6 ++++++
 .../schedule/DriverTaskTimeoutSentinelThread.java  | 24 +++++++++++++++++++++-
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index f2dd3605b0..05d15cd7ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -21,17 +21,22 @@ package org.apache.iotdb.db.mpp.execution.fragment;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
 import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.schedule.DriverTaskTimeoutSentinelThread;
 import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler;
 import org.apache.iotdb.db.utils.SetThreadName;
 
 import com.google.common.collect.ImmutableList;
 import io.airlift.stats.CounterStat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState.FAILED;
 
 public class FragmentInstanceExecution {
 
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(DriverTaskTimeoutSentinelThread.class);
   private final FragmentInstanceId instanceId;
   private final FragmentInstanceContext context;
 
@@ -56,6 +61,7 @@ public class FragmentInstanceExecution {
     FragmentInstanceExecution execution =
         new FragmentInstanceExecution(instanceId, context, driver, stateMachine);
     execution.initialize(failedInstances, scheduler);
+    LOGGER.info("timeout is {}ms.", timeOut);
     scheduler.submitDrivers(instanceId.getQueryId(), ImmutableList.of(driver), timeOut);
     return execution;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java
index ce4bc4884f..7bf30053a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java
@@ -21,9 +21,15 @@ package org.apache.iotdb.db.mpp.execution.schedule;
 import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /** the thread for watching the timeout of {@link DriverTask} */
 public class DriverTaskTimeoutSentinelThread extends AbstractDriverThread {
 
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(DriverTaskTimeoutSentinelThread.class);
+
   public DriverTaskTimeoutSentinelThread(
       String workerId,
       ThreadGroup tg,
@@ -46,10 +52,26 @@ public class DriverTaskTimeoutSentinelThread extends AbstractDriverThread {
     }
     // if this task is not timeout, we can wait it to timeout.
     long waitTime = task.getDDL() - System.currentTimeMillis();
-    if (waitTime > 0L) {
+    while (waitTime > 0L) {
       // After this time, the task must be timeout.
       Thread.sleep(waitTime);
+      waitTime = task.getDDL() - System.currentTimeMillis();
+    }
+
+    task.lock();
+    try {
+      // if this task is already in an end state, it means that the resource releasing will be
+      // handled by other threads, we don't care anymore.
+      if (task.isEndState()) {
+        return;
+      }
+    } finally {
+      task.unlock();
     }
+    LOGGER.warn(
+        "[DriverTaskTimeout] Current time is {}, ddl of task is {}",
+        System.currentTimeMillis(),
+        task.getDDL());
     task.setAbortCause(FragmentInstanceAbortedException.BY_TIMEOUT);
     scheduler.toAborted(task);
   }