You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/05/31 03:30:08 UTC
[iotdb] 01/01: [IOTDB-3346] Clear DriverTask when any RuntimeException is thrown
This is an automated email from the ASF dual-hosted git repository.
ericpai pushed a commit to branch bugfix/iotdb-3346
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a9958e188cdf95ef18cf4d4a3bdd8dbcf07ff74c
Author: ericpai <er...@hotmail.com>
AuthorDate: Tue May 31 11:29:40 2022 +0800
[IOTDB-3346] Clear DriverTask when any RuntimeException is thrown
---
.../execution/schedule/AbstractDriverThread.java | 8 +++-
.../schedule/FragmentInstanceAbortedException.java | 2 +
...erTest.java => DefaultDriverSchedulerTest.java} | 2 +-
.../execution/schedule/DriverSchedulerTest.java | 2 +-
.../DriverTaskTimeoutSentinelThreadTest.java | 43 ++++++++++++++++++++++
5 files changed, 54 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
index 2b548dce22..8dea0529e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
@@ -49,14 +49,20 @@ public abstract class AbstractDriverThread extends Thread implements Closeable {
@Override
public void run() {
+ DriverTask next;
while (!closed && !Thread.currentThread().isInterrupted()) {
+ next = null;
try {
- DriverTask next = queue.poll();
+ next = queue.poll();
execute(next);
} catch (InterruptedException e) {
break;
} catch (Exception e) {
logger.error("Executor " + this.getName() + " processes failed", e);
+ if (next != null) {
+ next.setAbortCause(FragmentInstanceAbortedException.BY_INTERNAL_ERROR_SCHEDULED);
+ scheduler.toAborted(next);
+ }
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
index 75f1325964..bbd892934c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
@@ -29,6 +29,8 @@ public class FragmentInstanceAbortedException extends Exception {
public static final String BY_QUERY_CASCADING_ABORTED = "query cascading aborted";
public static final String BY_ALREADY_BEING_CANCELLED = "already being cancelled";
+ public static final String BY_INTERNAL_ERROR_SCHEDULED = "internal error scheduled";
+
public FragmentInstanceAbortedException(FragmentInstanceId id, String causeMsg) {
super(String.format("FragmentInstance %s is aborted by %s", id.toString(), causeMsg));
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultTaskSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
similarity index 99%
rename from server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultTaskSchedulerTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
index 9b930fcba4..2eddb5cdb6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultTaskSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
@@ -38,7 +38,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-public class DefaultTaskSchedulerTest {
+public class DefaultDriverSchedulerTest {
private final DriverScheduler manager = DriverScheduler.getInstance();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
index aa2378eb07..c1efd8f045 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
@@ -49,7 +49,7 @@ public class DriverSchedulerTest {
}
@Test
- public void testManagingFragmentInstance() {
+ public void testManagingDriver() {
IDataBlockManager mockDataBlockManager = Mockito.mock(IDataBlockManager.class);
manager.setBlockManager(mockDataBlockManager);
// submit 2 tasks in one query
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
index c2bd63e9e3..6f49600921 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.driver.IDriver;
import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.mpp.execution.schedule.queue.L1PriorityQueue;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.L2PriorityQueue;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskStatus;
@@ -266,4 +267,46 @@ public class DriverTaskTimeoutSentinelThreadTest {
Mockito.verify(mockScheduler, Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
Mockito.verify(mockScheduler, Mockito.never()).blockedToReady(Mockito.any());
}
+
+ @Test
+ public void testHandleTaskWithInternalError() {
+ ITaskScheduler mockScheduler = Mockito.mock(ITaskScheduler.class);
+ Mockito.when(mockScheduler.readyToRunning(Mockito.any()))
+ .thenAnswer(
+ ans -> {
+ DriverTask task = ans.getArgument(0);
+ if (task.getStatus() != DriverTaskStatus.READY) {
+ return false;
+ }
+ task.setStatus(DriverTaskStatus.RUNNING);
+ return true;
+ });
+ IndexedBlockingQueue<DriverTask> taskQueue =
+ new L2PriorityQueue<>(100, new DriverTask.SchedulePriorityComparator(), new DriverTask());
+ IDriver mockDriver = Mockito.mock(IDriver.class);
+ QueryId queryId = new QueryId("test");
+ PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
+ FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
+ Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
+ AbstractDriverThread executor =
+ new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+ Mockito.when(mockDriver.processFor(Mockito.any()))
+ .thenAnswer(
+ ans -> {
+ executor.close();
+ throw new RuntimeException("mock exception");
+ });
+ DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+ taskQueue.push(testTask);
+ executor.run(); // Here we use run() instead of start() to execute the task in the same thread
+ Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
+ Assert.assertEquals(
+ FragmentInstanceAbortedException.BY_INTERNAL_ERROR_SCHEDULED, testTask.getAbortCause());
+ Assert.assertEquals(0, taskQueue.size());
+ Mockito.verify(mockScheduler, Mockito.times(1)).toAborted(Mockito.any());
+ Mockito.verify(mockScheduler, Mockito.never()).runningToReady(Mockito.any(), Mockito.any());
+ Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
+ Mockito.verify(mockScheduler, Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
+ Mockito.verify(mockScheduler, Mockito.never()).blockedToReady(Mockito.any());
+ }
}