You are viewing a plain text version of this content. The canonical version is linked from the original archive page (the hyperlink is not preserved in this text copy).
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/31 06:01:59 UTC

[iotdb] branch master updated: [IOTDB-3346] Clear DriverTask when any RuntimeException is thrown (#6086)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3918bf6992 [IOTDB-3346] Clear DriverTask when any RuntimeException is thrown (#6086)
3918bf6992 is described below

commit 3918bf6992223d59e9cde2a0d65365be933d63d7
Author: BaiJian <er...@hotmail.com>
AuthorDate: Tue May 31 14:01:54 2022 +0800

    [IOTDB-3346] Clear DriverTask when any RuntimeException is thrown (#6086)
---
 .../execution/schedule/AbstractDriverThread.java   |  8 +++-
 .../db/mpp/execution/schedule/DriverScheduler.java | 34 ++++++++++-------
 .../schedule/FragmentInstanceAbortedException.java |  1 +
 ...erTest.java => DefaultDriverSchedulerTest.java} |  2 +-
 .../execution/schedule/DriverSchedulerTest.java    |  2 +-
 .../DriverTaskTimeoutSentinelThreadTest.java       | 43 ++++++++++++++++++++++
 6 files changed, 74 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
index 2b548dce22..8dea0529e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
@@ -49,14 +49,20 @@ public abstract class AbstractDriverThread extends Thread implements Closeable {
 
   @Override
   public void run() {
+    DriverTask next;
     while (!closed && !Thread.currentThread().isInterrupted()) {
+      next = null;
       try {
-        DriverTask next = queue.poll();
+        next = queue.poll();
         execute(next);
       } catch (InterruptedException e) {
         break;
       } catch (Exception e) {
         logger.error("Executor " + this.getName() + " processes failed", e);
+        if (next != null) {
+          next.setAbortCause(FragmentInstanceAbortedException.BY_INTERNAL_ERROR_SCHEDULED);
+          scheduler.toAborted(next);
+        }
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index b1a5cc66fb..7fb57eea42 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -184,19 +184,6 @@ public class DriverScheduler implements IDriverScheduler, IService {
     if (task.getStatus() != DriverTaskStatus.FINISHED) {
       task.setStatus(DriverTaskStatus.ABORTED);
     }
-    if (task.getAbortCause() != null) {
-      task.getFragmentInstance()
-          .failed(
-              new FragmentInstanceAbortedException(
-                  task.getFragmentInstance().getInfo(), task.getAbortCause()));
-    }
-    if (task.getStatus() == DriverTaskStatus.ABORTED) {
-      blockManager.forceDeregisterFragmentInstance(
-          new TFragmentInstanceId(
-              task.getId().getQueryId().getId(),
-              task.getId().getFragmentId().getId(),
-              task.getId().getInstanceId()));
-    }
     readyQueue.remove(task.getId());
     timeoutQueue.remove(task.getId());
     blockedTasks.remove(task);
@@ -207,6 +194,27 @@ public class DriverScheduler implements IDriverScheduler, IService {
         queryMap.remove(task.getId().getQueryId());
       }
     }
+    if (task.getAbortCause() != null) {
+      try {
+        task.getFragmentInstance()
+            .failed(
+                new FragmentInstanceAbortedException(
+                    task.getFragmentInstance().getInfo(), task.getAbortCause()));
+      } catch (Exception e) {
+        logger.error("Clear DriverTask {} failed", task.getId().toString(), e);
+      }
+    }
+    if (task.getStatus() == DriverTaskStatus.ABORTED) {
+      try {
+        blockManager.forceDeregisterFragmentInstance(
+            new TFragmentInstanceId(
+                task.getId().getQueryId().getId(),
+                task.getId().getFragmentId().getId(),
+                task.getId().getInstanceId()));
+      } catch (Exception e) {
+        logger.error("Clear DriverTask {} failed", task.getId().toString(), e);
+      }
+    }
   }
 
   ITaskScheduler getScheduler() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
index 75f1325964..7425adc239 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
@@ -28,6 +28,7 @@ public class FragmentInstanceAbortedException extends Exception {
   public static final String BY_FRAGMENT_ABORT_CALLED = "fragment abort called";
   public static final String BY_QUERY_CASCADING_ABORTED = "query cascading aborted";
   public static final String BY_ALREADY_BEING_CANCELLED = "already being cancelled";
+  public static final String BY_INTERNAL_ERROR_SCHEDULED = "internal error scheduled";
 
   public FragmentInstanceAbortedException(FragmentInstanceId id, String causeMsg) {
     super(String.format("FragmentInstance %s is aborted by %s", id.toString(), causeMsg));
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultTaskSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
similarity index 99%
rename from server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultTaskSchedulerTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
index 9b930fcba4..2eddb5cdb6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultTaskSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
@@ -38,7 +38,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-public class DefaultTaskSchedulerTest {
+public class DefaultDriverSchedulerTest {
 
   private final DriverScheduler manager = DriverScheduler.getInstance();
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
index aa2378eb07..c1efd8f045 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
@@ -49,7 +49,7 @@ public class DriverSchedulerTest {
   }
 
   @Test
-  public void testManagingFragmentInstance() {
+  public void testManagingDriver() {
     IDataBlockManager mockDataBlockManager = Mockito.mock(IDataBlockManager.class);
     manager.setBlockManager(mockDataBlockManager);
     // submit 2 tasks in one query
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
index c2bd63e9e3..6f49600921 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
 import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue;
 import org.apache.iotdb.db.mpp.execution.schedule.queue.L1PriorityQueue;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.L2PriorityQueue;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskStatus;
 
@@ -266,4 +267,46 @@ public class DriverTaskTimeoutSentinelThreadTest {
     Mockito.verify(mockScheduler, Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
     Mockito.verify(mockScheduler, Mockito.never()).blockedToReady(Mockito.any());
   }
+
+  @Test
+  public void testHandleTaskWithInternalError() {
+    ITaskScheduler mockScheduler = Mockito.mock(ITaskScheduler.class);
+    Mockito.when(mockScheduler.readyToRunning(Mockito.any()))
+        .thenAnswer(
+            ans -> {
+              DriverTask task = ans.getArgument(0);
+              if (task.getStatus() != DriverTaskStatus.READY) {
+                return false;
+              }
+              task.setStatus(DriverTaskStatus.RUNNING);
+              return true;
+            });
+    IndexedBlockingQueue<DriverTask> taskQueue =
+        new L2PriorityQueue<>(100, new DriverTask.SchedulePriorityComparator(), new DriverTask());
+    IDriver mockDriver = Mockito.mock(IDriver.class);
+    QueryId queryId = new QueryId("test");
+    PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
+    FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
+    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
+    AbstractDriverThread executor =
+        new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+    Mockito.when(mockDriver.processFor(Mockito.any()))
+        .thenAnswer(
+            ans -> {
+              executor.close();
+              throw new RuntimeException("mock exception");
+            });
+    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+    taskQueue.push(testTask);
+    executor.run(); // Here we use run() instead of start() to execute the task in the same thread
+    Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
+    Assert.assertEquals(
+        FragmentInstanceAbortedException.BY_INTERNAL_ERROR_SCHEDULED, testTask.getAbortCause());
+    Assert.assertEquals(0, taskQueue.size());
+    Mockito.verify(mockScheduler, Mockito.times(1)).toAborted(Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).runningToReady(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).blockedToReady(Mockito.any());
+  }
 }