You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/05/31 03:30:08 UTC

[iotdb] 01/01: [IOTDB-3346] Clear DriverTask when any RuntimeException is thrown

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch bugfix/iotdb-3346
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a9958e188cdf95ef18cf4d4a3bdd8dbcf07ff74c
Author: ericpai <er...@hotmail.com>
AuthorDate: Tue May 31 11:29:40 2022 +0800

    [IOTDB-3346] Clear DriverTask when any RuntimeException is thrown
---
 .../execution/schedule/AbstractDriverThread.java   |  8 +++-
 .../schedule/FragmentInstanceAbortedException.java |  2 +
 ...erTest.java => DefaultDriverSchedulerTest.java} |  2 +-
 .../execution/schedule/DriverSchedulerTest.java    |  2 +-
 .../DriverTaskTimeoutSentinelThreadTest.java       | 43 ++++++++++++++++++++++
 5 files changed, 54 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
index 2b548dce22..8dea0529e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
@@ -49,14 +49,20 @@ public abstract class AbstractDriverThread extends Thread implements Closeable {
 
   @Override
   public void run() {
+    DriverTask next;
     while (!closed && !Thread.currentThread().isInterrupted()) {
+      next = null;
       try {
-        DriverTask next = queue.poll();
+        next = queue.poll();
         execute(next);
       } catch (InterruptedException e) {
         break;
       } catch (Exception e) {
         logger.error("Executor " + this.getName() + " processes failed", e);
+        if (next != null) {
+          next.setAbortCause(FragmentInstanceAbortedException.BY_INTERNAL_ERROR_SCHEDULED);
+          scheduler.toAborted(next);
+        }
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
index 75f1325964..bbd892934c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
@@ -29,6 +29,8 @@ public class FragmentInstanceAbortedException extends Exception {
   public static final String BY_QUERY_CASCADING_ABORTED = "query cascading aborted";
   public static final String BY_ALREADY_BEING_CANCELLED = "already being cancelled";
 
+  public static final String BY_INTERNAL_ERROR_SCHEDULED = "internal error scheduled";
+
   public FragmentInstanceAbortedException(FragmentInstanceId id, String causeMsg) {
     super(String.format("FragmentInstance %s is aborted by %s", id.toString(), causeMsg));
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultTaskSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
similarity index 99%
rename from server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultTaskSchedulerTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
index 9b930fcba4..2eddb5cdb6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultTaskSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
@@ -38,7 +38,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-public class DefaultTaskSchedulerTest {
+public class DefaultDriverSchedulerTest {
 
   private final DriverScheduler manager = DriverScheduler.getInstance();
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
index aa2378eb07..c1efd8f045 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
@@ -49,7 +49,7 @@ public class DriverSchedulerTest {
   }
 
   @Test
-  public void testManagingFragmentInstance() {
+  public void testManagingDriver() {
     IDataBlockManager mockDataBlockManager = Mockito.mock(IDataBlockManager.class);
     manager.setBlockManager(mockDataBlockManager);
     // submit 2 tasks in one query
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
index c2bd63e9e3..6f49600921 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
 import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue;
 import org.apache.iotdb.db.mpp.execution.schedule.queue.L1PriorityQueue;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.L2PriorityQueue;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskStatus;
 
@@ -266,4 +267,46 @@ public class DriverTaskTimeoutSentinelThreadTest {
     Mockito.verify(mockScheduler, Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
     Mockito.verify(mockScheduler, Mockito.never()).blockedToReady(Mockito.any());
   }
+
+  @Test
+  public void testHandleTaskWithInternalError() {
+    ITaskScheduler mockScheduler = Mockito.mock(ITaskScheduler.class);
+    Mockito.when(mockScheduler.readyToRunning(Mockito.any()))
+        .thenAnswer(
+            ans -> {
+              DriverTask task = ans.getArgument(0);
+              if (task.getStatus() != DriverTaskStatus.READY) {
+                return false;
+              }
+              task.setStatus(DriverTaskStatus.RUNNING);
+              return true;
+            });
+    IndexedBlockingQueue<DriverTask> taskQueue =
+        new L2PriorityQueue<>(100, new DriverTask.SchedulePriorityComparator(), new DriverTask());
+    IDriver mockDriver = Mockito.mock(IDriver.class);
+    QueryId queryId = new QueryId("test");
+    PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
+    FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
+    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
+    AbstractDriverThread executor =
+        new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+    Mockito.when(mockDriver.processFor(Mockito.any()))
+        .thenAnswer(
+            ans -> {
+              executor.close();
+              throw new RuntimeException("mock exception");
+            });
+    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+    taskQueue.push(testTask);
+    executor.run(); // Here we use run() instead of start() to execute the task in the same thread
+    Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
+    Assert.assertEquals(
+        FragmentInstanceAbortedException.BY_INTERNAL_ERROR_SCHEDULED, testTask.getAbortCause());
+    Assert.assertEquals(0, taskQueue.size());
+    Mockito.verify(mockScheduler, Mockito.times(1)).toAborted(Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).runningToReady(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).blockedToReady(Mockito.any());
+  }
 }