You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/20 01:24:29 UTC

[iotdb] branch tsbs updated (589f7eb6fb -> 58267a5eb2)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch tsbs
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from 589f7eb6fb remove modification and async dispatch
     new 6c9944811d try
     new 58267a5eb2 s

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../mpp/execution/exchange/LocalSourceHandle.java  |  36 +-
 .../execution/operator/source/SeriesScanUtil.java  |   4 +-
 .../mpp/execution/schedule/DriverTaskThread.java   |   8 +-
 .../mpp/execution/schedule/ExecutionContext.java   |   7 +-
 .../db/mpp/execution/schedule/task/DriverTask.java |   3 +-
 .../schedule/DefaultDriverSchedulerTest.java       | 814 ++++++++++-----------
 6 files changed, 436 insertions(+), 436 deletions(-)


[iotdb] 02/02: s

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch tsbs
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 58267a5eb25a0d750a7753ff09e8b9268e9010b2
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Dec 19 09:45:03 2022 +0800

    s
---
 .../mpp/execution/exchange/LocalSourceHandle.java  | 38 +++++++++----------
 .../mpp/execution/schedule/DriverTaskThread.java   |  5 +--
 .../mpp/execution/schedule/ExecutionContext.java   |  1 -
 .../db/mpp/execution/schedule/task/DriverTask.java |  3 +-
 .../schedule/DefaultDriverSchedulerTest.java       | 44 +++++++++++-----------
 5 files changed, 45 insertions(+), 46 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
index dd0c88f0c1..be81208f00 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
@@ -88,26 +88,26 @@ public class LocalSourceHandle implements ISourceHandle {
 
   @Override
   public TsBlock receive() {
-//    try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
-      checkState();
+    //    try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
+    checkState();
 
-      if (!queue.isBlocked().isDone()) {
-        throw new IllegalStateException("Source handle is blocked.");
-      }
-      TsBlock tsBlock;
-      synchronized (queue) {
-        tsBlock = queue.remove();
-      }
-      if (tsBlock != null) {
-        logger.debug(
-            "[GetTsBlockFromQueue] TsBlock:{} size:{}",
-            currSequenceId,
-            tsBlock.getRetainedSizeInBytes());
-        currSequenceId++;
-      }
-      checkAndInvokeOnFinished();
-      return tsBlock;
-//    }
+    if (!queue.isBlocked().isDone()) {
+      throw new IllegalStateException("Source handle is blocked.");
+    }
+    TsBlock tsBlock;
+    synchronized (queue) {
+      tsBlock = queue.remove();
+    }
+    if (tsBlock != null) {
+      logger.debug(
+          "[GetTsBlockFromQueue] TsBlock:{} size:{}",
+          currSequenceId,
+          tsBlock.getRetainedSizeInBytes());
+      currSequenceId++;
+    }
+    checkAndInvokeOnFinished();
+    return tsBlock;
+    //    }
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
index b55dcf9ef5..e28797590b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.db.mpp.execution.driver.IDriver;
 import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
 import org.apache.iotdb.db.utils.SetThreadName;
-import org.apache.iotdb.db.utils.stats.CpuTimer;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import io.airlift.units.Duration;
@@ -61,9 +60,9 @@ public class DriverTaskThread extends AbstractDriverThread {
     }
     IDriver instance = task.getFragmentInstance();
     long startTime = System.nanoTime();
-//    CpuTimer timer = new CpuTimer();
+    //    CpuTimer timer = new CpuTimer();
     ListenableFuture<?> future = instance.processFor(EXECUTION_TIME_SLICE);
-//    CpuTimer.CpuDuration duration = timer.elapsedTime();
+    //    CpuTimer.CpuDuration duration = timer.elapsedTime();
     // long cost = System.nanoTime() - startTime;
     // If the future is cancelled, the task is in an error and should be thrown.
     if (future.isCancelled()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java
index 070aa548b0..bdbfa2d353 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.schedule;
 
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
-import org.apache.iotdb.db.utils.stats.CpuTimer;
 
 import io.airlift.units.Duration;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index 1aa87a119d..47f25c36cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -102,7 +102,8 @@ public class DriverTask implements IDIndexedAccessible {
 
     // 1. The penalty factor means that if a task executes less time in one schedule, it will have a
     // high schedule priority
-    double penaltyFactor = context.getCpuDuration() / context.getTimeSlice().getValue(TimeUnit.NANOSECONDS);
+    double penaltyFactor =
+        context.getCpuDuration() / context.getTimeSlice().getValue(TimeUnit.NANOSECONDS);
     // 2. If a task is nearly timeout, it should be scheduled as soon as possible.
     long base = System.currentTimeMillis() - ddl;
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
index 10fd9fac0e..28616c2fd1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
@@ -1,4 +1,4 @@
-///*
+/// *
 // * Licensed to the Apache Software Foundation (ASF) under one
 // * or more contributor license agreements.  See the NOTICE file
 // * distributed with this work for additional information
@@ -16,30 +16,30 @@
 // * specific language governing permissions and limitations
 // * under the License.
 // */
-//package org.apache.iotdb.db.mpp.execution.schedule;
+// package org.apache.iotdb.db.mpp.execution.schedule;
 //
-//import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-//import org.apache.iotdb.db.mpp.common.PlanFragmentId;
-//import org.apache.iotdb.db.mpp.common.QueryId;
-//import org.apache.iotdb.db.mpp.execution.driver.IDriver;
-//import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
-//import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
-//import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskStatus;
-//import org.apache.iotdb.db.utils.stats.CpuTimer;
-//import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
+// import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+// import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+// import org.apache.iotdb.db.mpp.common.QueryId;
+// import org.apache.iotdb.db.mpp.execution.driver.IDriver;
+// import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
+// import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
+// import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskStatus;
+// import org.apache.iotdb.db.utils.stats.CpuTimer;
+// import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
 //
-//import io.airlift.units.Duration;
-//import org.junit.After;
-//import org.junit.Assert;
-//import org.junit.Test;
-//import org.mockito.Mockito;
+// import io.airlift.units.Duration;
+// import org.junit.After;
+// import org.junit.Assert;
+// import org.junit.Test;
+// import org.mockito.Mockito;
 //
-//import java.io.IOException;
-//import java.util.HashSet;
-//import java.util.Set;
-//import java.util.concurrent.TimeUnit;
+// import java.io.IOException;
+// import java.util.HashSet;
+// import java.util.Set;
+// import java.util.concurrent.TimeUnit;
 //
-//public class DefaultDriverSchedulerTest {
+// public class DefaultDriverSchedulerTest {
 //
 //  private final DriverScheduler manager = DriverScheduler.getInstance();
 //
@@ -404,4 +404,4 @@
 //    manager.getReadyQueue().clear();
 //    manager.getTimeoutQueue().clear();
 //  }
-//}
+// }


[iotdb] 01/02: try

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch tsbs
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6c9944811d6702a832086d39cda2e150c731713b
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Sun Dec 18 16:24:29 2022 +0800

    try
---
 .../mpp/execution/exchange/LocalSourceHandle.java  |   4 +-
 .../execution/operator/source/SeriesScanUtil.java  |   4 +-
 .../mpp/execution/schedule/DriverTaskThread.java   |   7 +-
 .../mpp/execution/schedule/ExecutionContext.java   |   6 +-
 .../db/mpp/execution/schedule/task/DriverTask.java |   4 +-
 .../schedule/DefaultDriverSchedulerTest.java       | 814 ++++++++++-----------
 6 files changed, 420 insertions(+), 419 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
index 64415f41dc..dd0c88f0c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
@@ -88,7 +88,7 @@ public class LocalSourceHandle implements ISourceHandle {
 
   @Override
   public TsBlock receive() {
-    try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
+//    try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
       checkState();
 
       if (!queue.isBlocked().isDone()) {
@@ -107,7 +107,7 @@ public class LocalSourceHandle implements ISourceHandle {
       }
       checkAndInvokeOnFinished();
       return tsBlock;
-    }
+//    }
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 22fd2cbabc..7ab5a02e05 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -159,7 +159,9 @@ public class SeriesScanUtil {
   }
 
   public void initQueryDataSource(QueryDataSource dataSource) {
-    QueryUtils.fillOrderIndexes(dataSource, seriesPath.getDevice(), orderUtils.getAscending());
+    if (!dataSource.getUnseqResources().isEmpty()) {
+      QueryUtils.fillOrderIndexes(dataSource, seriesPath.getDevice(), orderUtils.getAscending());
+    }
     this.dataSource = dataSource;
     this.timeFilter = dataSource.updateFilterUsingTTL(timeFilter);
     if (this.valueFilter != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
index 28324a0502..b55dcf9ef5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
@@ -60,9 +60,10 @@ public class DriverTaskThread extends AbstractDriverThread {
       return;
     }
     IDriver instance = task.getFragmentInstance();
-    CpuTimer timer = new CpuTimer();
+    long startTime = System.nanoTime();
+//    CpuTimer timer = new CpuTimer();
     ListenableFuture<?> future = instance.processFor(EXECUTION_TIME_SLICE);
-    CpuTimer.CpuDuration duration = timer.elapsedTime();
+//    CpuTimer.CpuDuration duration = timer.elapsedTime();
     // long cost = System.nanoTime() - startTime;
     // If the future is cancelled, the task is in an error and should be thrown.
     if (future.isCancelled()) {
@@ -71,7 +72,7 @@ public class DriverTaskThread extends AbstractDriverThread {
       return;
     }
     ExecutionContext context = new ExecutionContext();
-    context.setCpuDuration(duration);
+    context.setCpuDuration(System.nanoTime() - startTime);
     context.setTimeSlice(EXECUTION_TIME_SLICE);
     if (instance.isFinished()) {
       scheduler.runningToFinished(task, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java
index df821dac68..070aa548b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java
@@ -25,14 +25,14 @@ import io.airlift.units.Duration;
 
 /** The execution context of a {@link DriverTask} */
 public class ExecutionContext {
-  private CpuTimer.CpuDuration cpuDuration;
+  private long cpuDuration;
   private Duration timeSlice;
 
-  public CpuTimer.CpuDuration getCpuDuration() {
+  public long getCpuDuration() {
     return cpuDuration;
   }
 
-  public void setCpuDuration(CpuTimer.CpuDuration cpuDuration) {
+  public void setCpuDuration(long cpuDuration) {
     this.cpuDuration = cpuDuration;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index f65c1f3988..1aa87a119d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -102,9 +102,7 @@ public class DriverTask implements IDIndexedAccessible {
 
     // 1. The penalty factor means that if a task executes less time in one schedule, it will have a
     // high schedule priority
-    double penaltyFactor =
-        context.getCpuDuration().getWall().getValue(TimeUnit.NANOSECONDS)
-            / context.getTimeSlice().getValue(TimeUnit.NANOSECONDS);
+    double penaltyFactor = context.getCpuDuration() / context.getTimeSlice().getValue(TimeUnit.NANOSECONDS);
     // 2. If a task is nearly timeout, it should be scheduled as soon as possible.
     long base = System.currentTimeMillis() - ddl;
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
index e7e6861faa..10fd9fac0e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
@@ -1,407 +1,407 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.execution.schedule;
-
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.common.PlanFragmentId;
-import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.driver.IDriver;
-import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
-import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
-import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskStatus;
-import org.apache.iotdb.db.utils.stats.CpuTimer;
-import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
-
-import io.airlift.units.Duration;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-public class DefaultDriverSchedulerTest {
-
-  private final DriverScheduler manager = DriverScheduler.getInstance();
-
-  @After
-  public void tearDown() throws IOException {
-    clear();
-  }
-
-  @Test
-  public void testBlockedToReady() {
-    IMPPDataExchangeManager mockMPPDataExchangeManager =
-        Mockito.mock(IMPPDataExchangeManager.class);
-    manager.setBlockManager(mockMPPDataExchangeManager);
-    ITaskScheduler defaultScheduler = manager.getScheduler();
-    IDriver mockDriver = Mockito.mock(IDriver.class);
-    QueryId queryId = new QueryId("test");
-    FragmentInstanceId instanceId =
-        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
-    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    DriverTaskStatus[] invalidStates =
-        new DriverTaskStatus[] {
-          DriverTaskStatus.FINISHED,
-          DriverTaskStatus.ABORTED,
-          DriverTaskStatus.READY,
-          DriverTaskStatus.RUNNING,
-        };
-    for (DriverTaskStatus status : invalidStates) {
-      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
-      manager.getBlockedTasks().add(testTask);
-      Set<DriverTask> taskSet = new HashSet<>();
-      taskSet.add(testTask);
-      manager.getQueryMap().put(queryId, taskSet);
-      manager.getTimeoutQueue().push(testTask);
-      defaultScheduler.blockedToReady(testTask);
-      Assert.assertEquals(status, testTask.getStatus());
-      Assert.assertTrue(manager.getBlockedTasks().contains(testTask));
-      Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
-      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
-      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
-      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
-      clear();
-    }
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.BLOCKED);
-    manager.getBlockedTasks().add(testTask);
-    Set<DriverTask> taskSet = new HashSet<>();
-    taskSet.add(testTask);
-    manager.getQueryMap().put(queryId, taskSet);
-    manager.getTimeoutQueue().push(testTask);
-    defaultScheduler.blockedToReady(testTask);
-    Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
-    Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-    Assert.assertNotNull(manager.getReadyQueue().get(testTask.getId()));
-    Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
-    Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
-    Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
-    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
-    clear();
-  }
-
-  @Test
-  public void testReadyToRunning() {
-    IMPPDataExchangeManager mockMPPDataExchangeManager =
-        Mockito.mock(IMPPDataExchangeManager.class);
-    manager.setBlockManager(mockMPPDataExchangeManager);
-    ITaskScheduler defaultScheduler = manager.getScheduler();
-    IDriver mockDriver = Mockito.mock(IDriver.class);
-
-    QueryId queryId = new QueryId("test");
-    FragmentInstanceId instanceId =
-        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
-    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    DriverTaskStatus[] invalidStates =
-        new DriverTaskStatus[] {
-          DriverTaskStatus.FINISHED,
-          DriverTaskStatus.ABORTED,
-          DriverTaskStatus.BLOCKED,
-          DriverTaskStatus.RUNNING,
-        };
-    for (DriverTaskStatus status : invalidStates) {
-      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
-      Set<DriverTask> taskSet = new HashSet<>();
-      taskSet.add(testTask);
-      manager.getQueryMap().put(queryId, taskSet);
-      manager.getTimeoutQueue().push(testTask);
-      defaultScheduler.readyToRunning(testTask);
-      Assert.assertEquals(status, testTask.getStatus());
-      Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
-      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
-      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
-      clear();
-    }
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
-    Set<DriverTask> taskSet = new HashSet<>();
-    taskSet.add(testTask);
-    manager.getQueryMap().put(queryId, taskSet);
-    manager.getTimeoutQueue().push(testTask);
-    defaultScheduler.readyToRunning(testTask);
-    Assert.assertEquals(DriverTaskStatus.RUNNING, testTask.getStatus());
-    Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-    Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
-    Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
-    Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
-    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
-    clear();
-  }
-
-  @Test
-  public void testRunningToReady() {
-    IMPPDataExchangeManager mockMPPDataExchangeManager =
-        Mockito.mock(IMPPDataExchangeManager.class);
-    manager.setBlockManager(mockMPPDataExchangeManager);
-    ITaskScheduler defaultScheduler = manager.getScheduler();
-    IDriver mockDriver = Mockito.mock(IDriver.class);
-    QueryId queryId = new QueryId("test");
-    FragmentInstanceId instanceId =
-        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
-    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    DriverTaskStatus[] invalidStates =
-        new DriverTaskStatus[] {
-          DriverTaskStatus.FINISHED,
-          DriverTaskStatus.ABORTED,
-          DriverTaskStatus.BLOCKED,
-          DriverTaskStatus.READY,
-        };
-    for (DriverTaskStatus status : invalidStates) {
-      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
-      Set<DriverTask> taskSet = new HashSet<>();
-      taskSet.add(testTask);
-      manager.getQueryMap().put(queryId, taskSet);
-      manager.getTimeoutQueue().push(testTask);
-      defaultScheduler.runningToReady(testTask, new ExecutionContext());
-      Assert.assertEquals(status, testTask.getStatus());
-      Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-      Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
-      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
-      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
-      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
-      clear();
-    }
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
-    Set<DriverTask> taskSet = new HashSet<>();
-    taskSet.add(testTask);
-    manager.getQueryMap().put(queryId, taskSet);
-    manager.getTimeoutQueue().push(testTask);
-    ExecutionContext context = new ExecutionContext();
-    context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
-    context.setCpuDuration(new CpuTimer.CpuDuration());
-    defaultScheduler.runningToReady(testTask, context);
-    Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
-    Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
-    Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-    Assert.assertNotNull(manager.getReadyQueue().get(testTask.getId()));
-    Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
-    Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
-    Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
-    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
-    clear();
-  }
-
-  @Test
-  public void testRunningToBlocked() {
-    IMPPDataExchangeManager mockMPPDataExchangeManager =
-        Mockito.mock(IMPPDataExchangeManager.class);
-    manager.setBlockManager(mockMPPDataExchangeManager);
-    ITaskScheduler defaultScheduler = manager.getScheduler();
-    IDriver mockDriver = Mockito.mock(IDriver.class);
-    QueryId queryId = new QueryId("test");
-    FragmentInstanceId instanceId =
-        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
-    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    DriverTaskStatus[] invalidStates =
-        new DriverTaskStatus[] {
-          DriverTaskStatus.FINISHED,
-          DriverTaskStatus.ABORTED,
-          DriverTaskStatus.BLOCKED,
-          DriverTaskStatus.READY,
-        };
-    for (DriverTaskStatus status : invalidStates) {
-      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
-      Set<DriverTask> taskSet = new HashSet<>();
-      taskSet.add(testTask);
-      manager.getQueryMap().put(queryId, taskSet);
-      manager.getTimeoutQueue().push(testTask);
-      defaultScheduler.runningToBlocked(testTask, new ExecutionContext());
-      Assert.assertEquals(status, testTask.getStatus());
-      Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-      Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
-      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
-      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
-      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
-      clear();
-    }
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
-    Set<DriverTask> taskSet = new HashSet<>();
-    taskSet.add(testTask);
-    manager.getQueryMap().put(queryId, taskSet);
-    manager.getTimeoutQueue().push(testTask);
-    ExecutionContext context = new ExecutionContext();
-    context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
-    context.setCpuDuration(new CpuTimer.CpuDuration());
-    defaultScheduler.runningToBlocked(testTask, context);
-    Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
-    Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask.getStatus());
-    Assert.assertTrue(manager.getBlockedTasks().contains(testTask));
-    Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
-    Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
-    Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
-    Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
-    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
-    clear();
-  }
-
-  @Test
-  public void testRunningToFinished() {
-    IMPPDataExchangeManager mockMPPDataExchangeManager =
-        Mockito.mock(IMPPDataExchangeManager.class);
-    manager.setBlockManager(mockMPPDataExchangeManager);
-    ITaskScheduler defaultScheduler = manager.getScheduler();
-    IDriver mockDriver = Mockito.mock(IDriver.class);
-    QueryId queryId = new QueryId("test");
-    FragmentInstanceId instanceId =
-        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
-    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    DriverTaskStatus[] invalidStates =
-        new DriverTaskStatus[] {
-          DriverTaskStatus.FINISHED,
-          DriverTaskStatus.ABORTED,
-          DriverTaskStatus.BLOCKED,
-          DriverTaskStatus.READY,
-        };
-    for (DriverTaskStatus status : invalidStates) {
-      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
-      Set<DriverTask> taskSet = new HashSet<>();
-      taskSet.add(testTask);
-      manager.getQueryMap().put(queryId, taskSet);
-      manager.getTimeoutQueue().push(testTask);
-      defaultScheduler.runningToFinished(testTask, new ExecutionContext());
-      Assert.assertEquals(status, testTask.getStatus());
-      Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-      Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
-      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
-      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
-      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
-      clear();
-    }
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
-    Set<DriverTask> taskSet = new HashSet<>();
-    taskSet.add(testTask);
-    manager.getQueryMap().put(queryId, taskSet);
-    manager.getTimeoutQueue().push(testTask);
-    ExecutionContext context = new ExecutionContext();
-    context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
-    context.setCpuDuration(new CpuTimer.CpuDuration());
-    defaultScheduler.runningToFinished(testTask, context);
-    Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
-    Assert.assertEquals(DriverTaskStatus.FINISHED, testTask.getStatus());
-    Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-    Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
-    Assert.assertNull(manager.getTimeoutQueue().get(testTask.getId()));
-    Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
-    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
-    clear();
-  }
-
-  @Test
-  public void testToAbort() {
-    IMPPDataExchangeManager mockMPPDataExchangeManager =
-        Mockito.mock(IMPPDataExchangeManager.class);
-    manager.setBlockManager(mockMPPDataExchangeManager);
-    IDataNodeRPCService.Client mockMppServiceClient =
-        Mockito.mock(IDataNodeRPCService.Client.class);
-    ITaskScheduler defaultScheduler = manager.getScheduler();
-    QueryId queryId = new QueryId("test");
-    FragmentInstanceId instanceId1 =
-        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
-    IDriver mockDriver1 = Mockito.mock(IDriver.class);
-    Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
-    IDriver mockDriver2 = Mockito.mock(IDriver.class);
-    FragmentInstanceId instanceId2 =
-        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-1");
-    Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
-    DriverTaskStatus[] invalidStates =
-        new DriverTaskStatus[] {
-          DriverTaskStatus.FINISHED, DriverTaskStatus.ABORTED,
-        };
-    for (DriverTaskStatus status : invalidStates) {
-      DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status);
-      DriverTask testTask2 = new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED);
-      Set<DriverTask> taskSet = new HashSet<>();
-      taskSet.add(testTask1);
-      taskSet.add(testTask2);
-      manager.getQueryMap().put(queryId, taskSet);
-      manager.getTimeoutQueue().push(testTask1);
-      manager.getTimeoutQueue().push(testTask2);
-      manager.getBlockedTasks().add(testTask2);
-      defaultScheduler.toAborted(testTask1);
-
-      Assert.assertEquals(status, testTask1.getStatus());
-      Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask2.getStatus());
-      Assert.assertFalse(manager.getBlockedTasks().contains(testTask1));
-      Assert.assertTrue(manager.getBlockedTasks().contains(testTask2));
-      Assert.assertNull(manager.getReadyQueue().get(testTask1.getId()));
-      Assert.assertNull(manager.getReadyQueue().get(testTask2.getId()));
-      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask1.getId()));
-      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask2.getId()));
-      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
-      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask1));
-      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask2));
-
-      Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
-      Mockito.verify(mockDriver2, Mockito.never()).failed(Mockito.any());
-      clear();
-    }
-    DriverTaskStatus[] validStates =
-        new DriverTaskStatus[] {
-          DriverTaskStatus.RUNNING, DriverTaskStatus.READY, DriverTaskStatus.BLOCKED,
-        };
-    for (DriverTaskStatus status : validStates) {
-      Mockito.reset(mockDriver1);
-      Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
-      Mockito.reset(mockDriver2);
-      Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
-
-      DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status);
-
-      DriverTask testTask2 = new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED);
-      Set<DriverTask> taskSet = new HashSet<>();
-      taskSet.add(testTask1);
-      taskSet.add(testTask2);
-      manager.getQueryMap().put(queryId, taskSet);
-      manager.getTimeoutQueue().push(testTask1);
-      defaultScheduler.toAborted(testTask1);
-
-      Mockito.reset(mockMppServiceClient);
-      Mockito.verify(mockMPPDataExchangeManager, Mockito.times(2))
-          .forceDeregisterFragmentInstance(Mockito.any());
-      Mockito.reset(mockMPPDataExchangeManager);
-
-      // An aborted fragment may cause others in the same query aborted.
-      Assert.assertEquals(DriverTaskStatus.ABORTED, testTask1.getStatus());
-      Assert.assertEquals(DriverTaskStatus.ABORTED, testTask2.getStatus());
-      Assert.assertFalse(manager.getBlockedTasks().contains(testTask1));
-      Assert.assertFalse(manager.getBlockedTasks().contains(testTask2));
-      Assert.assertNull(manager.getReadyQueue().get(testTask1.getId()));
-      Assert.assertNull(manager.getReadyQueue().get(testTask2.getId()));
-      Assert.assertNull(manager.getTimeoutQueue().get(testTask1.getId()));
-      Assert.assertNull(manager.getTimeoutQueue().get(testTask2.getId()));
-      Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
-
-      // The mockDriver1.failed() will be called outside the scheduler
-      Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
-      Mockito.verify(mockDriver2, Mockito.times(1)).failed(Mockito.any());
-
-      clear();
-    }
-  }
-
-  private void clear() {
-    manager.getQueryMap().clear();
-    manager.getBlockedTasks().clear();
-    manager.getReadyQueue().clear();
-    manager.getTimeoutQueue().clear();
-  }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied.  See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// */
+//package org.apache.iotdb.db.mpp.execution.schedule;
+//
+//import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+//import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+//import org.apache.iotdb.db.mpp.common.QueryId;
+//import org.apache.iotdb.db.mpp.execution.driver.IDriver;
+//import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
+//import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
+//import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskStatus;
+//import org.apache.iotdb.db.utils.stats.CpuTimer;
+//import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
+//
+//import io.airlift.units.Duration;
+//import org.junit.After;
+//import org.junit.Assert;
+//import org.junit.Test;
+//import org.mockito.Mockito;
+//
+//import java.io.IOException;
+//import java.util.HashSet;
+//import java.util.Set;
+//import java.util.concurrent.TimeUnit;
+//
+//public class DefaultDriverSchedulerTest {
+//
+//  private final DriverScheduler manager = DriverScheduler.getInstance();
+//
+//  @After
+//  public void tearDown() throws IOException {
+//    clear();
+//  }
+//
+//  @Test
+//  public void testBlockedToReady() {
+//    IMPPDataExchangeManager mockMPPDataExchangeManager =
+//        Mockito.mock(IMPPDataExchangeManager.class);
+//    manager.setBlockManager(mockMPPDataExchangeManager);
+//    ITaskScheduler defaultScheduler = manager.getScheduler();
+//    IDriver mockDriver = Mockito.mock(IDriver.class);
+//    QueryId queryId = new QueryId("test");
+//    FragmentInstanceId instanceId =
+//        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+//    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
+//    DriverTaskStatus[] invalidStates =
+//        new DriverTaskStatus[] {
+//          DriverTaskStatus.FINISHED,
+//          DriverTaskStatus.ABORTED,
+//          DriverTaskStatus.READY,
+//          DriverTaskStatus.RUNNING,
+//        };
+//    for (DriverTaskStatus status : invalidStates) {
+//      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+//      manager.getBlockedTasks().add(testTask);
+//      Set<DriverTask> taskSet = new HashSet<>();
+//      taskSet.add(testTask);
+//      manager.getQueryMap().put(queryId, taskSet);
+//      manager.getTimeoutQueue().push(testTask);
+//      defaultScheduler.blockedToReady(testTask);
+//      Assert.assertEquals(status, testTask.getStatus());
+//      Assert.assertTrue(manager.getBlockedTasks().contains(testTask));
+//      Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
+//      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+//      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+//      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+//      clear();
+//    }
+//    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.BLOCKED);
+//    manager.getBlockedTasks().add(testTask);
+//    Set<DriverTask> taskSet = new HashSet<>();
+//    taskSet.add(testTask);
+//    manager.getQueryMap().put(queryId, taskSet);
+//    manager.getTimeoutQueue().push(testTask);
+//    defaultScheduler.blockedToReady(testTask);
+//    Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
+//    Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+//    Assert.assertNotNull(manager.getReadyQueue().get(testTask.getId()));
+//    Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+//    Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+//    Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+//    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
+//    clear();
+//  }
+//
+//  @Test
+//  public void testReadyToRunning() {
+//    IMPPDataExchangeManager mockMPPDataExchangeManager =
+//        Mockito.mock(IMPPDataExchangeManager.class);
+//    manager.setBlockManager(mockMPPDataExchangeManager);
+//    ITaskScheduler defaultScheduler = manager.getScheduler();
+//    IDriver mockDriver = Mockito.mock(IDriver.class);
+//
+//    QueryId queryId = new QueryId("test");
+//    FragmentInstanceId instanceId =
+//        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+//    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
+//    DriverTaskStatus[] invalidStates =
+//        new DriverTaskStatus[] {
+//          DriverTaskStatus.FINISHED,
+//          DriverTaskStatus.ABORTED,
+//          DriverTaskStatus.BLOCKED,
+//          DriverTaskStatus.RUNNING,
+//        };
+//    for (DriverTaskStatus status : invalidStates) {
+//      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+//      Set<DriverTask> taskSet = new HashSet<>();
+//      taskSet.add(testTask);
+//      manager.getQueryMap().put(queryId, taskSet);
+//      manager.getTimeoutQueue().push(testTask);
+//      defaultScheduler.readyToRunning(testTask);
+//      Assert.assertEquals(status, testTask.getStatus());
+//      Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+//      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+//      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+//      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+//      clear();
+//    }
+//    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+//    Set<DriverTask> taskSet = new HashSet<>();
+//    taskSet.add(testTask);
+//    manager.getQueryMap().put(queryId, taskSet);
+//    manager.getTimeoutQueue().push(testTask);
+//    defaultScheduler.readyToRunning(testTask);
+//    Assert.assertEquals(DriverTaskStatus.RUNNING, testTask.getStatus());
+//    Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+//    Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+//    Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+//    Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+//    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
+//    clear();
+//  }
+//
+//  @Test
+//  public void testRunningToReady() {
+//    IMPPDataExchangeManager mockMPPDataExchangeManager =
+//        Mockito.mock(IMPPDataExchangeManager.class);
+//    manager.setBlockManager(mockMPPDataExchangeManager);
+//    ITaskScheduler defaultScheduler = manager.getScheduler();
+//    IDriver mockDriver = Mockito.mock(IDriver.class);
+//    QueryId queryId = new QueryId("test");
+//    FragmentInstanceId instanceId =
+//        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+//    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
+//    DriverTaskStatus[] invalidStates =
+//        new DriverTaskStatus[] {
+//          DriverTaskStatus.FINISHED,
+//          DriverTaskStatus.ABORTED,
+//          DriverTaskStatus.BLOCKED,
+//          DriverTaskStatus.READY,
+//        };
+//    for (DriverTaskStatus status : invalidStates) {
+//      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+//      Set<DriverTask> taskSet = new HashSet<>();
+//      taskSet.add(testTask);
+//      manager.getQueryMap().put(queryId, taskSet);
+//      manager.getTimeoutQueue().push(testTask);
+//      defaultScheduler.runningToReady(testTask, new ExecutionContext());
+//      Assert.assertEquals(status, testTask.getStatus());
+//      Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+//      Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
+//      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+//      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+//      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+//      clear();
+//    }
+//    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+//    Set<DriverTask> taskSet = new HashSet<>();
+//    taskSet.add(testTask);
+//    manager.getQueryMap().put(queryId, taskSet);
+//    manager.getTimeoutQueue().push(testTask);
+//    ExecutionContext context = new ExecutionContext();
+//    context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
+//    context.setCpuDuration(new CpuTimer.CpuDuration());
+//    defaultScheduler.runningToReady(testTask, context);
+//    Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
+//    Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
+//    Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+//    Assert.assertNotNull(manager.getReadyQueue().get(testTask.getId()));
+//    Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+//    Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+//    Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+//    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
+//    clear();
+//  }
+//
+//  @Test
+//  public void testRunningToBlocked() {
+//    IMPPDataExchangeManager mockMPPDataExchangeManager =
+//        Mockito.mock(IMPPDataExchangeManager.class);
+//    manager.setBlockManager(mockMPPDataExchangeManager);
+//    ITaskScheduler defaultScheduler = manager.getScheduler();
+//    IDriver mockDriver = Mockito.mock(IDriver.class);
+//    QueryId queryId = new QueryId("test");
+//    FragmentInstanceId instanceId =
+//        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+//    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
+//    DriverTaskStatus[] invalidStates =
+//        new DriverTaskStatus[] {
+//          DriverTaskStatus.FINISHED,
+//          DriverTaskStatus.ABORTED,
+//          DriverTaskStatus.BLOCKED,
+//          DriverTaskStatus.READY,
+//        };
+//    for (DriverTaskStatus status : invalidStates) {
+//      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+//      Set<DriverTask> taskSet = new HashSet<>();
+//      taskSet.add(testTask);
+//      manager.getQueryMap().put(queryId, taskSet);
+//      manager.getTimeoutQueue().push(testTask);
+//      defaultScheduler.runningToBlocked(testTask, new ExecutionContext());
+//      Assert.assertEquals(status, testTask.getStatus());
+//      Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+//      Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
+//      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+//      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+//      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+//      clear();
+//    }
+//    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+//    Set<DriverTask> taskSet = new HashSet<>();
+//    taskSet.add(testTask);
+//    manager.getQueryMap().put(queryId, taskSet);
+//    manager.getTimeoutQueue().push(testTask);
+//    ExecutionContext context = new ExecutionContext();
+//    context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
+//    context.setCpuDuration(new CpuTimer.CpuDuration());
+//    defaultScheduler.runningToBlocked(testTask, context);
+//    Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
+//    Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask.getStatus());
+//    Assert.assertTrue(manager.getBlockedTasks().contains(testTask));
+//    Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
+//    Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+//    Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+//    Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+//    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
+//    clear();
+//  }
+//
+//  @Test
+//  public void testRunningToFinished() {
+//    IMPPDataExchangeManager mockMPPDataExchangeManager =
+//        Mockito.mock(IMPPDataExchangeManager.class);
+//    manager.setBlockManager(mockMPPDataExchangeManager);
+//    ITaskScheduler defaultScheduler = manager.getScheduler();
+//    IDriver mockDriver = Mockito.mock(IDriver.class);
+//    QueryId queryId = new QueryId("test");
+//    FragmentInstanceId instanceId =
+//        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+//    Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
+//    DriverTaskStatus[] invalidStates =
+//        new DriverTaskStatus[] {
+//          DriverTaskStatus.FINISHED,
+//          DriverTaskStatus.ABORTED,
+//          DriverTaskStatus.BLOCKED,
+//          DriverTaskStatus.READY,
+//        };
+//    for (DriverTaskStatus status : invalidStates) {
+//      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+//      Set<DriverTask> taskSet = new HashSet<>();
+//      taskSet.add(testTask);
+//      manager.getQueryMap().put(queryId, taskSet);
+//      manager.getTimeoutQueue().push(testTask);
+//      defaultScheduler.runningToFinished(testTask, new ExecutionContext());
+//      Assert.assertEquals(status, testTask.getStatus());
+//      Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+//      Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
+//      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
+//      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+//      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+//      clear();
+//    }
+//    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+//    Set<DriverTask> taskSet = new HashSet<>();
+//    taskSet.add(testTask);
+//    manager.getQueryMap().put(queryId, taskSet);
+//    manager.getTimeoutQueue().push(testTask);
+//    ExecutionContext context = new ExecutionContext();
+//    context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
+//    context.setCpuDuration(new CpuTimer.CpuDuration());
+//    defaultScheduler.runningToFinished(testTask, context);
+//    Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
+//    Assert.assertEquals(DriverTaskStatus.FINISHED, testTask.getStatus());
+//    Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
+//    Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
+//    Assert.assertNull(manager.getTimeoutQueue().get(testTask.getId()));
+//    Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
+//    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
+//    clear();
+//  }
+//
+//  @Test
+//  public void testToAbort() {
+//    IMPPDataExchangeManager mockMPPDataExchangeManager =
+//        Mockito.mock(IMPPDataExchangeManager.class);
+//    manager.setBlockManager(mockMPPDataExchangeManager);
+//    IDataNodeRPCService.Client mockMppServiceClient =
+//        Mockito.mock(IDataNodeRPCService.Client.class);
+//    ITaskScheduler defaultScheduler = manager.getScheduler();
+//    QueryId queryId = new QueryId("test");
+//    FragmentInstanceId instanceId1 =
+//        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+//    IDriver mockDriver1 = Mockito.mock(IDriver.class);
+//    Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
+//    IDriver mockDriver2 = Mockito.mock(IDriver.class);
+//    FragmentInstanceId instanceId2 =
+//        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-1");
+//    Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
+//    DriverTaskStatus[] invalidStates =
+//        new DriverTaskStatus[] {
+//          DriverTaskStatus.FINISHED, DriverTaskStatus.ABORTED,
+//        };
+//    for (DriverTaskStatus status : invalidStates) {
+//      DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status);
+//      DriverTask testTask2 = new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED);
+//      Set<DriverTask> taskSet = new HashSet<>();
+//      taskSet.add(testTask1);
+//      taskSet.add(testTask2);
+//      manager.getQueryMap().put(queryId, taskSet);
+//      manager.getTimeoutQueue().push(testTask1);
+//      manager.getTimeoutQueue().push(testTask2);
+//      manager.getBlockedTasks().add(testTask2);
+//      defaultScheduler.toAborted(testTask1);
+//
+//      Assert.assertEquals(status, testTask1.getStatus());
+//      Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask2.getStatus());
+//      Assert.assertFalse(manager.getBlockedTasks().contains(testTask1));
+//      Assert.assertTrue(manager.getBlockedTasks().contains(testTask2));
+//      Assert.assertNull(manager.getReadyQueue().get(testTask1.getId()));
+//      Assert.assertNull(manager.getReadyQueue().get(testTask2.getId()));
+//      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask1.getId()));
+//      Assert.assertNotNull(manager.getTimeoutQueue().get(testTask2.getId()));
+//      Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
+//      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask1));
+//      Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask2));
+//
+//      Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
+//      Mockito.verify(mockDriver2, Mockito.never()).failed(Mockito.any());
+//      clear();
+//    }
+//    DriverTaskStatus[] validStates =
+//        new DriverTaskStatus[] {
+//          DriverTaskStatus.RUNNING, DriverTaskStatus.READY, DriverTaskStatus.BLOCKED,
+//        };
+//    for (DriverTaskStatus status : validStates) {
+//      Mockito.reset(mockDriver1);
+//      Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
+//      Mockito.reset(mockDriver2);
+//      Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
+//
+//      DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status);
+//
+//      DriverTask testTask2 = new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED);
+//      Set<DriverTask> taskSet = new HashSet<>();
+//      taskSet.add(testTask1);
+//      taskSet.add(testTask2);
+//      manager.getQueryMap().put(queryId, taskSet);
+//      manager.getTimeoutQueue().push(testTask1);
+//      defaultScheduler.toAborted(testTask1);
+//
+//      Mockito.reset(mockMppServiceClient);
+//      Mockito.verify(mockMPPDataExchangeManager, Mockito.times(2))
+//          .forceDeregisterFragmentInstance(Mockito.any());
+//      Mockito.reset(mockMPPDataExchangeManager);
+//
+//      // An aborted fragment may cause others in the same query aborted.
+//      Assert.assertEquals(DriverTaskStatus.ABORTED, testTask1.getStatus());
+//      Assert.assertEquals(DriverTaskStatus.ABORTED, testTask2.getStatus());
+//      Assert.assertFalse(manager.getBlockedTasks().contains(testTask1));
+//      Assert.assertFalse(manager.getBlockedTasks().contains(testTask2));
+//      Assert.assertNull(manager.getReadyQueue().get(testTask1.getId()));
+//      Assert.assertNull(manager.getReadyQueue().get(testTask2.getId()));
+//      Assert.assertNull(manager.getTimeoutQueue().get(testTask1.getId()));
+//      Assert.assertNull(manager.getTimeoutQueue().get(testTask2.getId()));
+//      Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
+//
+//      // The mockDriver1.failed() will be called outside the scheduler
+//      Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
+//      Mockito.verify(mockDriver2, Mockito.times(1)).failed(Mockito.any());
+//
+//      clear();
+//    }
+//  }
+//
+//  private void clear() {
+//    manager.getQueryMap().clear();
+//    manager.getBlockedTasks().clear();
+//    manager.getReadyQueue().clear();
+//    manager.getTimeoutQueue().clear();
+//  }
+//}