You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/09 08:49:02 UTC
[iotdb] 01/01: Make sure FI won't be aborted by timeout checker by mistake
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/Timeout
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 208c2d6d90e2f827169362877fe26d4afbf7c7fc
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Sun Oct 9 16:48:43 2022 +0800
Make sure FI won't be aborted by timeout checker by mistake
---
.../fragment/FragmentInstanceExecution.java | 6 ++++++
.../schedule/DriverTaskTimeoutSentinelThread.java | 24 +++++++++++++++++++++-
2 files changed, 29 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index f2dd3605b0..05d15cd7ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -21,17 +21,22 @@ package org.apache.iotdb.db.mpp.execution.fragment;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.driver.IDriver;
import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.schedule.DriverTaskTimeoutSentinelThread;
import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler;
import org.apache.iotdb.db.utils.SetThreadName;
import com.google.common.collect.ImmutableList;
import io.airlift.stats.CounterStat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState.FAILED;
public class FragmentInstanceExecution {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(FragmentInstanceExecution.class); // named after the emitting class
private final FragmentInstanceId instanceId;
private final FragmentInstanceContext context;
@@ -56,6 +61,7 @@ public class FragmentInstanceExecution {
FragmentInstanceExecution execution =
new FragmentInstanceExecution(instanceId, context, driver, stateMachine);
execution.initialize(failedInstances, scheduler);
+ LOGGER.info("timeout is {}ms.", timeOut);
scheduler.submitDrivers(instanceId.getQueryId(), ImmutableList.of(driver), timeOut);
return execution;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java
index ce4bc4884f..7bf30053a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java
@@ -21,9 +21,15 @@ package org.apache.iotdb.db.mpp.execution.schedule;
import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/** the thread for watching the timeout of {@link DriverTask} */
public class DriverTaskTimeoutSentinelThread extends AbstractDriverThread {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(DriverTaskTimeoutSentinelThread.class);
+
public DriverTaskTimeoutSentinelThread(
String workerId,
ThreadGroup tg,
@@ -46,10 +52,26 @@ public class DriverTaskTimeoutSentinelThread extends AbstractDriverThread {
}
// if this task is not timeout, we can wait it to timeout.
long waitTime = task.getDDL() - System.currentTimeMillis();
- if (waitTime > 0L) {
+ while (waitTime > 0L) {
// After this time, the task must be timeout.
Thread.sleep(waitTime);
+ waitTime = task.getDDL() - System.currentTimeMillis();
+ }
+
+ task.lock();
+ try {
+ // If this task is already in an end state, releasing its resources will be handled by
+ // other threads, so there is nothing left for the timeout checker to do.
+ if (task.isEndState()) {
+ return;
+ }
+ } finally {
+ task.unlock();
}
+ LOGGER.warn(
+ "[DriverTaskTimeout] Current time is {}, ddl of task is {}",
+ System.currentTimeMillis(),
+ task.getDDL());
task.setAbortCause(FragmentInstanceAbortedException.BY_TIMEOUT);
scheduler.toAborted(task);
}