You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/09 08:49:02 UTC

[iotdb] 01/01: Make sure FI won't be aborted by timeout checker by mistake

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/Timeout
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 208c2d6d90e2f827169362877fe26d4afbf7c7fc
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Sun Oct 9 16:48:43 2022 +0800

    Make sure FI won't be aborted by timeout checker by mistake
---
 .../fragment/FragmentInstanceExecution.java        |  6 ++++++
 .../schedule/DriverTaskTimeoutSentinelThread.java  | 24 +++++++++++++++++++++-
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index f2dd3605b0..05d15cd7ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -21,17 +21,22 @@ package org.apache.iotdb.db.mpp.execution.fragment;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
 import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.schedule.DriverTaskTimeoutSentinelThread;
 import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler;
 import org.apache.iotdb.db.utils.SetThreadName;
 
 import com.google.common.collect.ImmutableList;
 import io.airlift.stats.CounterStat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState.FAILED;
 
 public class FragmentInstanceExecution {
 
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(FragmentInstanceExecution.class);
   private final FragmentInstanceId instanceId;
   private final FragmentInstanceContext context;
 
@@ -56,6 +61,7 @@ public class FragmentInstanceExecution {
     FragmentInstanceExecution execution =
         new FragmentInstanceExecution(instanceId, context, driver, stateMachine);
     execution.initialize(failedInstances, scheduler);
+    LOGGER.info("timeout is {}ms.", timeOut);
     scheduler.submitDrivers(instanceId.getQueryId(), ImmutableList.of(driver), timeOut);
     return execution;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java
index ce4bc4884f..7bf30053a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java
@@ -21,9 +21,15 @@ package org.apache.iotdb.db.mpp.execution.schedule;
 import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /** the thread for watching the timeout of {@link DriverTask} */
 public class DriverTaskTimeoutSentinelThread extends AbstractDriverThread {
 
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(DriverTaskTimeoutSentinelThread.class);
+
   public DriverTaskTimeoutSentinelThread(
       String workerId,
       ThreadGroup tg,
@@ -46,10 +52,26 @@ public class DriverTaskTimeoutSentinelThread extends AbstractDriverThread {
     }
     // if this task is not timeout, we can wait it to timeout.
     long waitTime = task.getDDL() - System.currentTimeMillis();
-    if (waitTime > 0L) {
+    while (waitTime > 0L) {
       // After this time, the task must be timeout.
       Thread.sleep(waitTime);
+      waitTime = task.getDDL() - System.currentTimeMillis();
+    }
+
+    task.lock();
+    try {
+      // if this task is already in an end state, it means that the resource releasing will be
+      // handled by other threads, we don't care anymore.
+      if (task.isEndState()) {
+        return;
+      }
+    } finally {
+      task.unlock();
     }
+    LOGGER.warn(
+        "[DriverTaskTimeout] Current time is {}, ddl of task is {}",
+        System.currentTimeMillis(),
+        task.getDDL());
     task.setAbortCause(FragmentInstanceAbortedException.BY_TIMEOUT);
     scheduler.toAborted(task);
   }