You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/07 05:22:26 UTC

[iotdb] branch master updated: Implement mpp scheduler interface (#5438)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 54623d8e30 Implement mpp scheduler interface (#5438)
54623d8e30 is described below

commit 54623d8e3018aea0cab395aeeb12b5cb62bab8e9
Author: BaiJian <er...@hotmail.com>
AuthorDate: Thu Apr 7 13:22:21 2022 +0800

    Implement mpp scheduler interface (#5438)
---
 .../db/mpp/schedule/FragmentInstanceScheduler.java    | 14 ++++++++++----
 .../db/mpp/schedule/IFragmentInstanceScheduler.java   |  5 +----
 .../mpp/schedule/FragmentInstanceSchedulerTest.java   | 19 +++++++++++++++++--
 3 files changed, 28 insertions(+), 10 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index 777ba0c1e6..2ddd981249 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -166,12 +166,18 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
 
   @Override
   public void abortFragmentInstance(FragmentInstanceId instanceId) {
-    // TODO(EricPai)
+    FragmentInstanceTask task = timeoutQueue.get(new FragmentInstanceTaskID(instanceId));
+    if (task == null) {
+      return;
+    }
+    task.lock();
+    try {
+      clearFragmentInstanceTask(task);
+    } finally {
+      task.unlock();
+    }
   }
 
-  @Override
-  public void fetchFragmentInstance(Driver instance) {}
-
   @Override
   public double getSchedulePriority(FragmentInstanceId instanceId) {
     FragmentInstanceTask task = timeoutQueue.get(new FragmentInstanceTaskID(instanceId));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
index bc4c69ea00..2c597c15e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
@@ -44,15 +44,12 @@ public interface IFragmentInstanceScheduler {
   void abortQuery(QueryId queryId);
 
   /**
-   * Abort the fragment instance.
+   * Abort the fragment instance. If the instance is not existed, nothing will happen.
    *
    * @param instanceId the id of the fragment instance to be aborted.
    */
   void abortFragmentInstance(FragmentInstanceId instanceId);
 
-  /** Fetch an {@link org.apache.iotdb.db.mpp.execution.Driver}. */
-  void fetchFragmentInstance(Driver instance);
-
   /**
    * Return the schedule priority of a fragment.
    *
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
index 3cc1fe7ed3..233908ca44 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
@@ -116,9 +116,24 @@ public class FragmentInstanceSchedulerTest {
     Assert.assertTrue(manager.getQueryMap().get(queryId2).contains(task4));
     Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());
 
-    // Abort the query
+    // Abort one FragmentInstance
+    manager.abortFragmentInstance(instanceId1);
+    Mockito.verify(mockDataBlockManager, Mockito.times(1))
+        .forceDeregisterFragmentInstance(Mockito.any());
+    Assert.assertTrue(manager.getBlockedTasks().isEmpty());
+    Assert.assertEquals(2, manager.getQueryMap().size());
+    Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+    Assert.assertEquals(3, manager.getTimeoutQueue().size());
+    Assert.assertEquals(3, manager.getReadyQueue().size());
+    Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, task1.getStatus());
+    Assert.assertEquals(FragmentInstanceTaskStatus.READY, task2.getStatus());
+    Assert.assertEquals(FragmentInstanceTaskStatus.READY, task3.getStatus());
+    Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());
+
+    // Abort the whole query
+    Mockito.reset(mockDataBlockManager);
     manager.abortQuery(queryId);
-    Mockito.verify(mockDataBlockManager, Mockito.times(3))
+    Mockito.verify(mockDataBlockManager, Mockito.times(2))
         .forceDeregisterFragmentInstance(Mockito.any());
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
     Assert.assertEquals(1, manager.getQueryMap().size());