You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/07 05:22:26 UTC
[iotdb] branch master updated: Implement mpp scheduler interface (#5438)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 54623d8e30 Implement mpp scheduler interface (#5438)
54623d8e30 is described below
commit 54623d8e3018aea0cab395aeeb12b5cb62bab8e9
Author: BaiJian <er...@hotmail.com>
AuthorDate: Thu Apr 7 13:22:21 2022 +0800
Implement mpp scheduler interface (#5438)
---
.../db/mpp/schedule/FragmentInstanceScheduler.java | 14 ++++++++++----
.../db/mpp/schedule/IFragmentInstanceScheduler.java | 5 +----
.../mpp/schedule/FragmentInstanceSchedulerTest.java | 19 +++++++++++++++++--
3 files changed, 28 insertions(+), 10 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index 777ba0c1e6..2ddd981249 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -166,12 +166,18 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
@Override
public void abortFragmentInstance(FragmentInstanceId instanceId) {
- // TODO(EricPai)
+ FragmentInstanceTask task = timeoutQueue.get(new FragmentInstanceTaskID(instanceId));
+ if (task == null) {
+ return;
+ }
+ task.lock();
+ try {
+ clearFragmentInstanceTask(task);
+ } finally {
+ task.unlock();
+ }
}
- @Override
- public void fetchFragmentInstance(Driver instance) {}
-
@Override
public double getSchedulePriority(FragmentInstanceId instanceId) {
FragmentInstanceTask task = timeoutQueue.get(new FragmentInstanceTaskID(instanceId));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
index bc4c69ea00..2c597c15e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
@@ -44,15 +44,12 @@ public interface IFragmentInstanceScheduler {
void abortQuery(QueryId queryId);
/**
- * Abort the fragment instance.
+ * Abort the fragment instance. If the instance does not exist, nothing happens.
*
* @param instanceId the id of the fragment instance to be aborted.
*/
void abortFragmentInstance(FragmentInstanceId instanceId);
- /** Fetch an {@link org.apache.iotdb.db.mpp.execution.Driver}. */
- void fetchFragmentInstance(Driver instance);
-
/**
* Return the schedule priority of a fragment.
*
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
index 3cc1fe7ed3..233908ca44 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
@@ -116,9 +116,24 @@ public class FragmentInstanceSchedulerTest {
Assert.assertTrue(manager.getQueryMap().get(queryId2).contains(task4));
Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());
- // Abort the query
+ // Abort one FragmentInstance
+ manager.abortFragmentInstance(instanceId1);
+ Mockito.verify(mockDataBlockManager, Mockito.times(1))
+ .forceDeregisterFragmentInstance(Mockito.any());
+ Assert.assertTrue(manager.getBlockedTasks().isEmpty());
+ Assert.assertEquals(2, manager.getQueryMap().size());
+ Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+ Assert.assertEquals(3, manager.getTimeoutQueue().size());
+ Assert.assertEquals(3, manager.getReadyQueue().size());
+ Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, task1.getStatus());
+ Assert.assertEquals(FragmentInstanceTaskStatus.READY, task2.getStatus());
+ Assert.assertEquals(FragmentInstanceTaskStatus.READY, task3.getStatus());
+ Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());
+
+ // Abort the whole query
+ Mockito.reset(mockDataBlockManager);
manager.abortQuery(queryId);
- Mockito.verify(mockDataBlockManager, Mockito.times(3))
+ Mockito.verify(mockDataBlockManager, Mockito.times(2))
.forceDeregisterFragmentInstance(Mockito.any());
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
Assert.assertEquals(1, manager.getQueryMap().size());