You are viewing a plain-text version of this content. The canonical link for it is on the original archive page (the hyperlink is not preserved in this plain-text copy).
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/31 06:01:59 UTC
[iotdb] branch master updated: [IOTDB-3346] Clear DriverTask when any RuntimeException is thrown (#6086)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3918bf6992 [IOTDB-3346] Clear DriverTask when any RuntimeException is thrown (#6086)
3918bf6992 is described below
commit 3918bf6992223d59e9cde2a0d65365be933d63d7
Author: BaiJian <er...@hotmail.com>
AuthorDate: Tue May 31 14:01:54 2022 +0800
[IOTDB-3346] Clear DriverTask when any RuntimeException is thrown (#6086)
---
.../execution/schedule/AbstractDriverThread.java | 8 +++-
.../db/mpp/execution/schedule/DriverScheduler.java | 34 ++++++++++-------
.../schedule/FragmentInstanceAbortedException.java | 1 +
...erTest.java => DefaultDriverSchedulerTest.java} | 2 +-
.../execution/schedule/DriverSchedulerTest.java | 2 +-
.../DriverTaskTimeoutSentinelThreadTest.java | 43 ++++++++++++++++++++++
6 files changed, 74 insertions(+), 16 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
index 2b548dce22..8dea0529e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
@@ -49,14 +49,20 @@ public abstract class AbstractDriverThread extends Thread implements Closeable {
@Override
public void run() {
+ DriverTask next;
while (!closed && !Thread.currentThread().isInterrupted()) {
+ next = null;
try {
- DriverTask next = queue.poll();
+ next = queue.poll();
execute(next);
} catch (InterruptedException e) {
break;
} catch (Exception e) {
logger.error("Executor " + this.getName() + " processes failed", e);
+ if (next != null) {
+ next.setAbortCause(FragmentInstanceAbortedException.BY_INTERNAL_ERROR_SCHEDULED);
+ scheduler.toAborted(next);
+ }
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index b1a5cc66fb..7fb57eea42 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -184,19 +184,6 @@ public class DriverScheduler implements IDriverScheduler, IService {
if (task.getStatus() != DriverTaskStatus.FINISHED) {
task.setStatus(DriverTaskStatus.ABORTED);
}
- if (task.getAbortCause() != null) {
- task.getFragmentInstance()
- .failed(
- new FragmentInstanceAbortedException(
- task.getFragmentInstance().getInfo(), task.getAbortCause()));
- }
- if (task.getStatus() == DriverTaskStatus.ABORTED) {
- blockManager.forceDeregisterFragmentInstance(
- new TFragmentInstanceId(
- task.getId().getQueryId().getId(),
- task.getId().getFragmentId().getId(),
- task.getId().getInstanceId()));
- }
readyQueue.remove(task.getId());
timeoutQueue.remove(task.getId());
blockedTasks.remove(task);
@@ -207,6 +194,27 @@ public class DriverScheduler implements IDriverScheduler, IService {
queryMap.remove(task.getId().getQueryId());
}
}
+ if (task.getAbortCause() != null) {
+ try {
+ task.getFragmentInstance()
+ .failed(
+ new FragmentInstanceAbortedException(
+ task.getFragmentInstance().getInfo(), task.getAbortCause()));
+ } catch (Exception e) {
+ logger.error("Clear DriverTask {} failed", task.getId().toString(), e);
+ }
+ }
+ if (task.getStatus() == DriverTaskStatus.ABORTED) {
+ try {
+ blockManager.forceDeregisterFragmentInstance(
+ new TFragmentInstanceId(
+ task.getId().getQueryId().getId(),
+ task.getId().getFragmentId().getId(),
+ task.getId().getInstanceId()));
+ } catch (Exception e) {
+ logger.error("Clear DriverTask {} failed", task.getId().toString(), e);
+ }
+ }
}
ITaskScheduler getScheduler() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
index 75f1325964..7425adc239 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
@@ -28,6 +28,7 @@ public class FragmentInstanceAbortedException extends Exception {
public static final String BY_FRAGMENT_ABORT_CALLED = "fragment abort called";
public static final String BY_QUERY_CASCADING_ABORTED = "query cascading aborted";
public static final String BY_ALREADY_BEING_CANCELLED = "already being cancelled";
+ public static final String BY_INTERNAL_ERROR_SCHEDULED = "internal error scheduled";
public FragmentInstanceAbortedException(FragmentInstanceId id, String causeMsg) {
super(String.format("FragmentInstance %s is aborted by %s", id.toString(), causeMsg));
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultTaskSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
similarity index 99%
rename from server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultTaskSchedulerTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
index 9b930fcba4..2eddb5cdb6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultTaskSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
@@ -38,7 +38,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-public class DefaultTaskSchedulerTest {
+public class DefaultDriverSchedulerTest {
private final DriverScheduler manager = DriverScheduler.getInstance();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
index aa2378eb07..c1efd8f045 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
@@ -49,7 +49,7 @@ public class DriverSchedulerTest {
}
@Test
- public void testManagingFragmentInstance() {
+ public void testManagingDriver() {
IDataBlockManager mockDataBlockManager = Mockito.mock(IDataBlockManager.class);
manager.setBlockManager(mockDataBlockManager);
// submit 2 tasks in one query
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
index c2bd63e9e3..6f49600921 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.driver.IDriver;
import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.mpp.execution.schedule.queue.L1PriorityQueue;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.L2PriorityQueue;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskStatus;
@@ -266,4 +267,46 @@ public class DriverTaskTimeoutSentinelThreadTest {
Mockito.verify(mockScheduler, Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
Mockito.verify(mockScheduler, Mockito.never()).blockedToReady(Mockito.any());
}
+
+ @Test
+ public void testHandleTaskWithInternalError() {
+ ITaskScheduler mockScheduler = Mockito.mock(ITaskScheduler.class);
+ Mockito.when(mockScheduler.readyToRunning(Mockito.any()))
+ .thenAnswer(
+ ans -> {
+ DriverTask task = ans.getArgument(0);
+ if (task.getStatus() != DriverTaskStatus.READY) {
+ return false;
+ }
+ task.setStatus(DriverTaskStatus.RUNNING);
+ return true;
+ });
+ IndexedBlockingQueue<DriverTask> taskQueue =
+ new L2PriorityQueue<>(100, new DriverTask.SchedulePriorityComparator(), new DriverTask());
+ IDriver mockDriver = Mockito.mock(IDriver.class);
+ QueryId queryId = new QueryId("test");
+ PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
+ FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
+ Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
+ AbstractDriverThread executor =
+ new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+ Mockito.when(mockDriver.processFor(Mockito.any()))
+ .thenAnswer(
+ ans -> {
+ executor.close();
+ throw new RuntimeException("mock exception");
+ });
+ DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+ taskQueue.push(testTask);
+ executor.run(); // Here we use run() instead of start() to execute the task in the same thread
+ Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
+ Assert.assertEquals(
+ FragmentInstanceAbortedException.BY_INTERNAL_ERROR_SCHEDULED, testTask.getAbortCause());
+ Assert.assertEquals(0, taskQueue.size());
+ Mockito.verify(mockScheduler, Mockito.times(1)).toAborted(Mockito.any());
+ Mockito.verify(mockScheduler, Mockito.never()).runningToReady(Mockito.any(), Mockito.any());
+ Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
+ Mockito.verify(mockScheduler, Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
+ Mockito.verify(mockScheduler, Mockito.never()).blockedToReady(Mockito.any());
+ }
}