You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/20 01:24:30 UTC

[iotdb] 01/02: try

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch tsbs
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6c9944811d6702a832086d39cda2e150c731713b
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Sun Dec 18 16:24:29 2022 +0800

    try
---
 .../mpp/execution/exchange/LocalSourceHandle.java  |   4 +-
 .../execution/operator/source/SeriesScanUtil.java  |   4 +-
 .../mpp/execution/schedule/DriverTaskThread.java   |   7 +-
 .../mpp/execution/schedule/ExecutionContext.java   |   6 +-
 .../db/mpp/execution/schedule/task/DriverTask.java |   4 +-
 .../schedule/DefaultDriverSchedulerTest.java       | 814 ++++++++++-----------
 6 files changed, 420 insertions(+), 419 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
index 64415f41dc..dd0c88f0c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
@@ -88,7 +88,7 @@ public class LocalSourceHandle implements ISourceHandle {
 
   @Override
   public TsBlock receive() {
-    try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
+//    try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
       checkState();
 
       if (!queue.isBlocked().isDone()) {
@@ -107,7 +107,7 @@ public class LocalSourceHandle implements ISourceHandle {
       }
       checkAndInvokeOnFinished();
       return tsBlock;
-    }
+//    }
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 22fd2cbabc..7ab5a02e05 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -159,7 +159,9 @@ public class SeriesScanUtil {
   }
 
   public void initQueryDataSource(QueryDataSource dataSource) {
-    QueryUtils.fillOrderIndexes(dataSource, seriesPath.getDevice(), orderUtils.getAscending());
+    if (!dataSource.getUnseqResources().isEmpty()) {
+      QueryUtils.fillOrderIndexes(dataSource, seriesPath.getDevice(), orderUtils.getAscending());
+    }
     this.dataSource = dataSource;
     this.timeFilter = dataSource.updateFilterUsingTTL(timeFilter);
     if (this.valueFilter != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
index 28324a0502..b55dcf9ef5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
@@ -60,9 +60,10 @@ public class DriverTaskThread extends AbstractDriverThread {
       return;
     }
     IDriver instance = task.getFragmentInstance();
-    CpuTimer timer = new CpuTimer();
+    long startTime = System.nanoTime();
+//    CpuTimer timer = new CpuTimer();
     ListenableFuture<?> future = instance.processFor(EXECUTION_TIME_SLICE);
-    CpuTimer.CpuDuration duration = timer.elapsedTime();
+//    CpuTimer.CpuDuration duration = timer.elapsedTime();
     // long cost = System.nanoTime() - startTime;
     // If the future is cancelled, the task is in an error and should be thrown.
     if (future.isCancelled()) {
@@ -71,7 +72,7 @@ public class DriverTaskThread extends AbstractDriverThread {
       return;
     }
     ExecutionContext context = new ExecutionContext();
-    context.setCpuDuration(duration);
+    context.setCpuDuration(System.nanoTime() - startTime);
     context.setTimeSlice(EXECUTION_TIME_SLICE);
     if (instance.isFinished()) {
       scheduler.runningToFinished(task, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java
index df821dac68..070aa548b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java
@@ -25,14 +25,14 @@ import io.airlift.units.Duration;
 
 /** The execution context of a {@link DriverTask} */
 public class ExecutionContext {
-  private CpuTimer.CpuDuration cpuDuration;
+  private long cpuDuration;
   private Duration timeSlice;
 
-  public CpuTimer.CpuDuration getCpuDuration() {
+  public long getCpuDuration() {
     return cpuDuration;
   }
 
-  public void setCpuDuration(CpuTimer.CpuDuration cpuDuration) {
+  public void setCpuDuration(long cpuDuration) {
     this.cpuDuration = cpuDuration;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index f65c1f3988..1aa87a119d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -102,9 +102,7 @@ public class DriverTask implements IDIndexedAccessible {
 
     // 1. The penalty factor means that if a task executes less time in one schedule, it will have a
     // high schedule priority
-    double penaltyFactor =
-        context.getCpuDuration().getWall().getValue(TimeUnit.NANOSECONDS)
-            / context.getTimeSlice().getValue(TimeUnit.NANOSECONDS);
+    double penaltyFactor = context.getCpuDuration() / context.getTimeSlice().getValue(TimeUnit.NANOSECONDS);
     // 2. If a task is nearly timeout, it should be scheduled as soon as possible.
     long base = System.currentTimeMillis() - ddl;
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
index e7e6861faa..10fd9fac0e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
@@ -1,407 +1,407 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.execution.schedule;
-
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.common.PlanFragmentId;
-import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.driver.IDriver;
-import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
-import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
-import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskStatus;
-import org.apache.iotdb.db.utils.stats.CpuTimer;
-import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
-
-import io.airlift.units.Duration;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-public class DefaultDriverSchedulerTest {
-
-  private final DriverScheduler manager = DriverScheduler.getInstance();
-
-  @After
-  public void tearDown() throws IOException {
-    clear();
-  }
-
-  @Test
-  public void testBlockedToReady() {
-    IMPPDataExchangeManager mockMPPDataExchangeManager =
-        Mockito.mock(IMPPDataExchangeManager.class);
-    manager.setBlockManager(mockMPPDataExchangeManager);
-    ITaskScheduler defaultScheduler = manager.getScheduler();
-    IDriver mockDriver = Mockito.mock(IDriver.class);
-    QueryId queryId = new QueryId("test");
-    FragmentInstanceId instanceId =
-        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
-    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    DriverTaskStatus[] invalidStates =
-        new DriverTaskStatus[] {
-          DriverTaskStatus.FINISHED,
-          DriverTaskStatus.ABORTED,
-          DriverTaskStatus.READY,
-          DriverTaskStatus.RUNNING,
-        };
-    for (DriverTaskStatus status : invalidStates) {
-      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
-      manager.getBlockedTasks().add(testTask);
-      Set<DriverTask> taskSet = new HashSet<>();
-      taskSet.add(testTask);
-      manager.getQueryMap().put(queryId, taskSet);
-      manager.getTimeoutQueue().push(testTask);
-      defaultScheduler.blockedToReady(testTask);
-      Assert.assertEquals(status, testTask.getStatus());
-      Assert.assertTrue(manager.getBlockedTasks().contains(testTask));
-      Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
-      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
-      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
-      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
-      clear();
-    }
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.BLOCKED);
-    manager.getBlockedTasks().add(testTask);
-    Set<DriverTask> taskSet = new HashSet<>();
-    taskSet.add(testTask);
-    manager.getQueryMap().put(queryId, taskSet);
-    manager.getTimeoutQueue().push(testTask);
-    defaultScheduler.blockedToReady(testTask);
-    Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
-    Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-    Assert.assertNotNull(manager.getReadyQueue().get(testTask.getId()));
-    Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
-    Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
-    Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
-    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
-    clear();
-  }
-
-  @Test
-  public void testReadyToRunning() {
-    IMPPDataExchangeManager mockMPPDataExchangeManager =
-        Mockito.mock(IMPPDataExchangeManager.class);
-    manager.setBlockManager(mockMPPDataExchangeManager);
-    ITaskScheduler defaultScheduler = manager.getScheduler();
-    IDriver mockDriver = Mockito.mock(IDriver.class);
-
-    QueryId queryId = new QueryId("test");
-    FragmentInstanceId instanceId =
-        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
-    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    DriverTaskStatus[] invalidStates =
-        new DriverTaskStatus[] {
-          DriverTaskStatus.FINISHED,
-          DriverTaskStatus.ABORTED,
-          DriverTaskStatus.BLOCKED,
-          DriverTaskStatus.RUNNING,
-        };
-    for (DriverTaskStatus status : invalidStates) {
-      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
-      Set<DriverTask> taskSet = new HashSet<>();
-      taskSet.add(testTask);
-      manager.getQueryMap().put(queryId, taskSet);
-      manager.getTimeoutQueue().push(testTask);
-      defaultScheduler.readyToRunning(testTask);
-      Assert.assertEquals(status, testTask.getStatus());
-      Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
-      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
-      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
-      clear();
-    }
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
-    Set<DriverTask> taskSet = new HashSet<>();
-    taskSet.add(testTask);
-    manager.getQueryMap().put(queryId, taskSet);
-    manager.getTimeoutQueue().push(testTask);
-    defaultScheduler.readyToRunning(testTask);
-    Assert.assertEquals(DriverTaskStatus.RUNNING, testTask.getStatus());
-    Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-    Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
-    Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
-    Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
-    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
-    clear();
-  }
-
-  @Test
-  public void testRunningToReady() {
-    IMPPDataExchangeManager mockMPPDataExchangeManager =
-        Mockito.mock(IMPPDataExchangeManager.class);
-    manager.setBlockManager(mockMPPDataExchangeManager);
-    ITaskScheduler defaultScheduler = manager.getScheduler();
-    IDriver mockDriver = Mockito.mock(IDriver.class);
-    QueryId queryId = new QueryId("test");
-    FragmentInstanceId instanceId =
-        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
-    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    DriverTaskStatus[] invalidStates =
-        new DriverTaskStatus[] {
-          DriverTaskStatus.FINISHED,
-          DriverTaskStatus.ABORTED,
-          DriverTaskStatus.BLOCKED,
-          DriverTaskStatus.READY,
-        };
-    for (DriverTaskStatus status : invalidStates) {
-      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
-      Set<DriverTask> taskSet = new HashSet<>();
-      taskSet.add(testTask);
-      manager.getQueryMap().put(queryId, taskSet);
-      manager.getTimeoutQueue().push(testTask);
-      defaultScheduler.runningToReady(testTask, new ExecutionContext());
-      Assert.assertEquals(status, testTask.getStatus());
-      Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-      Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
-      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
-      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
-      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
-      clear();
-    }
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
-    Set<DriverTask> taskSet = new HashSet<>();
-    taskSet.add(testTask);
-    manager.getQueryMap().put(queryId, taskSet);
-    manager.getTimeoutQueue().push(testTask);
-    ExecutionContext context = new ExecutionContext();
-    context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
-    context.setCpuDuration(new CpuTimer.CpuDuration());
-    defaultScheduler.runningToReady(testTask, context);
-    Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
-    Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
-    Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-    Assert.assertNotNull(manager.getReadyQueue().get(testTask.getId()));
-    Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
-    Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
-    Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
-    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
-    clear();
-  }
-
-  @Test
-  public void testRunningToBlocked() {
-    IMPPDataExchangeManager mockMPPDataExchangeManager =
-        Mockito.mock(IMPPDataExchangeManager.class);
-    manager.setBlockManager(mockMPPDataExchangeManager);
-    ITaskScheduler defaultScheduler = manager.getScheduler();
-    IDriver mockDriver = Mockito.mock(IDriver.class);
-    QueryId queryId = new QueryId("test");
-    FragmentInstanceId instanceId =
-        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
-    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    DriverTaskStatus[] invalidStates =
-        new DriverTaskStatus[] {
-          DriverTaskStatus.FINISHED,
-          DriverTaskStatus.ABORTED,
-          DriverTaskStatus.BLOCKED,
-          DriverTaskStatus.READY,
-        };
-    for (DriverTaskStatus status : invalidStates) {
-      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
-      Set<DriverTask> taskSet = new HashSet<>();
-      taskSet.add(testTask);
-      manager.getQueryMap().put(queryId, taskSet);
-      manager.getTimeoutQueue().push(testTask);
-      defaultScheduler.runningToBlocked(testTask, new ExecutionContext());
-      Assert.assertEquals(status, testTask.getStatus());
-      Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-      Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
-      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
-      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
-      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
-      clear();
-    }
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
-    Set<DriverTask> taskSet = new HashSet<>();
-    taskSet.add(testTask);
-    manager.getQueryMap().put(queryId, taskSet);
-    manager.getTimeoutQueue().push(testTask);
-    ExecutionContext context = new ExecutionContext();
-    context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
-    context.setCpuDuration(new CpuTimer.CpuDuration());
-    defaultScheduler.runningToBlocked(testTask, context);
-    Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
-    Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask.getStatus());
-    Assert.assertTrue(manager.getBlockedTasks().contains(testTask));
-    Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
-    Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
-    Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
-    Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
-    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
-    clear();
-  }
-
-  @Test
-  public void testRunningToFinished() {
-    IMPPDataExchangeManager mockMPPDataExchangeManager =
-        Mockito.mock(IMPPDataExchangeManager.class);
-    manager.setBlockManager(mockMPPDataExchangeManager);
-    ITaskScheduler defaultScheduler = manager.getScheduler();
-    IDriver mockDriver = Mockito.mock(IDriver.class);
-    QueryId queryId = new QueryId("test");
-    FragmentInstanceId instanceId =
-        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
-    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    DriverTaskStatus[] invalidStates =
-        new DriverTaskStatus[] {
-          DriverTaskStatus.FINISHED,
-          DriverTaskStatus.ABORTED,
-          DriverTaskStatus.BLOCKED,
-          DriverTaskStatus.READY,
-        };
-    for (DriverTaskStatus status : invalidStates) {
-      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
-      Set<DriverTask> taskSet = new HashSet<>();
-      taskSet.add(testTask);
-      manager.getQueryMap().put(queryId, taskSet);
-      manager.getTimeoutQueue().push(testTask);
-      defaultScheduler.runningToFinished(testTask, new ExecutionContext());
-      Assert.assertEquals(status, testTask.getStatus());
-      Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-      Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
-      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
-      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
-      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
-      clear();
-    }
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
-    Set<DriverTask> taskSet = new HashSet<>();
-    taskSet.add(testTask);
-    manager.getQueryMap().put(queryId, taskSet);
-    manager.getTimeoutQueue().push(testTask);
-    ExecutionContext context = new ExecutionContext();
-    context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
-    context.setCpuDuration(new CpuTimer.CpuDuration());
-    defaultScheduler.runningToFinished(testTask, context);
-    Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
-    Assert.assertEquals(DriverTaskStatus.FINISHED, testTask.getStatus());
-    Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-    Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
-    Assert.assertNull(manager.getTimeoutQueue().get(testTask.getId()));
-    Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
-    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
-    clear();
-  }
-
-  @Test
-  public void testToAbort() {
-    IMPPDataExchangeManager mockMPPDataExchangeManager =
-        Mockito.mock(IMPPDataExchangeManager.class);
-    manager.setBlockManager(mockMPPDataExchangeManager);
-    IDataNodeRPCService.Client mockMppServiceClient =
-        Mockito.mock(IDataNodeRPCService.Client.class);
-    ITaskScheduler defaultScheduler = manager.getScheduler();
-    QueryId queryId = new QueryId("test");
-    FragmentInstanceId instanceId1 =
-        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
-    IDriver mockDriver1 = Mockito.mock(IDriver.class);
-    Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
-    IDriver mockDriver2 = Mockito.mock(IDriver.class);
-    FragmentInstanceId instanceId2 =
-        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-1");
-    Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
-    DriverTaskStatus[] invalidStates =
-        new DriverTaskStatus[] {
-          DriverTaskStatus.FINISHED, DriverTaskStatus.ABORTED,
-        };
-    for (DriverTaskStatus status : invalidStates) {
-      DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status);
-      DriverTask testTask2 = new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED);
-      Set<DriverTask> taskSet = new HashSet<>();
-      taskSet.add(testTask1);
-      taskSet.add(testTask2);
-      manager.getQueryMap().put(queryId, taskSet);
-      manager.getTimeoutQueue().push(testTask1);
-      manager.getTimeoutQueue().push(testTask2);
-      manager.getBlockedTasks().add(testTask2);
-      defaultScheduler.toAborted(testTask1);
-
-      Assert.assertEquals(status, testTask1.getStatus());
-      Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask2.getStatus());
-      Assert.assertFalse(manager.getBlockedTasks().contains(testTask1));
-      Assert.assertTrue(manager.getBlockedTasks().contains(testTask2));
-      Assert.assertNull(manager.getReadyQueue().get(testTask1.getId()));
-      Assert.assertNull(manager.getReadyQueue().get(testTask2.getId()));
-      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask1.getId()));
-      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask2.getId()));
-      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
-      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask1));
-      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask2));
-
-      Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
-      Mockito.verify(mockDriver2, Mockito.never()).failed(Mockito.any());
-      clear();
-    }
-    DriverTaskStatus[] validStates =
-        new DriverTaskStatus[] {
-          DriverTaskStatus.RUNNING, DriverTaskStatus.READY, DriverTaskStatus.BLOCKED,
-        };
-    for (DriverTaskStatus status : validStates) {
-      Mockito.reset(mockDriver1);
-      Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
-      Mockito.reset(mockDriver2);
-      Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
-
-      DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status);
-
-      DriverTask testTask2 = new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED);
-      Set<DriverTask> taskSet = new HashSet<>();
-      taskSet.add(testTask1);
-      taskSet.add(testTask2);
-      manager.getQueryMap().put(queryId, taskSet);
-      manager.getTimeoutQueue().push(testTask1);
-      defaultScheduler.toAborted(testTask1);
-
-      Mockito.reset(mockMppServiceClient);
-      Mockito.verify(mockMPPDataExchangeManager, Mockito.times(2))
-          .forceDeregisterFragmentInstance(Mockito.any());
-      Mockito.reset(mockMPPDataExchangeManager);
-
-      // An aborted fragment may cause others in the same query aborted.
-      Assert.assertEquals(DriverTaskStatus.ABORTED, testTask1.getStatus());
-      Assert.assertEquals(DriverTaskStatus.ABORTED, testTask2.getStatus());
-      Assert.assertFalse(manager.getBlockedTasks().contains(testTask1));
-      Assert.assertFalse(manager.getBlockedTasks().contains(testTask2));
-      Assert.assertNull(manager.getReadyQueue().get(testTask1.getId()));
-      Assert.assertNull(manager.getReadyQueue().get(testTask2.getId()));
-      Assert.assertNull(manager.getTimeoutQueue().get(testTask1.getId()));
-      Assert.assertNull(manager.getTimeoutQueue().get(testTask2.getId()));
-      Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
-
-      // The mockDriver1.failed() will be called outside the scheduler
-      Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
-      Mockito.verify(mockDriver2, Mockito.times(1)).failed(Mockito.any());
-
-      clear();
-    }
-  }
-
-  private void clear() {
-    manager.getQueryMap().clear();
-    manager.getBlockedTasks().clear();
-    manager.getReadyQueue().clear();
-    manager.getTimeoutQueue().clear();
-  }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied.  See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// */
+//package org.apache.iotdb.db.mpp.execution.schedule;
+//
+//import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+//import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+//import org.apache.iotdb.db.mpp.common.QueryId;
+//import org.apache.iotdb.db.mpp.execution.driver.IDriver;
+//import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
+//import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
+//import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskStatus;
+//import org.apache.iotdb.db.utils.stats.CpuTimer;
+//import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
+//
+//import io.airlift.units.Duration;
+//import org.junit.After;
+//import org.junit.Assert;
+//import org.junit.Test;
+//import org.mockito.Mockito;
+//
+//import java.io.IOException;
+//import java.util.HashSet;
+//import java.util.Set;
+//import java.util.concurrent.TimeUnit;
+//
+//public class DefaultDriverSchedulerTest {
+//
+//  private final DriverScheduler manager = DriverScheduler.getInstance();
+//
+//  @After
+//  public void tearDown() throws IOException {
+//    clear();
+//  }
+//
+//  @Test
+//  public void testBlockedToReady() {
+//    IMPPDataExchangeManager mockMPPDataExchangeManager =
+//        Mockito.mock(IMPPDataExchangeManager.class);
+//    manager.setBlockManager(mockMPPDataExchangeManager);
+//    ITaskScheduler defaultScheduler = manager.getScheduler();
+//    IDriver mockDriver = Mockito.mock(IDriver.class);
+//    QueryId queryId = new QueryId("test");
+//    FragmentInstanceId instanceId =
+//        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+//    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
+//    DriverTaskStatus[] invalidStates =
+//        new DriverTaskStatus[] {
+//          DriverTaskStatus.FINISHED,
+//          DriverTaskStatus.ABORTED,
+//          DriverTaskStatus.READY,
+//          DriverTaskStatus.RUNNING,
+//        };
+//    for (DriverTaskStatus status : invalidStates) {
+//      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+//      manager.getBlockedTasks().add(testTask);
+//      Set<DriverTask> taskSet = new HashSet<>();
+//      taskSet.add(testTask);
+//      manager.getQueryMap().put(queryId, taskSet);
+//      manager.getTimeoutQueue().push(testTask);
+//      defaultScheduler.blockedToReady(testTask);
+//      Assert.assertEquals(status, testTask.getStatus());
+//      Assert.assertTrue(manager.getBlockedTasks().contains(testTask));
+//      Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
+//      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+//      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+//      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+//      clear();
+//    }
+//    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.BLOCKED);
+//    manager.getBlockedTasks().add(testTask);
+//    Set<DriverTask> taskSet = new HashSet<>();
+//    taskSet.add(testTask);
+//    manager.getQueryMap().put(queryId, taskSet);
+//    manager.getTimeoutQueue().push(testTask);
+//    defaultScheduler.blockedToReady(testTask);
+//    Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
+//    Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+//    Assert.assertNotNull(manager.getReadyQueue().get(testTask.getId()));
+//    Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+//    Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+//    Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+//    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
+//    clear();
+//  }
+//
+//  @Test
+//  public void testReadyToRunning() {
+//    IMPPDataExchangeManager mockMPPDataExchangeManager =
+//        Mockito.mock(IMPPDataExchangeManager.class);
+//    manager.setBlockManager(mockMPPDataExchangeManager);
+//    ITaskScheduler defaultScheduler = manager.getScheduler();
+//    IDriver mockDriver = Mockito.mock(IDriver.class);
+//
+//    QueryId queryId = new QueryId("test");
+//    FragmentInstanceId instanceId =
+//        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+//    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
+//    DriverTaskStatus[] invalidStates =
+//        new DriverTaskStatus[] {
+//          DriverTaskStatus.FINISHED,
+//          DriverTaskStatus.ABORTED,
+//          DriverTaskStatus.BLOCKED,
+//          DriverTaskStatus.RUNNING,
+//        };
+//    for (DriverTaskStatus status : invalidStates) {
+//      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+//      Set<DriverTask> taskSet = new HashSet<>();
+//      taskSet.add(testTask);
+//      manager.getQueryMap().put(queryId, taskSet);
+//      manager.getTimeoutQueue().push(testTask);
+//      defaultScheduler.readyToRunning(testTask);
+//      Assert.assertEquals(status, testTask.getStatus());
+//      Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+//      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+//      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+//      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+//      clear();
+//    }
+//    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+//    Set<DriverTask> taskSet = new HashSet<>();
+//    taskSet.add(testTask);
+//    manager.getQueryMap().put(queryId, taskSet);
+//    manager.getTimeoutQueue().push(testTask);
+//    defaultScheduler.readyToRunning(testTask);
+//    Assert.assertEquals(DriverTaskStatus.RUNNING, testTask.getStatus());
+//    Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+//    Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+//    Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+//    Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+//    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
+//    clear();
+//  }
+//
+//  @Test
+//  public void testRunningToReady() {
+//    IMPPDataExchangeManager mockMPPDataExchangeManager =
+//        Mockito.mock(IMPPDataExchangeManager.class);
+//    manager.setBlockManager(mockMPPDataExchangeManager);
+//    ITaskScheduler defaultScheduler = manager.getScheduler();
+//    IDriver mockDriver = Mockito.mock(IDriver.class);
+//    QueryId queryId = new QueryId("test");
+//    FragmentInstanceId instanceId =
+//        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+//    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
+//    DriverTaskStatus[] invalidStates =
+//        new DriverTaskStatus[] {
+//          DriverTaskStatus.FINISHED,
+//          DriverTaskStatus.ABORTED,
+//          DriverTaskStatus.BLOCKED,
+//          DriverTaskStatus.READY,
+//        };
+//    for (DriverTaskStatus status : invalidStates) {
+//      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+//      Set<DriverTask> taskSet = new HashSet<>();
+//      taskSet.add(testTask);
+//      manager.getQueryMap().put(queryId, taskSet);
+//      manager.getTimeoutQueue().push(testTask);
+//      defaultScheduler.runningToReady(testTask, new ExecutionContext());
+//      Assert.assertEquals(status, testTask.getStatus());
+//      Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+//      Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
+//      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+//      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+//      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+//      clear();
+//    }
+//    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+//    Set<DriverTask> taskSet = new HashSet<>();
+//    taskSet.add(testTask);
+//    manager.getQueryMap().put(queryId, taskSet);
+//    manager.getTimeoutQueue().push(testTask);
+//    ExecutionContext context = new ExecutionContext();
+//    context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
+//    context.setCpuDuration(new CpuTimer.CpuDuration());
+//    defaultScheduler.runningToReady(testTask, context);
+//    Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
+//    Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
+//    Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+//    Assert.assertNotNull(manager.getReadyQueue().get(testTask.getId()));
+//    Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+//    Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+//    Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+//    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
+//    clear();
+//  }
+//
+//  @Test
+//  public void testRunningToBlocked() {
+//    IMPPDataExchangeManager mockMPPDataExchangeManager =
+//        Mockito.mock(IMPPDataExchangeManager.class);
+//    manager.setBlockManager(mockMPPDataExchangeManager);
+//    ITaskScheduler defaultScheduler = manager.getScheduler();
+//    IDriver mockDriver = Mockito.mock(IDriver.class);
+//    QueryId queryId = new QueryId("test");
+//    FragmentInstanceId instanceId =
+//        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+//    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
+//    DriverTaskStatus[] invalidStates =
+//        new DriverTaskStatus[] {
+//          DriverTaskStatus.FINISHED,
+//          DriverTaskStatus.ABORTED,
+//          DriverTaskStatus.BLOCKED,
+//          DriverTaskStatus.READY,
+//        };
+//    for (DriverTaskStatus status : invalidStates) {
+//      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+//      Set<DriverTask> taskSet = new HashSet<>();
+//      taskSet.add(testTask);
+//      manager.getQueryMap().put(queryId, taskSet);
+//      manager.getTimeoutQueue().push(testTask);
+//      defaultScheduler.runningToBlocked(testTask, new ExecutionContext());
+//      Assert.assertEquals(status, testTask.getStatus());
+//      Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+//      Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
+//      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+//      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+//      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+//      clear();
+//    }
+//    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+//    Set<DriverTask> taskSet = new HashSet<>();
+//    taskSet.add(testTask);
+//    manager.getQueryMap().put(queryId, taskSet);
+//    manager.getTimeoutQueue().push(testTask);
+//    ExecutionContext context = new ExecutionContext();
+//    context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
+//    context.setCpuDuration(new CpuTimer.CpuDuration());
+//    defaultScheduler.runningToBlocked(testTask, context);
+//    Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
+//    Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask.getStatus());
+//    Assert.assertTrue(manager.getBlockedTasks().contains(testTask));
+//    Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
+//    Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+//    Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+//    Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+//    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
+//    clear();
+//  }
+//
+//  @Test
+//  public void testRunningToFinished() {
+//    IMPPDataExchangeManager mockMPPDataExchangeManager =
+//        Mockito.mock(IMPPDataExchangeManager.class);
+//    manager.setBlockManager(mockMPPDataExchangeManager);
+//    ITaskScheduler defaultScheduler = manager.getScheduler();
+//    IDriver mockDriver = Mockito.mock(IDriver.class);
+//    QueryId queryId = new QueryId("test");
+//    FragmentInstanceId instanceId =
+//        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+//    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
+//    DriverTaskStatus[] invalidStates =
+//        new DriverTaskStatus[] {
+//          DriverTaskStatus.FINISHED,
+//          DriverTaskStatus.ABORTED,
+//          DriverTaskStatus.BLOCKED,
+//          DriverTaskStatus.READY,
+//        };
+//    for (DriverTaskStatus status : invalidStates) {
+//      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+//      Set<DriverTask> taskSet = new HashSet<>();
+//      taskSet.add(testTask);
+//      manager.getQueryMap().put(queryId, taskSet);
+//      manager.getTimeoutQueue().push(testTask);
+//      defaultScheduler.runningToFinished(testTask, new ExecutionContext());
+//      Assert.assertEquals(status, testTask.getStatus());
+//      Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+//      Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
+//      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+//      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+//      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+//      clear();
+//    }
+//    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+//    Set<DriverTask> taskSet = new HashSet<>();
+//    taskSet.add(testTask);
+//    manager.getQueryMap().put(queryId, taskSet);
+//    manager.getTimeoutQueue().push(testTask);
+//    ExecutionContext context = new ExecutionContext();
+//    context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
+//    context.setCpuDuration(new CpuTimer.CpuDuration());
+//    defaultScheduler.runningToFinished(testTask, context);
+//    Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
+//    Assert.assertEquals(DriverTaskStatus.FINISHED, testTask.getStatus());
+//    Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+//    Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
+//    Assert.assertNull(manager.getTimeoutQueue().get(testTask.getId()));
+//    Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
+//    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
+//    clear();
+//  }
+//
+//  @Test
+//  public void testToAbort() {
+//    IMPPDataExchangeManager mockMPPDataExchangeManager =
+//        Mockito.mock(IMPPDataExchangeManager.class);
+//    manager.setBlockManager(mockMPPDataExchangeManager);
+//    IDataNodeRPCService.Client mockMppServiceClient =
+//        Mockito.mock(IDataNodeRPCService.Client.class);
+//    ITaskScheduler defaultScheduler = manager.getScheduler();
+//    QueryId queryId = new QueryId("test");
+//    FragmentInstanceId instanceId1 =
+//        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+//    IDriver mockDriver1 = Mockito.mock(IDriver.class);
+//    Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
+//    IDriver mockDriver2 = Mockito.mock(IDriver.class);
+//    FragmentInstanceId instanceId2 =
+//        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-1");
+//    Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
+//    DriverTaskStatus[] invalidStates =
+//        new DriverTaskStatus[] {
+//          DriverTaskStatus.FINISHED, DriverTaskStatus.ABORTED,
+//        };
+//    for (DriverTaskStatus status : invalidStates) {
+//      DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status);
+//      DriverTask testTask2 = new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED);
+//      Set<DriverTask> taskSet = new HashSet<>();
+//      taskSet.add(testTask1);
+//      taskSet.add(testTask2);
+//      manager.getQueryMap().put(queryId, taskSet);
+//      manager.getTimeoutQueue().push(testTask1);
+//      manager.getTimeoutQueue().push(testTask2);
+//      manager.getBlockedTasks().add(testTask2);
+//      defaultScheduler.toAborted(testTask1);
+//
+//      Assert.assertEquals(status, testTask1.getStatus());
+//      Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask2.getStatus());
+//      Assert.assertFalse(manager.getBlockedTasks().contains(testTask1));
+//      Assert.assertTrue(manager.getBlockedTasks().contains(testTask2));
+//      Assert.assertNull(manager.getReadyQueue().get(testTask1.getId()));
+//      Assert.assertNull(manager.getReadyQueue().get(testTask2.getId()));
+//      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask1.getId()));
+//      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask2.getId()));
+//      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+//      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask1));
+//      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask2));
+//
+//      Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
+//      Mockito.verify(mockDriver2, Mockito.never()).failed(Mockito.any());
+//      clear();
+//    }
+//    DriverTaskStatus[] validStates =
+//        new DriverTaskStatus[] {
+//          DriverTaskStatus.RUNNING, DriverTaskStatus.READY, DriverTaskStatus.BLOCKED,
+//        };
+//    for (DriverTaskStatus status : validStates) {
+//      Mockito.reset(mockDriver1);
+//      Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
+//      Mockito.reset(mockDriver2);
+//      Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
+//
+//      DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status);
+//
+//      DriverTask testTask2 = new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED);
+//      Set<DriverTask> taskSet = new HashSet<>();
+//      taskSet.add(testTask1);
+//      taskSet.add(testTask2);
+//      manager.getQueryMap().put(queryId, taskSet);
+//      manager.getTimeoutQueue().push(testTask1);
+//      defaultScheduler.toAborted(testTask1);
+//
+//      Mockito.reset(mockMppServiceClient);
+//      Mockito.verify(mockMPPDataExchangeManager, Mockito.times(2))
+//          .forceDeregisterFragmentInstance(Mockito.any());
+//      Mockito.reset(mockMPPDataExchangeManager);
+//
+//      // An aborted fragment may cause others in the same query to be aborted.
+//      Assert.assertEquals(DriverTaskStatus.ABORTED, testTask1.getStatus());
+//      Assert.assertEquals(DriverTaskStatus.ABORTED, testTask2.getStatus());
+//      Assert.assertFalse(manager.getBlockedTasks().contains(testTask1));
+//      Assert.assertFalse(manager.getBlockedTasks().contains(testTask2));
+//      Assert.assertNull(manager.getReadyQueue().get(testTask1.getId()));
+//      Assert.assertNull(manager.getReadyQueue().get(testTask2.getId()));
+//      Assert.assertNull(manager.getTimeoutQueue().get(testTask1.getId()));
+//      Assert.assertNull(manager.getTimeoutQueue().get(testTask2.getId()));
+//      Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
+//
+//      // The mockDriver1.failed() will be called outside the scheduler
+//      Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
+//      Mockito.verify(mockDriver2, Mockito.times(1)).failed(Mockito.any());
+//
+//      clear();
+//    }
+//  }
+//
+//  private void clear() {
+//    manager.getQueryMap().clear();
+//    manager.getBlockedTasks().clear();
+//    manager.getReadyQueue().clear();
+//    manager.getTimeoutQueue().clear();
+//  }
+//}