You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/20 01:24:30 UTC
[iotdb] 01/02: try
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch tsbs
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6c9944811d6702a832086d39cda2e150c731713b
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Sun Dec 18 16:24:29 2022 +0800
try
---
.../mpp/execution/exchange/LocalSourceHandle.java | 4 +-
.../execution/operator/source/SeriesScanUtil.java | 4 +-
.../mpp/execution/schedule/DriverTaskThread.java | 7 +-
.../mpp/execution/schedule/ExecutionContext.java | 6 +-
.../db/mpp/execution/schedule/task/DriverTask.java | 4 +-
.../schedule/DefaultDriverSchedulerTest.java | 814 ++++++++++-----------
6 files changed, 420 insertions(+), 419 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
index 64415f41dc..dd0c88f0c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
@@ -88,7 +88,7 @@ public class LocalSourceHandle implements ISourceHandle {
@Override
public TsBlock receive() {
- try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
+// try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
checkState();
if (!queue.isBlocked().isDone()) {
@@ -107,7 +107,7 @@ public class LocalSourceHandle implements ISourceHandle {
}
checkAndInvokeOnFinished();
return tsBlock;
- }
+// }
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 22fd2cbabc..7ab5a02e05 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -159,7 +159,9 @@ public class SeriesScanUtil {
}
public void initQueryDataSource(QueryDataSource dataSource) {
- QueryUtils.fillOrderIndexes(dataSource, seriesPath.getDevice(), orderUtils.getAscending());
+ if (!dataSource.getUnseqResources().isEmpty()) {
+ QueryUtils.fillOrderIndexes(dataSource, seriesPath.getDevice(), orderUtils.getAscending());
+ }
this.dataSource = dataSource;
this.timeFilter = dataSource.updateFilterUsingTTL(timeFilter);
if (this.valueFilter != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
index 28324a0502..b55dcf9ef5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
@@ -60,9 +60,10 @@ public class DriverTaskThread extends AbstractDriverThread {
return;
}
IDriver instance = task.getFragmentInstance();
- CpuTimer timer = new CpuTimer();
+ long startTime = System.nanoTime();
+// CpuTimer timer = new CpuTimer();
ListenableFuture<?> future = instance.processFor(EXECUTION_TIME_SLICE);
- CpuTimer.CpuDuration duration = timer.elapsedTime();
+// CpuTimer.CpuDuration duration = timer.elapsedTime();
// long cost = System.nanoTime() - startTime;
// If the future is cancelled, the task is in an error and should be thrown.
if (future.isCancelled()) {
@@ -71,7 +72,7 @@ public class DriverTaskThread extends AbstractDriverThread {
return;
}
ExecutionContext context = new ExecutionContext();
- context.setCpuDuration(duration);
+ context.setCpuDuration(System.nanoTime() - startTime);
context.setTimeSlice(EXECUTION_TIME_SLICE);
if (instance.isFinished()) {
scheduler.runningToFinished(task, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java
index df821dac68..070aa548b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java
@@ -25,14 +25,14 @@ import io.airlift.units.Duration;
/** The execution context of a {@link DriverTask} */
public class ExecutionContext {
- private CpuTimer.CpuDuration cpuDuration;
+ private long cpuDuration;
private Duration timeSlice;
- public CpuTimer.CpuDuration getCpuDuration() {
+ public long getCpuDuration() {
return cpuDuration;
}
- public void setCpuDuration(CpuTimer.CpuDuration cpuDuration) {
+ public void setCpuDuration(long cpuDuration) {
this.cpuDuration = cpuDuration;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index f65c1f3988..1aa87a119d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -102,9 +102,7 @@ public class DriverTask implements IDIndexedAccessible {
// 1. The penalty factor means that if a task executes less time in one schedule, it will have a
// high schedule priority
- double penaltyFactor =
- context.getCpuDuration().getWall().getValue(TimeUnit.NANOSECONDS)
- / context.getTimeSlice().getValue(TimeUnit.NANOSECONDS);
+ double penaltyFactor = context.getCpuDuration() / context.getTimeSlice().getValue(TimeUnit.NANOSECONDS);
// 2. If a task is nearly timeout, it should be scheduled as soon as possible.
long base = System.currentTimeMillis() - ddl;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
index e7e6861faa..10fd9fac0e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
@@ -1,407 +1,407 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.execution.schedule;
-
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.common.PlanFragmentId;
-import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.driver.IDriver;
-import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
-import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
-import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskStatus;
-import org.apache.iotdb.db.utils.stats.CpuTimer;
-import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
-
-import io.airlift.units.Duration;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-public class DefaultDriverSchedulerTest {
-
- private final DriverScheduler manager = DriverScheduler.getInstance();
-
- @After
- public void tearDown() throws IOException {
- clear();
- }
-
- @Test
- public void testBlockedToReady() {
- IMPPDataExchangeManager mockMPPDataExchangeManager =
- Mockito.mock(IMPPDataExchangeManager.class);
- manager.setBlockManager(mockMPPDataExchangeManager);
- ITaskScheduler defaultScheduler = manager.getScheduler();
- IDriver mockDriver = Mockito.mock(IDriver.class);
- QueryId queryId = new QueryId("test");
- FragmentInstanceId instanceId =
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
- Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
- DriverTaskStatus[] invalidStates =
- new DriverTaskStatus[] {
- DriverTaskStatus.FINISHED,
- DriverTaskStatus.ABORTED,
- DriverTaskStatus.READY,
- DriverTaskStatus.RUNNING,
- };
- for (DriverTaskStatus status : invalidStates) {
- DriverTask testTask = new DriverTask(mockDriver, 100L, status);
- manager.getBlockedTasks().add(testTask);
- Set<DriverTask> taskSet = new HashSet<>();
- taskSet.add(testTask);
- manager.getQueryMap().put(queryId, taskSet);
- manager.getTimeoutQueue().push(testTask);
- defaultScheduler.blockedToReady(testTask);
- Assert.assertEquals(status, testTask.getStatus());
- Assert.assertTrue(manager.getBlockedTasks().contains(testTask));
- Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
- Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
- Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
- Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
- clear();
- }
- DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.BLOCKED);
- manager.getBlockedTasks().add(testTask);
- Set<DriverTask> taskSet = new HashSet<>();
- taskSet.add(testTask);
- manager.getQueryMap().put(queryId, taskSet);
- manager.getTimeoutQueue().push(testTask);
- defaultScheduler.blockedToReady(testTask);
- Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
- Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
- Assert.assertNotNull(manager.getReadyQueue().get(testTask.getId()));
- Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
- Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
- Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
- Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
- clear();
- }
-
- @Test
- public void testReadyToRunning() {
- IMPPDataExchangeManager mockMPPDataExchangeManager =
- Mockito.mock(IMPPDataExchangeManager.class);
- manager.setBlockManager(mockMPPDataExchangeManager);
- ITaskScheduler defaultScheduler = manager.getScheduler();
- IDriver mockDriver = Mockito.mock(IDriver.class);
-
- QueryId queryId = new QueryId("test");
- FragmentInstanceId instanceId =
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
- Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
- DriverTaskStatus[] invalidStates =
- new DriverTaskStatus[] {
- DriverTaskStatus.FINISHED,
- DriverTaskStatus.ABORTED,
- DriverTaskStatus.BLOCKED,
- DriverTaskStatus.RUNNING,
- };
- for (DriverTaskStatus status : invalidStates) {
- DriverTask testTask = new DriverTask(mockDriver, 100L, status);
- Set<DriverTask> taskSet = new HashSet<>();
- taskSet.add(testTask);
- manager.getQueryMap().put(queryId, taskSet);
- manager.getTimeoutQueue().push(testTask);
- defaultScheduler.readyToRunning(testTask);
- Assert.assertEquals(status, testTask.getStatus());
- Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
- Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
- Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
- Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
- clear();
- }
- DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
- Set<DriverTask> taskSet = new HashSet<>();
- taskSet.add(testTask);
- manager.getQueryMap().put(queryId, taskSet);
- manager.getTimeoutQueue().push(testTask);
- defaultScheduler.readyToRunning(testTask);
- Assert.assertEquals(DriverTaskStatus.RUNNING, testTask.getStatus());
- Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
- Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
- Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
- Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
- Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
- clear();
- }
-
- @Test
- public void testRunningToReady() {
- IMPPDataExchangeManager mockMPPDataExchangeManager =
- Mockito.mock(IMPPDataExchangeManager.class);
- manager.setBlockManager(mockMPPDataExchangeManager);
- ITaskScheduler defaultScheduler = manager.getScheduler();
- IDriver mockDriver = Mockito.mock(IDriver.class);
- QueryId queryId = new QueryId("test");
- FragmentInstanceId instanceId =
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
- Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
- DriverTaskStatus[] invalidStates =
- new DriverTaskStatus[] {
- DriverTaskStatus.FINISHED,
- DriverTaskStatus.ABORTED,
- DriverTaskStatus.BLOCKED,
- DriverTaskStatus.READY,
- };
- for (DriverTaskStatus status : invalidStates) {
- DriverTask testTask = new DriverTask(mockDriver, 100L, status);
- Set<DriverTask> taskSet = new HashSet<>();
- taskSet.add(testTask);
- manager.getQueryMap().put(queryId, taskSet);
- manager.getTimeoutQueue().push(testTask);
- defaultScheduler.runningToReady(testTask, new ExecutionContext());
- Assert.assertEquals(status, testTask.getStatus());
- Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
- Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
- Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
- Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
- Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
- clear();
- }
- DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
- Set<DriverTask> taskSet = new HashSet<>();
- taskSet.add(testTask);
- manager.getQueryMap().put(queryId, taskSet);
- manager.getTimeoutQueue().push(testTask);
- ExecutionContext context = new ExecutionContext();
- context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
- context.setCpuDuration(new CpuTimer.CpuDuration());
- defaultScheduler.runningToReady(testTask, context);
- Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
- Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
- Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
- Assert.assertNotNull(manager.getReadyQueue().get(testTask.getId()));
- Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
- Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
- Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
- Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
- clear();
- }
-
- @Test
- public void testRunningToBlocked() {
- IMPPDataExchangeManager mockMPPDataExchangeManager =
- Mockito.mock(IMPPDataExchangeManager.class);
- manager.setBlockManager(mockMPPDataExchangeManager);
- ITaskScheduler defaultScheduler = manager.getScheduler();
- IDriver mockDriver = Mockito.mock(IDriver.class);
- QueryId queryId = new QueryId("test");
- FragmentInstanceId instanceId =
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
- Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
- DriverTaskStatus[] invalidStates =
- new DriverTaskStatus[] {
- DriverTaskStatus.FINISHED,
- DriverTaskStatus.ABORTED,
- DriverTaskStatus.BLOCKED,
- DriverTaskStatus.READY,
- };
- for (DriverTaskStatus status : invalidStates) {
- DriverTask testTask = new DriverTask(mockDriver, 100L, status);
- Set<DriverTask> taskSet = new HashSet<>();
- taskSet.add(testTask);
- manager.getQueryMap().put(queryId, taskSet);
- manager.getTimeoutQueue().push(testTask);
- defaultScheduler.runningToBlocked(testTask, new ExecutionContext());
- Assert.assertEquals(status, testTask.getStatus());
- Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
- Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
- Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
- Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
- Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
- clear();
- }
- DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
- Set<DriverTask> taskSet = new HashSet<>();
- taskSet.add(testTask);
- manager.getQueryMap().put(queryId, taskSet);
- manager.getTimeoutQueue().push(testTask);
- ExecutionContext context = new ExecutionContext();
- context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
- context.setCpuDuration(new CpuTimer.CpuDuration());
- defaultScheduler.runningToBlocked(testTask, context);
- Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
- Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask.getStatus());
- Assert.assertTrue(manager.getBlockedTasks().contains(testTask));
- Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
- Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
- Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
- Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
- Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
- clear();
- }
-
- @Test
- public void testRunningToFinished() {
- IMPPDataExchangeManager mockMPPDataExchangeManager =
- Mockito.mock(IMPPDataExchangeManager.class);
- manager.setBlockManager(mockMPPDataExchangeManager);
- ITaskScheduler defaultScheduler = manager.getScheduler();
- IDriver mockDriver = Mockito.mock(IDriver.class);
- QueryId queryId = new QueryId("test");
- FragmentInstanceId instanceId =
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
- Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
- DriverTaskStatus[] invalidStates =
- new DriverTaskStatus[] {
- DriverTaskStatus.FINISHED,
- DriverTaskStatus.ABORTED,
- DriverTaskStatus.BLOCKED,
- DriverTaskStatus.READY,
- };
- for (DriverTaskStatus status : invalidStates) {
- DriverTask testTask = new DriverTask(mockDriver, 100L, status);
- Set<DriverTask> taskSet = new HashSet<>();
- taskSet.add(testTask);
- manager.getQueryMap().put(queryId, taskSet);
- manager.getTimeoutQueue().push(testTask);
- defaultScheduler.runningToFinished(testTask, new ExecutionContext());
- Assert.assertEquals(status, testTask.getStatus());
- Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
- Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
- Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
- Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
- Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
- clear();
- }
- DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
- Set<DriverTask> taskSet = new HashSet<>();
- taskSet.add(testTask);
- manager.getQueryMap().put(queryId, taskSet);
- manager.getTimeoutQueue().push(testTask);
- ExecutionContext context = new ExecutionContext();
- context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
- context.setCpuDuration(new CpuTimer.CpuDuration());
- defaultScheduler.runningToFinished(testTask, context);
- Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
- Assert.assertEquals(DriverTaskStatus.FINISHED, testTask.getStatus());
- Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
- Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
- Assert.assertNull(manager.getTimeoutQueue().get(testTask.getId()));
- Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
- Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
- clear();
- }
-
- @Test
- public void testToAbort() {
- IMPPDataExchangeManager mockMPPDataExchangeManager =
- Mockito.mock(IMPPDataExchangeManager.class);
- manager.setBlockManager(mockMPPDataExchangeManager);
- IDataNodeRPCService.Client mockMppServiceClient =
- Mockito.mock(IDataNodeRPCService.Client.class);
- ITaskScheduler defaultScheduler = manager.getScheduler();
- QueryId queryId = new QueryId("test");
- FragmentInstanceId instanceId1 =
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
- IDriver mockDriver1 = Mockito.mock(IDriver.class);
- Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
- IDriver mockDriver2 = Mockito.mock(IDriver.class);
- FragmentInstanceId instanceId2 =
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-1");
- Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
- DriverTaskStatus[] invalidStates =
- new DriverTaskStatus[] {
- DriverTaskStatus.FINISHED, DriverTaskStatus.ABORTED,
- };
- for (DriverTaskStatus status : invalidStates) {
- DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status);
- DriverTask testTask2 = new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED);
- Set<DriverTask> taskSet = new HashSet<>();
- taskSet.add(testTask1);
- taskSet.add(testTask2);
- manager.getQueryMap().put(queryId, taskSet);
- manager.getTimeoutQueue().push(testTask1);
- manager.getTimeoutQueue().push(testTask2);
- manager.getBlockedTasks().add(testTask2);
- defaultScheduler.toAborted(testTask1);
-
- Assert.assertEquals(status, testTask1.getStatus());
- Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask2.getStatus());
- Assert.assertFalse(manager.getBlockedTasks().contains(testTask1));
- Assert.assertTrue(manager.getBlockedTasks().contains(testTask2));
- Assert.assertNull(manager.getReadyQueue().get(testTask1.getId()));
- Assert.assertNull(manager.getReadyQueue().get(testTask2.getId()));
- Assert.assertNotNull(manager.getTimeoutQueue().get(testTask1.getId()));
- Assert.assertNotNull(manager.getTimeoutQueue().get(testTask2.getId()));
- Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
- Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask1));
- Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask2));
-
- Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
- Mockito.verify(mockDriver2, Mockito.never()).failed(Mockito.any());
- clear();
- }
- DriverTaskStatus[] validStates =
- new DriverTaskStatus[] {
- DriverTaskStatus.RUNNING, DriverTaskStatus.READY, DriverTaskStatus.BLOCKED,
- };
- for (DriverTaskStatus status : validStates) {
- Mockito.reset(mockDriver1);
- Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
- Mockito.reset(mockDriver2);
- Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
-
- DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status);
-
- DriverTask testTask2 = new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED);
- Set<DriverTask> taskSet = new HashSet<>();
- taskSet.add(testTask1);
- taskSet.add(testTask2);
- manager.getQueryMap().put(queryId, taskSet);
- manager.getTimeoutQueue().push(testTask1);
- defaultScheduler.toAborted(testTask1);
-
- Mockito.reset(mockMppServiceClient);
- Mockito.verify(mockMPPDataExchangeManager, Mockito.times(2))
- .forceDeregisterFragmentInstance(Mockito.any());
- Mockito.reset(mockMPPDataExchangeManager);
-
- // An aborted fragment may cause others in the same query aborted.
- Assert.assertEquals(DriverTaskStatus.ABORTED, testTask1.getStatus());
- Assert.assertEquals(DriverTaskStatus.ABORTED, testTask2.getStatus());
- Assert.assertFalse(manager.getBlockedTasks().contains(testTask1));
- Assert.assertFalse(manager.getBlockedTasks().contains(testTask2));
- Assert.assertNull(manager.getReadyQueue().get(testTask1.getId()));
- Assert.assertNull(manager.getReadyQueue().get(testTask2.getId()));
- Assert.assertNull(manager.getTimeoutQueue().get(testTask1.getId()));
- Assert.assertNull(manager.getTimeoutQueue().get(testTask2.getId()));
- Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
-
- // The mockDriver1.failed() will be called outside the scheduler
- Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
- Mockito.verify(mockDriver2, Mockito.times(1)).failed(Mockito.any());
-
- clear();
- }
- }
-
- private void clear() {
- manager.getQueryMap().clear();
- manager.getBlockedTasks().clear();
- manager.getReadyQueue().clear();
- manager.getTimeoutQueue().clear();
- }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied. See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// */
+//package org.apache.iotdb.db.mpp.execution.schedule;
+//
+//import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+//import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+//import org.apache.iotdb.db.mpp.common.QueryId;
+//import org.apache.iotdb.db.mpp.execution.driver.IDriver;
+//import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
+//import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
+//import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskStatus;
+//import org.apache.iotdb.db.utils.stats.CpuTimer;
+//import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
+//
+//import io.airlift.units.Duration;
+//import org.junit.After;
+//import org.junit.Assert;
+//import org.junit.Test;
+//import org.mockito.Mockito;
+//
+//import java.io.IOException;
+//import java.util.HashSet;
+//import java.util.Set;
+//import java.util.concurrent.TimeUnit;
+//
+//public class DefaultDriverSchedulerTest {
+//
+// private final DriverScheduler manager = DriverScheduler.getInstance();
+//
+// @After
+// public void tearDown() throws IOException {
+// clear();
+// }
+//
+// @Test
+// public void testBlockedToReady() {
+// IMPPDataExchangeManager mockMPPDataExchangeManager =
+// Mockito.mock(IMPPDataExchangeManager.class);
+// manager.setBlockManager(mockMPPDataExchangeManager);
+// ITaskScheduler defaultScheduler = manager.getScheduler();
+// IDriver mockDriver = Mockito.mock(IDriver.class);
+// QueryId queryId = new QueryId("test");
+// FragmentInstanceId instanceId =
+// new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+// Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
+// DriverTaskStatus[] invalidStates =
+// new DriverTaskStatus[] {
+// DriverTaskStatus.FINISHED,
+// DriverTaskStatus.ABORTED,
+// DriverTaskStatus.READY,
+// DriverTaskStatus.RUNNING,
+// };
+// for (DriverTaskStatus status : invalidStates) {
+// DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+// manager.getBlockedTasks().add(testTask);
+// Set<DriverTask> taskSet = new HashSet<>();
+// taskSet.add(testTask);
+// manager.getQueryMap().put(queryId, taskSet);
+// manager.getTimeoutQueue().push(testTask);
+// defaultScheduler.blockedToReady(testTask);
+// Assert.assertEquals(status, testTask.getStatus());
+// Assert.assertTrue(manager.getBlockedTasks().contains(testTask));
+// Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
+// Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+// Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+// Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+// clear();
+// }
+// DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.BLOCKED);
+// manager.getBlockedTasks().add(testTask);
+// Set<DriverTask> taskSet = new HashSet<>();
+// taskSet.add(testTask);
+// manager.getQueryMap().put(queryId, taskSet);
+// manager.getTimeoutQueue().push(testTask);
+// defaultScheduler.blockedToReady(testTask);
+// Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
+// Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+// Assert.assertNotNull(manager.getReadyQueue().get(testTask.getId()));
+// Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+// Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+// Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+// Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
+// clear();
+// }
+//
+// @Test
+// public void testReadyToRunning() {
+// IMPPDataExchangeManager mockMPPDataExchangeManager =
+// Mockito.mock(IMPPDataExchangeManager.class);
+// manager.setBlockManager(mockMPPDataExchangeManager);
+// ITaskScheduler defaultScheduler = manager.getScheduler();
+// IDriver mockDriver = Mockito.mock(IDriver.class);
+//
+// QueryId queryId = new QueryId("test");
+// FragmentInstanceId instanceId =
+// new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+// Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
+// DriverTaskStatus[] invalidStates =
+// new DriverTaskStatus[] {
+// DriverTaskStatus.FINISHED,
+// DriverTaskStatus.ABORTED,
+// DriverTaskStatus.BLOCKED,
+// DriverTaskStatus.RUNNING,
+// };
+// for (DriverTaskStatus status : invalidStates) {
+// DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+// Set<DriverTask> taskSet = new HashSet<>();
+// taskSet.add(testTask);
+// manager.getQueryMap().put(queryId, taskSet);
+// manager.getTimeoutQueue().push(testTask);
+// defaultScheduler.readyToRunning(testTask);
+// Assert.assertEquals(status, testTask.getStatus());
+// Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+// Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+// Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+// Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+// clear();
+// }
+// DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+// Set<DriverTask> taskSet = new HashSet<>();
+// taskSet.add(testTask);
+// manager.getQueryMap().put(queryId, taskSet);
+// manager.getTimeoutQueue().push(testTask);
+// defaultScheduler.readyToRunning(testTask);
+// Assert.assertEquals(DriverTaskStatus.RUNNING, testTask.getStatus());
+// Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+// Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+// Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+// Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+// Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
+// clear();
+// }
+//
+// @Test
+// public void testRunningToReady() {
+// IMPPDataExchangeManager mockMPPDataExchangeManager =
+// Mockito.mock(IMPPDataExchangeManager.class);
+// manager.setBlockManager(mockMPPDataExchangeManager);
+// ITaskScheduler defaultScheduler = manager.getScheduler();
+// IDriver mockDriver = Mockito.mock(IDriver.class);
+// QueryId queryId = new QueryId("test");
+// FragmentInstanceId instanceId =
+// new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+// Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
+// DriverTaskStatus[] invalidStates =
+// new DriverTaskStatus[] {
+// DriverTaskStatus.FINISHED,
+// DriverTaskStatus.ABORTED,
+// DriverTaskStatus.BLOCKED,
+// DriverTaskStatus.READY,
+// };
+// for (DriverTaskStatus status : invalidStates) {
+// DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+// Set<DriverTask> taskSet = new HashSet<>();
+// taskSet.add(testTask);
+// manager.getQueryMap().put(queryId, taskSet);
+// manager.getTimeoutQueue().push(testTask);
+// defaultScheduler.runningToReady(testTask, new ExecutionContext());
+// Assert.assertEquals(status, testTask.getStatus());
+// Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+// Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
+// Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+// Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+// Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+// clear();
+// }
+// DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+// Set<DriverTask> taskSet = new HashSet<>();
+// taskSet.add(testTask);
+// manager.getQueryMap().put(queryId, taskSet);
+// manager.getTimeoutQueue().push(testTask);
+// ExecutionContext context = new ExecutionContext();
+// context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
+// context.setCpuDuration(new CpuTimer.CpuDuration());
+// defaultScheduler.runningToReady(testTask, context);
+// Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
+// Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
+// Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+// Assert.assertNotNull(manager.getReadyQueue().get(testTask.getId()));
+// Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+// Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+// Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+// Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
+// clear();
+// }
+//
+// @Test
+// public void testRunningToBlocked() {
+// IMPPDataExchangeManager mockMPPDataExchangeManager =
+// Mockito.mock(IMPPDataExchangeManager.class);
+// manager.setBlockManager(mockMPPDataExchangeManager);
+// ITaskScheduler defaultScheduler = manager.getScheduler();
+// IDriver mockDriver = Mockito.mock(IDriver.class);
+// QueryId queryId = new QueryId("test");
+// FragmentInstanceId instanceId =
+// new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+// Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
+// DriverTaskStatus[] invalidStates =
+// new DriverTaskStatus[] {
+// DriverTaskStatus.FINISHED,
+// DriverTaskStatus.ABORTED,
+// DriverTaskStatus.BLOCKED,
+// DriverTaskStatus.READY,
+// };
+// for (DriverTaskStatus status : invalidStates) {
+// DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+// Set<DriverTask> taskSet = new HashSet<>();
+// taskSet.add(testTask);
+// manager.getQueryMap().put(queryId, taskSet);
+// manager.getTimeoutQueue().push(testTask);
+// defaultScheduler.runningToBlocked(testTask, new ExecutionContext());
+// Assert.assertEquals(status, testTask.getStatus());
+// Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+// Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
+// Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+// Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+// Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+// clear();
+// }
+// DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+// Set<DriverTask> taskSet = new HashSet<>();
+// taskSet.add(testTask);
+// manager.getQueryMap().put(queryId, taskSet);
+// manager.getTimeoutQueue().push(testTask);
+// ExecutionContext context = new ExecutionContext();
+// context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
+// context.setCpuDuration(new CpuTimer.CpuDuration());
+// defaultScheduler.runningToBlocked(testTask, context);
+// Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
+// Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask.getStatus());
+// Assert.assertTrue(manager.getBlockedTasks().contains(testTask));
+// Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
+// Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+// Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+// Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+// Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
+// clear();
+// }
+//
+// @Test
+// public void testRunningToFinished() {
+// IMPPDataExchangeManager mockMPPDataExchangeManager =
+// Mockito.mock(IMPPDataExchangeManager.class);
+// manager.setBlockManager(mockMPPDataExchangeManager);
+// ITaskScheduler defaultScheduler = manager.getScheduler();
+// IDriver mockDriver = Mockito.mock(IDriver.class);
+// QueryId queryId = new QueryId("test");
+// FragmentInstanceId instanceId =
+// new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+// Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
+// DriverTaskStatus[] invalidStates =
+// new DriverTaskStatus[] {
+// DriverTaskStatus.FINISHED,
+// DriverTaskStatus.ABORTED,
+// DriverTaskStatus.BLOCKED,
+// DriverTaskStatus.READY,
+// };
+// for (DriverTaskStatus status : invalidStates) {
+// DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+// Set<DriverTask> taskSet = new HashSet<>();
+// taskSet.add(testTask);
+// manager.getQueryMap().put(queryId, taskSet);
+// manager.getTimeoutQueue().push(testTask);
+// defaultScheduler.runningToFinished(testTask, new ExecutionContext());
+// Assert.assertEquals(status, testTask.getStatus());
+// Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+// Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
+// Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+// Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+// Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+// clear();
+// }
+// DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+// Set<DriverTask> taskSet = new HashSet<>();
+// taskSet.add(testTask);
+// manager.getQueryMap().put(queryId, taskSet);
+// manager.getTimeoutQueue().push(testTask);
+// ExecutionContext context = new ExecutionContext();
+// context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
+// context.setCpuDuration(new CpuTimer.CpuDuration());
+// defaultScheduler.runningToFinished(testTask, context);
+// Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
+// Assert.assertEquals(DriverTaskStatus.FINISHED, testTask.getStatus());
+// Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+// Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
+// Assert.assertNull(manager.getTimeoutQueue().get(testTask.getId()));
+// Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
+// Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
+// clear();
+// }
+//
+// @Test
+// public void testToAbort() {
+// IMPPDataExchangeManager mockMPPDataExchangeManager =
+// Mockito.mock(IMPPDataExchangeManager.class);
+// manager.setBlockManager(mockMPPDataExchangeManager);
+// IDataNodeRPCService.Client mockMppServiceClient =
+// Mockito.mock(IDataNodeRPCService.Client.class);
+// ITaskScheduler defaultScheduler = manager.getScheduler();
+// QueryId queryId = new QueryId("test");
+// FragmentInstanceId instanceId1 =
+// new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+// IDriver mockDriver1 = Mockito.mock(IDriver.class);
+// Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
+// IDriver mockDriver2 = Mockito.mock(IDriver.class);
+// FragmentInstanceId instanceId2 =
+// new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-1");
+// Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
+// DriverTaskStatus[] invalidStates =
+// new DriverTaskStatus[] {
+// DriverTaskStatus.FINISHED, DriverTaskStatus.ABORTED,
+// };
+// for (DriverTaskStatus status : invalidStates) {
+// DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status);
+// DriverTask testTask2 = new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED);
+// Set<DriverTask> taskSet = new HashSet<>();
+// taskSet.add(testTask1);
+// taskSet.add(testTask2);
+// manager.getQueryMap().put(queryId, taskSet);
+// manager.getTimeoutQueue().push(testTask1);
+// manager.getTimeoutQueue().push(testTask2);
+// manager.getBlockedTasks().add(testTask2);
+// defaultScheduler.toAborted(testTask1);
+//
+// Assert.assertEquals(status, testTask1.getStatus());
+// Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask2.getStatus());
+// Assert.assertFalse(manager.getBlockedTasks().contains(testTask1));
+// Assert.assertTrue(manager.getBlockedTasks().contains(testTask2));
+// Assert.assertNull(manager.getReadyQueue().get(testTask1.getId()));
+// Assert.assertNull(manager.getReadyQueue().get(testTask2.getId()));
+// Assert.assertNotNull(manager.getTimeoutQueue().get(testTask1.getId()));
+// Assert.assertNotNull(manager.getTimeoutQueue().get(testTask2.getId()));
+// Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+// Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask1));
+// Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask2));
+//
+// Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
+// Mockito.verify(mockDriver2, Mockito.never()).failed(Mockito.any());
+// clear();
+// }
+// DriverTaskStatus[] validStates =
+// new DriverTaskStatus[] {
+// DriverTaskStatus.RUNNING, DriverTaskStatus.READY, DriverTaskStatus.BLOCKED,
+// };
+// for (DriverTaskStatus status : validStates) {
+// Mockito.reset(mockDriver1);
+// Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
+// Mockito.reset(mockDriver2);
+// Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
+//
+// DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status);
+//
+// DriverTask testTask2 = new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED);
+// Set<DriverTask> taskSet = new HashSet<>();
+// taskSet.add(testTask1);
+// taskSet.add(testTask2);
+// manager.getQueryMap().put(queryId, taskSet);
+// manager.getTimeoutQueue().push(testTask1);
+// defaultScheduler.toAborted(testTask1);
+//
+// Mockito.reset(mockMppServiceClient);
+// Mockito.verify(mockMPPDataExchangeManager, Mockito.times(2))
+// .forceDeregisterFragmentInstance(Mockito.any());
+// Mockito.reset(mockMPPDataExchangeManager);
+//
+// // An aborted fragment may cause others in the same query aborted.
+// Assert.assertEquals(DriverTaskStatus.ABORTED, testTask1.getStatus());
+// Assert.assertEquals(DriverTaskStatus.ABORTED, testTask2.getStatus());
+// Assert.assertFalse(manager.getBlockedTasks().contains(testTask1));
+// Assert.assertFalse(manager.getBlockedTasks().contains(testTask2));
+// Assert.assertNull(manager.getReadyQueue().get(testTask1.getId()));
+// Assert.assertNull(manager.getReadyQueue().get(testTask2.getId()));
+// Assert.assertNull(manager.getTimeoutQueue().get(testTask1.getId()));
+// Assert.assertNull(manager.getTimeoutQueue().get(testTask2.getId()));
+// Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
+//
+// // The mockDriver1.failed() will be called outside the scheduler
+// Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
+// Mockito.verify(mockDriver2, Mockito.times(1)).failed(Mockito.any());
+//
+// clear();
+// }
+// }
+//
+// private void clear() {
+// manager.getQueryMap().clear();
+// manager.getBlockedTasks().clear();
+// manager.getReadyQueue().clear();
+// manager.getTimeoutQueue().clear();
+// }
+//}