You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/05/31 03:30:07 UTC

[iotdb] branch bugfix/iotdb-3346 created (now a9958e188c)

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a change to branch bugfix/iotdb-3346
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at a9958e188c [IOTDB-3346] Clear DriverTask when any RuntimeException is thrown

This branch includes the following new commits:

     new a9958e188c [IOTDB-3346] Clear DriverTask when any RuntimeException is thrown

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-3346] Clear DriverTask when any RuntimeException is thrown

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch bugfix/iotdb-3346
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a9958e188cdf95ef18cf4d4a3bdd8dbcf07ff74c
Author: ericpai <er...@hotmail.com>
AuthorDate: Tue May 31 11:29:40 2022 +0800

    [IOTDB-3346] Clear DriverTask when any RuntimeException is thrown
---
 .../execution/schedule/AbstractDriverThread.java   |  8 +++-
 .../schedule/FragmentInstanceAbortedException.java |  2 +
 ...erTest.java => DefaultDriverSchedulerTest.java} |  2 +-
 .../execution/schedule/DriverSchedulerTest.java    |  2 +-
 .../DriverTaskTimeoutSentinelThreadTest.java       | 43 ++++++++++++++++++++++
 5 files changed, 54 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
index 2b548dce22..8dea0529e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
@@ -49,14 +49,20 @@ public abstract class AbstractDriverThread extends Thread implements Closeable {
 
   @Override
   public void run() {
+    DriverTask next;
     while (!closed && !Thread.currentThread().isInterrupted()) {
+      next = null;
       try {
-        DriverTask next = queue.poll();
+        next = queue.poll();
         execute(next);
       } catch (InterruptedException e) {
         break;
       } catch (Exception e) {
         logger.error("Executor " + this.getName() + " processes failed", e);
+        if (next != null) {
+          next.setAbortCause(FragmentInstanceAbortedException.BY_INTERNAL_ERROR_SCHEDULED);
+          scheduler.toAborted(next);
+        }
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
index 75f1325964..bbd892934c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
@@ -29,6 +29,8 @@ public class FragmentInstanceAbortedException extends Exception {
   public static final String BY_QUERY_CASCADING_ABORTED = "query cascading aborted";
   public static final String BY_ALREADY_BEING_CANCELLED = "already being cancelled";
 
+  public static final String BY_INTERNAL_ERROR_SCHEDULED = "internal error scheduled";
+
   public FragmentInstanceAbortedException(FragmentInstanceId id, String causeMsg) {
     super(String.format("FragmentInstance %s is aborted by %s", id.toString(), causeMsg));
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultTaskSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
similarity index 99%
rename from server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultTaskSchedulerTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
index 9b930fcba4..2eddb5cdb6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultTaskSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
@@ -38,7 +38,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-public class DefaultTaskSchedulerTest {
+public class DefaultDriverSchedulerTest {
 
   private final DriverScheduler manager = DriverScheduler.getInstance();
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
index aa2378eb07..c1efd8f045 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
@@ -49,7 +49,7 @@ public class DriverSchedulerTest {
   }
 
   @Test
-  public void testManagingFragmentInstance() {
+  public void testManagingDriver() {
     IDataBlockManager mockDataBlockManager = Mockito.mock(IDataBlockManager.class);
     manager.setBlockManager(mockDataBlockManager);
     // submit 2 tasks in one query
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
index c2bd63e9e3..6f49600921 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
 import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue;
 import org.apache.iotdb.db.mpp.execution.schedule.queue.L1PriorityQueue;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.L2PriorityQueue;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskStatus;
 
@@ -266,4 +267,46 @@ public class DriverTaskTimeoutSentinelThreadTest {
     Mockito.verify(mockScheduler, Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
     Mockito.verify(mockScheduler, Mockito.never()).blockedToReady(Mockito.any());
   }
+
+  @Test
+  public void testHandleTaskWithInternalError() {
+    ITaskScheduler mockScheduler = Mockito.mock(ITaskScheduler.class);
+    Mockito.when(mockScheduler.readyToRunning(Mockito.any()))
+        .thenAnswer(
+            ans -> {
+              DriverTask task = ans.getArgument(0);
+              if (task.getStatus() != DriverTaskStatus.READY) {
+                return false;
+              }
+              task.setStatus(DriverTaskStatus.RUNNING);
+              return true;
+            });
+    IndexedBlockingQueue<DriverTask> taskQueue =
+        new L2PriorityQueue<>(100, new DriverTask.SchedulePriorityComparator(), new DriverTask());
+    IDriver mockDriver = Mockito.mock(IDriver.class);
+    QueryId queryId = new QueryId("test");
+    PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
+    FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
+    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
+    AbstractDriverThread executor =
+        new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+    Mockito.when(mockDriver.processFor(Mockito.any()))
+        .thenAnswer(
+            ans -> {
+              executor.close();
+              throw new RuntimeException("mock exception");
+            });
+    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+    taskQueue.push(testTask);
+    executor.run(); // Here we use run() instead of start() to execute the task in the same thread
+    Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
+    Assert.assertEquals(
+        FragmentInstanceAbortedException.BY_INTERNAL_ERROR_SCHEDULED, testTask.getAbortCause());
+    Assert.assertEquals(0, taskQueue.size());
+    Mockito.verify(mockScheduler, Mockito.times(1)).toAborted(Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).runningToReady(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).blockedToReady(Mockito.any());
+  }
 }