You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/28 06:09:42 UTC

[iotdb] branch master updated: Add javadoc and rename some class in FragmentInstanceManager and Driver related (#5701)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 29944af738 Add javadoc and rename some class in FragmentInstanceManager and Driver related (#5701)
29944af738 is described below

commit 29944af7386d568cc5f4056a9fa09720b612d1d6
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Thu Apr 28 14:09:36 2022 +0800

    Add javadoc and rename some class in FragmentInstanceManager and Driver related (#5701)
---
 .../apache/iotdb/db/mpp/execution/DataDriver.java  |   4 +-
 .../org/apache/iotdb/db/mpp/execution/Driver.java  |  30 ----
 .../mpp/execution/FragmentInstanceExecution.java   |  16 +-
 .../db/mpp/execution/FragmentInstanceManager.java  |   6 +-
 .../org/apache/iotdb/db/mpp/execution/IDriver.java |  31 ++++
 ...ractExecutor.java => AbstractDriverThread.java} |  19 ++-
 ...InstanceScheduler.java => DriverScheduler.java} | 126 ++++++++-------
 ...anceTaskExecutor.java => DriverTaskThread.java} |  12 +-
 ...l.java => DriverTaskTimeoutSentinelThread.java} |  12 +-
 .../iotdb/db/mpp/schedule/ExecutionContext.java    |   4 +-
 ...nstanceScheduler.java => IDriverScheduler.java} |   7 +-
 .../iotdb/db/mpp/schedule/ITaskScheduler.java      |  35 ++---
 .../{FragmentInstanceTask.java => DriverTask.java} |  40 +++--
 ...agmentInstanceTaskID.java => DriverTaskID.java} |   8 +-
 ...stanceTaskStatus.java => DriverTaskStatus.java} |   4 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |   4 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   4 +-
 .../iotdb/db/mpp/execution/DataDriverTest.java     |   2 +-
 .../db/mpp/schedule/DefaultTaskSchedulerTest.java  | 169 ++++++++++-----------
 ...SchedulerTest.java => DriverSchedulerTest.java} |  52 +++----
 ...va => DriverTaskTimeoutSentinelThreadTest.java} | 115 ++++++--------
 21 files changed, 327 insertions(+), 373 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index e6e8e8083a..899d9d2e0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -99,7 +99,7 @@ public class DataDriver extends Driver {
     List<DataSourceOperator> sourceOperators =
         ((DataDriverContext) driverContext).getSourceOperators();
     if (sourceOperators != null && !sourceOperators.isEmpty()) {
-      QueryDataSource dataSource = initQueryDataSourceCache();
+      QueryDataSource dataSource = initQueryDataSource();
       sourceOperators.forEach(
           sourceOperator -> {
             // construct QueryDataSource for source operator
@@ -119,7 +119,7 @@ public class DataDriver extends Driver {
    * The method is called in mergeLock() when executing query. This method will get all the
    * QueryDataSource needed for this query
    */
-  public QueryDataSource initQueryDataSourceCache() throws QueryProcessException {
+  private QueryDataSource initQueryDataSource() throws QueryProcessException {
     DataDriverContext context = (DataDriverContext) driverContext;
     DataRegion dataRegion = context.getDataRegion();
     dataRegion.readLock();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
index 40da49ff15..ae220e4911 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
@@ -47,10 +47,6 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static java.lang.Boolean.TRUE;
 import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
 
-/**
- * Driver encapsulates some methods which are necessary for execution scheduler to run a fragment
- * instance
- */
 public abstract class Driver implements IDriver {
 
   protected static final Logger LOGGER = LoggerFactory.getLogger(Driver.class);
@@ -83,11 +79,6 @@ public abstract class Driver implements IDriver {
     driverBlockedFuture.set(future);
   }
 
-  /**
-   * Used to judge whether this fragment instance should be scheduled for execution anymore
-   *
-   * @return true if the FragmentInstance is done or terminated due to failure, otherwise false.
-   */
   @Override
   public boolean isFinished() {
     checkLockNotHeld("Cannot check finished status while holding the driver lock");
@@ -107,16 +98,6 @@ public abstract class Driver implements IDriver {
   /** release resource this driver used */
   protected abstract void releaseResource();
 
-  /**
-   * run the fragment instance for {@param duration} time slice, the time of this run is likely not
-   * to be equal to {@param duration}, the actual run time should be calculated by the caller
-   *
-   * @param duration how long should this fragment instance run
-   * @return the returned ListenableFuture<Void> is used to represent status of this processing if
-   *     isDone() return true, meaning that this fragment instance is not blocked and is ready for
-   *     next processing otherwise, meaning that this fragment instance is blocked and not ready for
-   *     next processing.
-   */
   @Override
   public ListenableFuture<Void> processFor(Duration duration) {
 
@@ -154,17 +135,11 @@ public abstract class Driver implements IDriver {
     return result.orElse(NOT_BLOCKED);
   }
 
-  /**
-   * the id information about this Fragment Instance.
-   *
-   * @return a {@link FragmentInstanceId} instance.
-   */
   @Override
   public FragmentInstanceId getInfo() {
     return driverContext.getId();
   }
 
-  /** clear resource used by this fragment instance */
   @Override
   public void close() {
     // mark the service for destruction
@@ -178,11 +153,6 @@ public abstract class Driver implements IDriver {
     tryWithLockUnInterruptibly(() -> TRUE);
   }
 
-  /**
-   * fail current driver
-   *
-   * @param t reason cause this failure
-   */
   @Override
   public void failed(Throwable t) {
     driverContext.failed(t);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
index 15c85c4d99..be2fcc6d68 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
+import org.apache.iotdb.db.mpp.schedule.IDriverScheduler;
 
 import com.google.common.collect.ImmutableList;
 import io.airlift.stats.CounterStat;
@@ -30,8 +30,6 @@ import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FAILED;
 
 public class FragmentInstanceExecution {
 
-  private final IFragmentInstanceScheduler scheduler;
-
   private final FragmentInstanceId instanceId;
   private final FragmentInstanceContext context;
 
@@ -44,31 +42,29 @@ public class FragmentInstanceExecution {
   private long lastHeartbeat;
 
   public static FragmentInstanceExecution createFragmentInstanceExecution(
-      IFragmentInstanceScheduler scheduler,
+      IDriverScheduler scheduler,
       FragmentInstanceId instanceId,
       FragmentInstanceContext context,
       IDriver driver,
       FragmentInstanceStateMachine stateMachine,
       CounterStat failedInstances) {
     FragmentInstanceExecution execution =
-        new FragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine);
-    execution.initialize(failedInstances);
+        new FragmentInstanceExecution(instanceId, context, driver, stateMachine);
+    execution.initialize(failedInstances, scheduler);
+    scheduler.submitDrivers(instanceId.getQueryId(), ImmutableList.of(driver));
     return execution;
   }
 
   private FragmentInstanceExecution(
-      IFragmentInstanceScheduler scheduler,
       FragmentInstanceId instanceId,
       FragmentInstanceContext context,
       IDriver driver,
       FragmentInstanceStateMachine stateMachine) {
-    this.scheduler = scheduler;
     this.instanceId = instanceId;
     this.context = context;
     this.driver = driver;
     this.sinkHandle = driver.getSinkHandle();
     this.stateMachine = stateMachine;
-    scheduler.submitFragmentInstances(instanceId.getQueryId(), ImmutableList.of(driver));
   }
 
   public void recordHeartbeat() {
@@ -101,7 +97,7 @@ public class FragmentInstanceExecution {
   }
 
   // this is a separate method to ensure that the `this` reference is not leaked during construction
-  private void initialize(CounterStat failedInstances) {
+  private void initialize(CounterStat failedInstances, IDriverScheduler scheduler) {
     requireNonNull(failedInstances, "failedInstances is null");
     stateMachine.addStateChangeListener(
         newState -> {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
index 438e1bea0e..a3abfe65e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@ -22,8 +22,8 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.schedule.FragmentInstanceScheduler;
-import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
+import org.apache.iotdb.db.mpp.schedule.DriverScheduler;
+import org.apache.iotdb.db.mpp.schedule.IDriverScheduler;
 import org.apache.iotdb.db.mpp.sql.planner.LocalExecutionPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 
@@ -49,7 +49,7 @@ public class FragmentInstanceManager {
   private final Map<FragmentInstanceId, FragmentInstanceContext> instanceContext;
   private final Map<FragmentInstanceId, FragmentInstanceExecution> instanceExecution;
   private final LocalExecutionPlanner planner = LocalExecutionPlanner.getInstance();
-  private final IFragmentInstanceScheduler scheduler = FragmentInstanceScheduler.getInstance();
+  private final IDriverScheduler scheduler = DriverScheduler.getInstance();
 
   private final ScheduledExecutorService instanceManagementExecutor;
   private final ExecutorService instanceNotificationExecutor;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IDriver.java
index 54c31901a2..61e96dbe42 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IDriver.java
@@ -24,17 +24,48 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.airlift.units.Duration;
 
+/**
+ * IDriver encapsulates the methods that a DriverTaskThread needs in order to run a fragment
+ * instance.
+ */
 public interface IDriver {
 
+  /**
+   * Used to judge whether this IDriver should be scheduled for execution anymore
+   *
+   * @return true if the IDriver is done or terminated due to failure, otherwise false.
+   */
   boolean isFinished();
 
+  /**
+   * Run the IDriver for a time slice of {@code duration}. The actual run time is likely not to be
+   * exactly equal to {@code duration}, so it should be measured by the caller.
+   *
+   * @param duration how long this IDriver should run
+   * @return a {@code ListenableFuture<Void>} representing the status of this processing: if
+   *     isDone() returns true, this IDriver is not blocked and is ready for the next round of
+   *     processing; otherwise, this IDriver is blocked and is not ready for the next round of
+   *     processing.
+   */
   ListenableFuture<Void> processFor(Duration duration);
 
+  /**
+   * the id information about this IDriver.
+   *
+   * @return a {@link FragmentInstanceId} instance.
+   */
   FragmentInstanceId getInfo();
 
+  /** Release the resources used by this IDriver. */
   void close();
 
+  /**
+   * Fail the current driver.
+   *
+   * @param t the reason that caused this failure
+   */
   void failed(Throwable t);
 
+  /** @return the SinkHandle of the current IDriver */
   ISinkHandle getSinkHandle();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/AbstractExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/AbstractDriverThread.java
similarity index 77%
rename from server/src/main/java/org/apache/iotdb/db/mpp/schedule/AbstractExecutor.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/AbstractDriverThread.java
index d3df9bbb1e..c79304b0aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/AbstractExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/AbstractDriverThread.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.schedule;
 
 import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTask;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,18 +28,18 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 
-/** an abstract executor for {@link FragmentInstanceTask} */
-public abstract class AbstractExecutor extends Thread implements Closeable {
+/** an abstract executor for {@link DriverTask} */
+public abstract class AbstractDriverThread extends Thread implements Closeable {
 
-  private static final Logger logger = LoggerFactory.getLogger(AbstractExecutor.class);
-  private final IndexedBlockingQueue<FragmentInstanceTask> queue;
+  private static final Logger logger = LoggerFactory.getLogger(AbstractDriverThread.class);
+  private final IndexedBlockingQueue<DriverTask> queue;
   protected final ITaskScheduler scheduler;
   private volatile boolean closed;
 
-  public AbstractExecutor(
+  public AbstractDriverThread(
       String workerId,
       ThreadGroup tg,
-      IndexedBlockingQueue<FragmentInstanceTask> queue,
+      IndexedBlockingQueue<DriverTask> queue,
       ITaskScheduler scheduler) {
     super(tg, workerId);
     this.queue = queue;
@@ -51,7 +51,7 @@ public abstract class AbstractExecutor extends Thread implements Closeable {
   public void run() {
     while (!closed && !Thread.currentThread().isInterrupted()) {
       try {
-        FragmentInstanceTask next = queue.poll();
+        DriverTask next = queue.poll();
         execute(next);
       } catch (InterruptedException e) {
         break;
@@ -62,8 +62,7 @@ public abstract class AbstractExecutor extends Thread implements Closeable {
   }
 
   /** Processing a task. */
-  protected abstract void execute(FragmentInstanceTask task)
-      throws InterruptedException, ExecutionException;
+  protected abstract void execute(DriverTask task) throws InterruptedException, ExecutionException;
 
   @Override
   public void close() throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/DriverScheduler.java
similarity index 65%
rename from server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/DriverScheduler.java
index 880041924a..fadf51400d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/DriverScheduler.java
@@ -30,9 +30,9 @@ import org.apache.iotdb.db.mpp.execution.IDriver;
 import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
 import org.apache.iotdb.db.mpp.schedule.queue.L1PriorityQueue;
 import org.apache.iotdb.db.mpp.schedule.queue.L2PriorityQueue;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskID;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTask;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTaskID;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTaskStatus;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
 import org.slf4j.Logger;
@@ -49,18 +49,18 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /** the manager of fragment instances scheduling */
-public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IService {
+public class DriverScheduler implements IDriverScheduler, IService {
 
-  private static final Logger logger = LoggerFactory.getLogger(FragmentInstanceScheduler.class);
+  private static final Logger logger = LoggerFactory.getLogger(DriverScheduler.class);
 
-  public static FragmentInstanceScheduler getInstance() {
+  public static DriverScheduler getInstance() {
     return InstanceHolder.instance;
   }
 
-  private final IndexedBlockingQueue<FragmentInstanceTask> readyQueue;
-  private final IndexedBlockingQueue<FragmentInstanceTask> timeoutQueue;
-  private final Set<FragmentInstanceTask> blockedTasks;
-  private final Map<QueryId, Set<FragmentInstanceTask>> queryMap;
+  private final IndexedBlockingQueue<DriverTask> readyQueue;
+  private final IndexedBlockingQueue<DriverTask> timeoutQueue;
+  private final Set<DriverTask> blockedTasks;
+  private final Map<QueryId, Set<DriverTask>> queryMap;
   private final ITaskScheduler scheduler;
   private IDataBlockManager blockManager; // TODO: init with real IDataBlockManager
 
@@ -68,17 +68,14 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
   private static final int WORKER_THREAD_NUM = 4; // TODO: load from config files
   private static final int QUERY_TIMEOUT_MS = 10_000; // TODO: load from config files or requests
   private final ThreadGroup workerGroups;
-  private final List<AbstractExecutor> threads;
+  private final List<AbstractDriverThread> threads;
 
-  private FragmentInstanceScheduler() {
+  private DriverScheduler() {
     this.readyQueue =
         new L2PriorityQueue<>(
-            MAX_CAPACITY,
-            new FragmentInstanceTask.SchedulePriorityComparator(),
-            new FragmentInstanceTask());
+            MAX_CAPACITY, new DriverTask.SchedulePriorityComparator(), new DriverTask());
     this.timeoutQueue =
-        new L1PriorityQueue<>(
-            MAX_CAPACITY, new FragmentInstanceTask.TimeoutComparator(), new FragmentInstanceTask());
+        new L1PriorityQueue<>(MAX_CAPACITY, new DriverTask.TimeoutComparator(), new DriverTask());
     this.queryMap = new ConcurrentHashMap<>();
     this.blockedTasks = Collections.synchronizedSet(new HashSet<>());
     this.scheduler = new Scheduler();
@@ -90,14 +87,13 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
   @Override
   public void start() throws StartupException {
     for (int i = 0; i < WORKER_THREAD_NUM; i++) {
-      AbstractExecutor t =
-          new FragmentInstanceTaskExecutor(
-              "Worker-Thread-" + i, workerGroups, readyQueue, scheduler);
+      AbstractDriverThread t =
+          new DriverTaskThread("Worker-Thread-" + i, workerGroups, readyQueue, scheduler);
       threads.add(t);
       t.start();
     }
-    AbstractExecutor t =
-        new FragmentInstanceTimeoutSentinel(
+    AbstractDriverThread t =
+        new DriverTaskTimeoutSentinelThread(
             "Sentinel-Thread", workerGroups, timeoutQueue, scheduler);
     threads.add(t);
     t.start();
@@ -121,20 +117,18 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
   }
 
   @Override
-  public void submitFragmentInstances(QueryId queryId, List<IDriver> instances) {
-    List<FragmentInstanceTask> tasks =
+  public void submitDrivers(QueryId queryId, List<IDriver> instances) {
+    List<DriverTask> tasks =
         instances.stream()
-            .map(
-                v ->
-                    new FragmentInstanceTask(v, QUERY_TIMEOUT_MS, FragmentInstanceTaskStatus.READY))
+            .map(v -> new DriverTask(v, QUERY_TIMEOUT_MS, DriverTaskStatus.READY))
             .collect(Collectors.toList());
     queryMap
         .computeIfAbsent(queryId, v -> Collections.synchronizedSet(new HashSet<>()))
         .addAll(tasks);
-    for (FragmentInstanceTask task : tasks) {
+    for (DriverTask task : tasks) {
       task.lock();
       try {
-        if (task.getStatus() != FragmentInstanceTaskStatus.READY) {
+        if (task.getStatus() != DriverTaskStatus.READY) {
           continue;
         }
         timeoutQueue.push(task);
@@ -147,13 +141,13 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
 
   @Override
   public void abortQuery(QueryId queryId) {
-    Set<FragmentInstanceTask> queryRelatedTasks = queryMap.remove(queryId);
+    Set<DriverTask> queryRelatedTasks = queryMap.remove(queryId);
     if (queryRelatedTasks != null) {
-      for (FragmentInstanceTask task : queryRelatedTasks) {
+      for (DriverTask task : queryRelatedTasks) {
         task.lock();
         try {
           task.setAbortCause(FragmentInstanceAbortedException.BY_QUERY_CASCADING_ABORTED);
-          clearFragmentInstanceTask(task);
+          clearDriverTask(task);
         } finally {
           task.unlock();
         }
@@ -163,14 +157,14 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
 
   @Override
   public void abortFragmentInstance(FragmentInstanceId instanceId) {
-    FragmentInstanceTask task = timeoutQueue.get(new FragmentInstanceTaskID(instanceId));
+    DriverTask task = timeoutQueue.get(new DriverTaskID(instanceId));
     if (task == null) {
       return;
     }
     task.lock();
     try {
       task.setAbortCause(FragmentInstanceAbortedException.BY_FRAGMENT_ABORT_CALLED);
-      clearFragmentInstanceTask(task);
+      clearDriverTask(task);
     } finally {
       task.unlock();
     }
@@ -178,7 +172,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
 
   @Override
   public double getSchedulePriority(FragmentInstanceId instanceId) {
-    FragmentInstanceTask task = timeoutQueue.get(new FragmentInstanceTaskID(instanceId));
+    DriverTask task = timeoutQueue.get(new DriverTaskID(instanceId));
     if (task == null) {
       throw new IllegalStateException(
           "the fragmentInstance " + instanceId.getFullId() + " has been cleared");
@@ -186,9 +180,9 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
     return task.getSchedulePriority();
   }
 
-  private void clearFragmentInstanceTask(FragmentInstanceTask task) {
-    if (task.getStatus() != FragmentInstanceTaskStatus.FINISHED) {
-      task.setStatus(FragmentInstanceTaskStatus.ABORTED);
+  private void clearDriverTask(DriverTask task) {
+    if (task.getStatus() != DriverTaskStatus.FINISHED) {
+      task.setStatus(DriverTaskStatus.ABORTED);
     }
     if (task.getAbortCause() != null) {
       task.getFragmentInstance()
@@ -196,7 +190,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
               new FragmentInstanceAbortedException(
                   task.getFragmentInstance().getInfo(), task.getAbortCause()));
     }
-    if (task.getStatus() == FragmentInstanceTaskStatus.ABORTED) {
+    if (task.getStatus() == DriverTaskStatus.ABORTED) {
       blockManager.forceDeregisterFragmentInstance(
           new TFragmentInstanceId(
               task.getId().getQueryId().getId(),
@@ -206,7 +200,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
     readyQueue.remove(task.getId());
     timeoutQueue.remove(task.getId());
     blockedTasks.remove(task);
-    Set<FragmentInstanceTask> tasks = queryMap.get(task.getId().getQueryId());
+    Set<DriverTask> tasks = queryMap.get(task.getId().getQueryId());
     if (tasks != null) {
       tasks.remove(task);
       if (tasks.isEmpty()) {
@@ -220,22 +214,22 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
   }
 
   @TestOnly
-  IndexedBlockingQueue<FragmentInstanceTask> getReadyQueue() {
+  IndexedBlockingQueue<DriverTask> getReadyQueue() {
     return readyQueue;
   }
 
   @TestOnly
-  IndexedBlockingQueue<FragmentInstanceTask> getTimeoutQueue() {
+  IndexedBlockingQueue<DriverTask> getTimeoutQueue() {
     return timeoutQueue;
   }
 
   @TestOnly
-  Set<FragmentInstanceTask> getBlockedTasks() {
+  Set<DriverTask> getBlockedTasks() {
     return blockedTasks;
   }
 
   @TestOnly
-  Map<QueryId, Set<FragmentInstanceTask>> getQueryMap() {
+  Map<QueryId, Set<DriverTask>> getQueryMap() {
     return queryMap;
   }
 
@@ -248,18 +242,18 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
 
     private InstanceHolder() {}
 
-    private static final FragmentInstanceScheduler instance = new FragmentInstanceScheduler();
+    private static final DriverScheduler instance = new DriverScheduler();
   }
   /** the default scheduler implementation */
   private class Scheduler implements ITaskScheduler {
     @Override
-    public void blockedToReady(FragmentInstanceTask task) {
+    public void blockedToReady(DriverTask task) {
       task.lock();
       try {
-        if (task.getStatus() != FragmentInstanceTaskStatus.BLOCKED) {
+        if (task.getStatus() != DriverTaskStatus.BLOCKED) {
           return;
         }
-        task.setStatus(FragmentInstanceTaskStatus.READY);
+        task.setStatus(DriverTaskStatus.READY);
         readyQueue.push(task);
         blockedTasks.remove(task);
       } finally {
@@ -268,13 +262,13 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
     }
 
     @Override
-    public boolean readyToRunning(FragmentInstanceTask task) {
+    public boolean readyToRunning(DriverTask task) {
       task.lock();
       try {
-        if (task.getStatus() != FragmentInstanceTaskStatus.READY) {
+        if (task.getStatus() != DriverTaskStatus.READY) {
           return false;
         }
-        task.setStatus(FragmentInstanceTaskStatus.RUNNING);
+        task.setStatus(DriverTaskStatus.RUNNING);
       } finally {
         task.unlock();
       }
@@ -282,14 +276,14 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
     }
 
     @Override
-    public void runningToReady(FragmentInstanceTask task, ExecutionContext context) {
+    public void runningToReady(DriverTask task, ExecutionContext context) {
       task.lock();
       try {
-        if (task.getStatus() != FragmentInstanceTaskStatus.RUNNING) {
+        if (task.getStatus() != DriverTaskStatus.RUNNING) {
           return;
         }
         task.updateSchedulePriority(context);
-        task.setStatus(FragmentInstanceTaskStatus.READY);
+        task.setStatus(DriverTaskStatus.READY);
         readyQueue.push(task);
       } finally {
         task.unlock();
@@ -297,14 +291,14 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
     }
 
     @Override
-    public void runningToBlocked(FragmentInstanceTask task, ExecutionContext context) {
+    public void runningToBlocked(DriverTask task, ExecutionContext context) {
       task.lock();
       try {
-        if (task.getStatus() != FragmentInstanceTaskStatus.RUNNING) {
+        if (task.getStatus() != DriverTaskStatus.RUNNING) {
           return;
         }
         task.updateSchedulePriority(context);
-        task.setStatus(FragmentInstanceTaskStatus.BLOCKED);
+        task.setStatus(DriverTaskStatus.BLOCKED);
         blockedTasks.add(task);
       } finally {
         task.unlock();
@@ -312,22 +306,22 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
     }
 
     @Override
-    public void runningToFinished(FragmentInstanceTask task, ExecutionContext context) {
+    public void runningToFinished(DriverTask task, ExecutionContext context) {
       task.lock();
       try {
-        if (task.getStatus() != FragmentInstanceTaskStatus.RUNNING) {
+        if (task.getStatus() != DriverTaskStatus.RUNNING) {
           return;
         }
         task.updateSchedulePriority(context);
-        task.setStatus(FragmentInstanceTaskStatus.FINISHED);
-        clearFragmentInstanceTask(task);
+        task.setStatus(DriverTaskStatus.FINISHED);
+        clearDriverTask(task);
       } finally {
         task.unlock();
       }
     }
 
     @Override
-    public void toAborted(FragmentInstanceTask task) {
+    public void toAborted(DriverTask task) {
       task.lock();
       try {
         // If a task is already in an end state, it indicates that the task is finalized in other
@@ -338,21 +332,21 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
         logger.warn(
             "The task {} is aborted. All other tasks in the same query will be cancelled",
             task.getId().toString());
-        clearFragmentInstanceTask(task);
+        clearDriverTask(task);
       } finally {
         task.unlock();
       }
       QueryId queryId = task.getId().getQueryId();
-      Set<FragmentInstanceTask> queryRelatedTasks = queryMap.remove(queryId);
+      Set<DriverTask> queryRelatedTasks = queryMap.remove(queryId);
       if (queryRelatedTasks != null) {
-        for (FragmentInstanceTask otherTask : queryRelatedTasks) {
+        for (DriverTask otherTask : queryRelatedTasks) {
           if (task.equals(otherTask)) {
             continue;
           }
           otherTask.lock();
           try {
             otherTask.setAbortCause(FragmentInstanceAbortedException.BY_QUERY_CASCADING_ABORTED);
-            clearFragmentInstanceTask(otherTask);
+            clearDriverTask(otherTask);
           } finally {
             otherTask.unlock();
           }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/DriverTaskThread.java
similarity index 88%
rename from server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/DriverTaskThread.java
index 25b4e29a0a..9c227cf100 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/DriverTaskThread.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.schedule;
 
 import org.apache.iotdb.db.mpp.execution.IDriver;
 import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTask;
 import org.apache.iotdb.db.utils.stats.CpuTimer;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -30,24 +30,24 @@ import io.airlift.units.Duration;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
-/** the worker thread of {@link FragmentInstanceTask} */
-public class FragmentInstanceTaskExecutor extends AbstractExecutor {
+/** the worker thread of {@link DriverTask} */
+public class DriverTaskThread extends AbstractDriverThread {
 
   public static final Duration EXECUTION_TIME_SLICE = new Duration(100, TimeUnit.MILLISECONDS);
 
   // As the callback is lightweight enough, there's no need to use another one thread to execute.
   private static final Executor listeningExecutor = MoreExecutors.directExecutor();
 
-  public FragmentInstanceTaskExecutor(
+  public DriverTaskThread(
       String workerId,
       ThreadGroup tg,
-      IndexedBlockingQueue<FragmentInstanceTask> queue,
+      IndexedBlockingQueue<DriverTask> queue,
       ITaskScheduler scheduler) {
     super(workerId, tg, queue, scheduler);
   }
 
   @Override
-  public void execute(FragmentInstanceTask task) throws InterruptedException {
+  public void execute(DriverTask task) throws InterruptedException {
     // try to switch it to RUNNING
     if (!scheduler.readyToRunning(task)) {
       return;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/DriverTaskTimeoutSentinelThread.java
similarity index 81%
rename from server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/DriverTaskTimeoutSentinelThread.java
index e7aaaf4e47..d0b691f09d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/DriverTaskTimeoutSentinelThread.java
@@ -19,21 +19,21 @@
 package org.apache.iotdb.db.mpp.schedule;
 
 import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTask;
 
-/** the thread for watching the timeout of {@link FragmentInstanceTask} */
-public class FragmentInstanceTimeoutSentinel extends AbstractExecutor {
+/** the thread for watching the timeout of {@link DriverTask} */
+public class DriverTaskTimeoutSentinelThread extends AbstractDriverThread {
 
-  public FragmentInstanceTimeoutSentinel(
+  public DriverTaskTimeoutSentinelThread(
       String workerId,
       ThreadGroup tg,
-      IndexedBlockingQueue<FragmentInstanceTask> queue,
+      IndexedBlockingQueue<DriverTask> queue,
       ITaskScheduler scheduler) {
     super(workerId, tg, queue, scheduler);
   }
 
   @Override
-  public void execute(FragmentInstanceTask task) throws InterruptedException {
+  public void execute(DriverTask task) throws InterruptedException {
     task.lock();
     try {
       // if this task is already in an end state, it means that the resource releasing will be
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ExecutionContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ExecutionContext.java
index e8cd091d2d..10d9f3d597 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ExecutionContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ExecutionContext.java
@@ -18,12 +18,12 @@
  */
 package org.apache.iotdb.db.mpp.schedule;
 
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTask;
 import org.apache.iotdb.db.utils.stats.CpuTimer;
 
 import io.airlift.units.Duration;
 
-/** The execution context of a {@link FragmentInstanceTask} */
+/** The execution context of a {@link DriverTask} */
 public class ExecutionContext {
   private CpuTimer.CpuDuration cpuDuration;
   private Duration timeSlice;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IDriverScheduler.java
similarity index 90%
rename from server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/IDriverScheduler.java
index b8d5970ee9..30dc6aa38c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IDriverScheduler.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.mpp.execution.IDriver;
 import java.util.List;
 
 /** the interface of fragment instance scheduling */
-public interface IFragmentInstanceScheduler {
+public interface IDriverScheduler {
 
   /**
    * Submit one or more {@link org.apache.iotdb.db.mpp.execution.IDriver} in one query for later
@@ -34,7 +34,7 @@ public interface IFragmentInstanceScheduler {
    * @param queryId the queryId these instances belong to.
    * @param instances the submitted instances.
    */
-  void submitFragmentInstances(QueryId queryId, List<IDriver> instances);
+  void submitDrivers(QueryId queryId, List<IDriver> instances);
 
   /**
    * Abort all the instances in this query.
@@ -44,7 +44,8 @@ public interface IFragmentInstanceScheduler {
   void abortQuery(QueryId queryId);
 
   /**
-   * Abort the fragment instance. If the instance is not existed, nothing will happen.
+   * Abort all Drivers of the fragment instance. If the instance does not exist, nothing will
+   * happen.
    *
    * @param instanceId the id of the fragment instance to be aborted.
    */
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ITaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ITaskScheduler.java
index f4a008b4a2..641aba8c7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ITaskScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ITaskScheduler.java
@@ -18,60 +18,55 @@
  */
 package org.apache.iotdb.db.mpp.schedule;
 
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTask;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTaskStatus;
 
-/** the scheduler interface of {@link FragmentInstanceTask} */
+/** the scheduler interface of {@link DriverTask} */
 public interface ITaskScheduler {
 
   /**
-   * Switch a task from {@link FragmentInstanceTaskStatus#BLOCKED} to {@link
-   * FragmentInstanceTaskStatus#READY}.
+   * Switch a task from {@link DriverTaskStatus#BLOCKED} to {@link DriverTaskStatus#READY}.
    *
    * @param task the task to be switched.
    */
-  void blockedToReady(FragmentInstanceTask task);
+  void blockedToReady(DriverTask task);
 
   /**
-   * Switch a task from {@link FragmentInstanceTaskStatus#READY} to {@link
-   * FragmentInstanceTaskStatus#RUNNING}.
+   * Switch a task from {@link DriverTaskStatus#READY} to {@link DriverTaskStatus#RUNNING}.
    *
    * @param task the task to be switched.
    * @return true if it's switched to the target status successfully, otherwise false.
    */
-  boolean readyToRunning(FragmentInstanceTask task);
+  boolean readyToRunning(DriverTask task);
 
   /**
-   * Switch a task from {@link FragmentInstanceTaskStatus#RUNNING} to {@link
-   * FragmentInstanceTaskStatus#READY}.
+   * Switch a task from {@link DriverTaskStatus#RUNNING} to {@link DriverTaskStatus#READY}.
    *
    * @param task the task to be switched.
    * @param context the execution context of last running.
    */
-  void runningToReady(FragmentInstanceTask task, ExecutionContext context);
+  void runningToReady(DriverTask task, ExecutionContext context);
 
   /**
-   * Switch a task from {@link FragmentInstanceTaskStatus#RUNNING} to {@link
-   * FragmentInstanceTaskStatus#BLOCKED}.
+   * Switch a task from {@link DriverTaskStatus#RUNNING} to {@link DriverTaskStatus#BLOCKED}.
    *
    * @param task the task to be switched.
    * @param context the execution context of last running.
    */
-  void runningToBlocked(FragmentInstanceTask task, ExecutionContext context);
+  void runningToBlocked(DriverTask task, ExecutionContext context);
 
   /**
-   * Switch a task from {@link FragmentInstanceTaskStatus#RUNNING} to {@link
-   * FragmentInstanceTaskStatus#FINISHED}.
+   * Switch a task from {@link DriverTaskStatus#RUNNING} to {@link DriverTaskStatus#FINISHED}.
    *
    * @param task the task to be switched.
    * @param context the execution context of last running.
    */
-  void runningToFinished(FragmentInstanceTask task, ExecutionContext context);
+  void runningToFinished(DriverTask task, ExecutionContext context);
 
   /**
-   * Switch a task to {@link FragmentInstanceTaskStatus#ABORTED}.
+   * Switch a task to {@link DriverTaskStatus#ABORTED}.
    *
    * @param task the task to be switched.
    */
-  void toAborted(FragmentInstanceTask task);
+  void toAborted(DriverTask task);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/DriverTask.java
similarity index 81%
rename from server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/DriverTask.java
index fc322fee28..e008f8df1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/DriverTask.java
@@ -23,8 +23,8 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.IDriver;
+import org.apache.iotdb.db.mpp.schedule.DriverTaskThread;
 import org.apache.iotdb.db.mpp.schedule.ExecutionContext;
-import org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor;
 import org.apache.iotdb.db.mpp.schedule.queue.ID;
 import org.apache.iotdb.db.mpp.schedule.queue.IDIndexedAccessible;
 
@@ -36,14 +36,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-/**
- * the scheduling element of {@link FragmentInstanceTaskExecutor}. It wraps a single
- * FragmentInstance.
- */
-public class FragmentInstanceTask implements IDIndexedAccessible {
+/** the scheduling element of {@link DriverTaskThread}. It wraps a single Driver. */
+public class DriverTask implements IDIndexedAccessible {
 
-  private FragmentInstanceTaskID id;
-  private FragmentInstanceTaskStatus status;
+  private DriverTaskID id;
+  private DriverTaskStatus status;
   private final IDriver fragmentInstance;
 
   // the higher this field is, the higher probability it will be scheduled.
@@ -57,42 +54,41 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
   private String abortCause;
 
   /** Initialize a dummy instance for queryHolder */
-  public FragmentInstanceTask() {
+  public DriverTask() {
     this(new StubFragmentInstance(), 0L, null);
   }
 
-  public FragmentInstanceTask(IDriver instance, long timeoutMs, FragmentInstanceTaskStatus status) {
+  public DriverTask(IDriver instance, long timeoutMs, DriverTaskStatus status) {
     this.fragmentInstance = instance;
-    this.id = new FragmentInstanceTaskID(instance.getInfo());
+    this.id = new DriverTaskID(instance.getInfo());
     this.setStatus(status);
     this.schedulePriority = 0.0D;
     this.ddl = System.currentTimeMillis() + timeoutMs;
     this.lock = new ReentrantLock();
   }
 
-  public FragmentInstanceTaskID getId() {
+  public DriverTaskID getId() {
     return id;
   }
 
   @Override
   public void setId(ID id) {
-    this.id = (FragmentInstanceTaskID) id;
+    this.id = (DriverTaskID) id;
   }
 
-  public FragmentInstanceTaskStatus getStatus() {
+  public DriverTaskStatus getStatus() {
     return status;
   }
 
   public boolean isEndState() {
-    return status == FragmentInstanceTaskStatus.ABORTED
-        || status == FragmentInstanceTaskStatus.FINISHED;
+    return status == DriverTaskStatus.ABORTED || status == DriverTaskStatus.FINISHED;
   }
 
   public IDriver getFragmentInstance() {
     return fragmentInstance;
   }
 
-  public void setStatus(FragmentInstanceTaskStatus status) {
+  public void setStatus(DriverTaskStatus status) {
     this.status = status;
   }
 
@@ -139,7 +135,7 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
 
   @Override
   public boolean equals(Object o) {
-    return o instanceof FragmentInstanceTask && ((FragmentInstanceTask) o).getId().equals(id);
+    return o instanceof DriverTask && ((DriverTask) o).getId().equals(id);
   }
 
   public String getAbortCause() {
@@ -151,10 +147,10 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
   }
 
   /** a comparator of ddl, the less the ddl is, the low order it has. */
-  public static class TimeoutComparator implements Comparator<FragmentInstanceTask> {
+  public static class TimeoutComparator implements Comparator<DriverTask> {
 
     @Override
-    public int compare(FragmentInstanceTask o1, FragmentInstanceTask o2) {
+    public int compare(DriverTask o1, DriverTask o2) {
       if (o1.getId().equals(o2.getId())) {
         return 0;
       }
@@ -169,10 +165,10 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
   }
 
   /** a comparator of ddl, the higher the schedulePriority is, the low order it has. */
-  public static class SchedulePriorityComparator implements Comparator<FragmentInstanceTask> {
+  public static class SchedulePriorityComparator implements Comparator<DriverTask> {
 
     @Override
-    public int compare(FragmentInstanceTask o1, FragmentInstanceTask o2) {
+    public int compare(DriverTask o1, DriverTask o2) {
       if (o1.getId().equals(o2.getId())) {
         return 0;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskID.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/DriverTaskID.java
similarity index 85%
rename from server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskID.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/DriverTaskID.java
index a0081b177c..fe7421667b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskID.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/DriverTaskID.java
@@ -26,17 +26,17 @@ import org.apache.iotdb.db.mpp.schedule.queue.ID;
 import org.jetbrains.annotations.NotNull;
 
 /** the class of id of the fragment instance task */
-public class FragmentInstanceTaskID implements ID, Comparable<FragmentInstanceTaskID> {
+public class DriverTaskID implements ID, Comparable<DriverTaskID> {
 
   private final FragmentInstanceId id;
 
-  public FragmentInstanceTaskID(FragmentInstanceId id) {
+  public DriverTaskID(FragmentInstanceId id) {
     this.id = id;
   }
 
   @Override
   public boolean equals(Object o) {
-    return o instanceof FragmentInstanceTaskID && ((FragmentInstanceTaskID) o).id.equals(id);
+    return o instanceof DriverTaskID && ((DriverTaskID) o).id.equals(id);
   }
 
   @Override
@@ -62,7 +62,7 @@ public class FragmentInstanceTaskID implements ID, Comparable<FragmentInstanceTa
 
   // This is the default comparator of FragmentInstanceID
   @Override
-  public int compareTo(@NotNull FragmentInstanceTaskID o) {
+  public int compareTo(@NotNull DriverTaskID o) {
     return String.CASE_INSENSITIVE_ORDER.compare(this.toString(), o.toString());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskStatus.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/DriverTaskStatus.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskStatus.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/DriverTaskStatus.java
index f50dc4dfdc..09dbeac9c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskStatus.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/DriverTaskStatus.java
@@ -18,8 +18,8 @@
  */
 package org.apache.iotdb.db.mpp.schedule.task;
 
-/** the status enum of {@link FragmentInstanceTask} */
-public enum FragmentInstanceTaskStatus {
+/** the status enum of {@link DriverTask} */
+public enum DriverTaskStatus {
   /* Ready to be executed */
   READY,
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index c2c835d0e8..a6b63d46e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -44,7 +44,7 @@ import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.buffer.DataBlockService;
-import org.apache.iotdb.db.mpp.schedule.FragmentInstanceScheduler;
+import org.apache.iotdb.db.mpp.schedule.DriverScheduler;
 import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
 import org.apache.iotdb.db.protocol.rest.RestService;
 import org.apache.iotdb.db.query.udf.service.TemporaryQueryDataFileService;
@@ -251,7 +251,7 @@ public class DataNode implements DataNodeMBean {
     registerManager.register(StorageEngineV2.getInstance());
     registerManager.register(DataBlockService.getInstance());
     registerManager.register(InternalService.getInstance());
-    registerManager.register(FragmentInstanceScheduler.getInstance());
+    registerManager.register(DriverScheduler.getInstance());
     IoTDBDescriptor.getInstance()
         .getConfig()
         .setRpcImplClassName(DataNodeTSIServiceImpl.class.getName());
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index ae8b8d0b1e..d60c167a00 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -40,7 +40,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.localconfignode.LocalConfigNode;
 import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
 import org.apache.iotdb.db.mpp.buffer.DataBlockService;
-import org.apache.iotdb.db.mpp.schedule.FragmentInstanceScheduler;
+import org.apache.iotdb.db.mpp.schedule.DriverScheduler;
 import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
 import org.apache.iotdb.db.protocol.rest.RestService;
 import org.apache.iotdb.db.query.udf.service.TemporaryQueryDataFileService;
@@ -147,7 +147,7 @@ public class IoTDB implements IoTDBMBean {
       registerManager.register(StorageEngineV2.getInstance());
       registerManager.register(DataBlockService.getInstance());
       registerManager.register(InternalService.getInstance());
-      registerManager.register(FragmentInstanceScheduler.getInstance());
+      registerManager.register(DriverScheduler.getInstance());
       IoTDBDescriptor.getInstance()
           .getConfig()
           .setRpcImplClassName(DataNodeTSIServiceImpl.class.getName());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index 65cca8e51c..5f03cd366a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -61,7 +61,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 import static org.apache.iotdb.db.mpp.execution.FragmentInstanceContext.createFragmentInstanceContext;
-import static org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor.EXECUTION_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.schedule.DriverTaskThread.EXECUTION_TIME_SLICE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
index a93ab988ea..213f742e69 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
@@ -23,8 +23,8 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.IDriver;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTask;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTaskStatus;
 import org.apache.iotdb.db.utils.stats.CpuTimer;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 
@@ -40,7 +40,7 @@ import java.util.concurrent.TimeUnit;
 
 public class DefaultTaskSchedulerTest {
 
-  private final FragmentInstanceScheduler manager = FragmentInstanceScheduler.getInstance();
+  private final DriverScheduler manager = DriverScheduler.getInstance();
 
   @After
   public void tearDown() {
@@ -57,17 +57,17 @@ public class DefaultTaskSchedulerTest {
     FragmentInstanceId instanceId =
         new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
     Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    FragmentInstanceTaskStatus[] invalidStates =
-        new FragmentInstanceTaskStatus[] {
-          FragmentInstanceTaskStatus.FINISHED,
-          FragmentInstanceTaskStatus.ABORTED,
-          FragmentInstanceTaskStatus.READY,
-          FragmentInstanceTaskStatus.RUNNING,
+    DriverTaskStatus[] invalidStates =
+        new DriverTaskStatus[] {
+          DriverTaskStatus.FINISHED,
+          DriverTaskStatus.ABORTED,
+          DriverTaskStatus.READY,
+          DriverTaskStatus.RUNNING,
         };
-    for (FragmentInstanceTaskStatus status : invalidStates) {
-      FragmentInstanceTask testTask = new FragmentInstanceTask(mockDriver, 100L, status);
+    for (DriverTaskStatus status : invalidStates) {
+      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
       manager.getBlockedTasks().add(testTask);
-      Set<FragmentInstanceTask> taskSet = new HashSet<>();
+      Set<DriverTask> taskSet = new HashSet<>();
       taskSet.add(testTask);
       manager.getQueryMap().put(queryId, taskSet);
       manager.getTimeoutQueue().push(testTask);
@@ -80,15 +80,14 @@ public class DefaultTaskSchedulerTest {
       Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
       clear();
     }
-    FragmentInstanceTask testTask =
-        new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.BLOCKED);
+    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.BLOCKED);
     manager.getBlockedTasks().add(testTask);
-    Set<FragmentInstanceTask> taskSet = new HashSet<>();
+    Set<DriverTask> taskSet = new HashSet<>();
     taskSet.add(testTask);
     manager.getQueryMap().put(queryId, taskSet);
     manager.getTimeoutQueue().push(testTask);
     defaultScheduler.blockedToReady(testTask);
-    Assert.assertEquals(FragmentInstanceTaskStatus.READY, testTask.getStatus());
+    Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
     Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
     Assert.assertNotNull(manager.getReadyQueue().get(testTask.getId()));
     Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
@@ -109,16 +108,16 @@ public class DefaultTaskSchedulerTest {
     FragmentInstanceId instanceId =
         new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
     Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    FragmentInstanceTaskStatus[] invalidStates =
-        new FragmentInstanceTaskStatus[] {
-          FragmentInstanceTaskStatus.FINISHED,
-          FragmentInstanceTaskStatus.ABORTED,
-          FragmentInstanceTaskStatus.BLOCKED,
-          FragmentInstanceTaskStatus.RUNNING,
+    DriverTaskStatus[] invalidStates =
+        new DriverTaskStatus[] {
+          DriverTaskStatus.FINISHED,
+          DriverTaskStatus.ABORTED,
+          DriverTaskStatus.BLOCKED,
+          DriverTaskStatus.RUNNING,
         };
-    for (FragmentInstanceTaskStatus status : invalidStates) {
-      FragmentInstanceTask testTask = new FragmentInstanceTask(mockDriver, 100L, status);
-      Set<FragmentInstanceTask> taskSet = new HashSet<>();
+    for (DriverTaskStatus status : invalidStates) {
+      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+      Set<DriverTask> taskSet = new HashSet<>();
       taskSet.add(testTask);
       manager.getQueryMap().put(queryId, taskSet);
       manager.getTimeoutQueue().push(testTask);
@@ -130,14 +129,13 @@ public class DefaultTaskSchedulerTest {
       Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
       clear();
     }
-    FragmentInstanceTask testTask =
-        new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.READY);
-    Set<FragmentInstanceTask> taskSet = new HashSet<>();
+    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+    Set<DriverTask> taskSet = new HashSet<>();
     taskSet.add(testTask);
     manager.getQueryMap().put(queryId, taskSet);
     manager.getTimeoutQueue().push(testTask);
     defaultScheduler.readyToRunning(testTask);
-    Assert.assertEquals(FragmentInstanceTaskStatus.RUNNING, testTask.getStatus());
+    Assert.assertEquals(DriverTaskStatus.RUNNING, testTask.getStatus());
     Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
     Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
     Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
@@ -156,16 +154,16 @@ public class DefaultTaskSchedulerTest {
     FragmentInstanceId instanceId =
         new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
     Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    FragmentInstanceTaskStatus[] invalidStates =
-        new FragmentInstanceTaskStatus[] {
-          FragmentInstanceTaskStatus.FINISHED,
-          FragmentInstanceTaskStatus.ABORTED,
-          FragmentInstanceTaskStatus.BLOCKED,
-          FragmentInstanceTaskStatus.READY,
+    DriverTaskStatus[] invalidStates =
+        new DriverTaskStatus[] {
+          DriverTaskStatus.FINISHED,
+          DriverTaskStatus.ABORTED,
+          DriverTaskStatus.BLOCKED,
+          DriverTaskStatus.READY,
         };
-    for (FragmentInstanceTaskStatus status : invalidStates) {
-      FragmentInstanceTask testTask = new FragmentInstanceTask(mockDriver, 100L, status);
-      Set<FragmentInstanceTask> taskSet = new HashSet<>();
+    for (DriverTaskStatus status : invalidStates) {
+      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+      Set<DriverTask> taskSet = new HashSet<>();
       taskSet.add(testTask);
       manager.getQueryMap().put(queryId, taskSet);
       manager.getTimeoutQueue().push(testTask);
@@ -178,9 +176,8 @@ public class DefaultTaskSchedulerTest {
       Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
       clear();
     }
-    FragmentInstanceTask testTask =
-        new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.RUNNING);
-    Set<FragmentInstanceTask> taskSet = new HashSet<>();
+    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+    Set<DriverTask> taskSet = new HashSet<>();
     taskSet.add(testTask);
     manager.getQueryMap().put(queryId, taskSet);
     manager.getTimeoutQueue().push(testTask);
@@ -189,7 +186,7 @@ public class DefaultTaskSchedulerTest {
     context.setCpuDuration(new CpuTimer.CpuDuration());
     defaultScheduler.runningToReady(testTask, context);
     Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
-    Assert.assertEquals(FragmentInstanceTaskStatus.READY, testTask.getStatus());
+    Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
     Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
     Assert.assertNotNull(manager.getReadyQueue().get(testTask.getId()));
     Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
@@ -209,16 +206,16 @@ public class DefaultTaskSchedulerTest {
     FragmentInstanceId instanceId =
         new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
     Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    FragmentInstanceTaskStatus[] invalidStates =
-        new FragmentInstanceTaskStatus[] {
-          FragmentInstanceTaskStatus.FINISHED,
-          FragmentInstanceTaskStatus.ABORTED,
-          FragmentInstanceTaskStatus.BLOCKED,
-          FragmentInstanceTaskStatus.READY,
+    DriverTaskStatus[] invalidStates =
+        new DriverTaskStatus[] {
+          DriverTaskStatus.FINISHED,
+          DriverTaskStatus.ABORTED,
+          DriverTaskStatus.BLOCKED,
+          DriverTaskStatus.READY,
         };
-    for (FragmentInstanceTaskStatus status : invalidStates) {
-      FragmentInstanceTask testTask = new FragmentInstanceTask(mockDriver, 100L, status);
-      Set<FragmentInstanceTask> taskSet = new HashSet<>();
+    for (DriverTaskStatus status : invalidStates) {
+      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+      Set<DriverTask> taskSet = new HashSet<>();
       taskSet.add(testTask);
       manager.getQueryMap().put(queryId, taskSet);
       manager.getTimeoutQueue().push(testTask);
@@ -231,9 +228,8 @@ public class DefaultTaskSchedulerTest {
       Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
       clear();
     }
-    FragmentInstanceTask testTask =
-        new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.RUNNING);
-    Set<FragmentInstanceTask> taskSet = new HashSet<>();
+    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+    Set<DriverTask> taskSet = new HashSet<>();
     taskSet.add(testTask);
     manager.getQueryMap().put(queryId, taskSet);
     manager.getTimeoutQueue().push(testTask);
@@ -242,7 +238,7 @@ public class DefaultTaskSchedulerTest {
     context.setCpuDuration(new CpuTimer.CpuDuration());
     defaultScheduler.runningToBlocked(testTask, context);
     Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
-    Assert.assertEquals(FragmentInstanceTaskStatus.BLOCKED, testTask.getStatus());
+    Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask.getStatus());
     Assert.assertTrue(manager.getBlockedTasks().contains(testTask));
     Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
     Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
@@ -262,16 +258,16 @@ public class DefaultTaskSchedulerTest {
     FragmentInstanceId instanceId =
         new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
     Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    FragmentInstanceTaskStatus[] invalidStates =
-        new FragmentInstanceTaskStatus[] {
-          FragmentInstanceTaskStatus.FINISHED,
-          FragmentInstanceTaskStatus.ABORTED,
-          FragmentInstanceTaskStatus.BLOCKED,
-          FragmentInstanceTaskStatus.READY,
+    DriverTaskStatus[] invalidStates =
+        new DriverTaskStatus[] {
+          DriverTaskStatus.FINISHED,
+          DriverTaskStatus.ABORTED,
+          DriverTaskStatus.BLOCKED,
+          DriverTaskStatus.READY,
         };
-    for (FragmentInstanceTaskStatus status : invalidStates) {
-      FragmentInstanceTask testTask = new FragmentInstanceTask(mockDriver, 100L, status);
-      Set<FragmentInstanceTask> taskSet = new HashSet<>();
+    for (DriverTaskStatus status : invalidStates) {
+      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+      Set<DriverTask> taskSet = new HashSet<>();
       taskSet.add(testTask);
       manager.getQueryMap().put(queryId, taskSet);
       manager.getTimeoutQueue().push(testTask);
@@ -284,9 +280,8 @@ public class DefaultTaskSchedulerTest {
       Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
       clear();
     }
-    FragmentInstanceTask testTask =
-        new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.RUNNING);
-    Set<FragmentInstanceTask> taskSet = new HashSet<>();
+    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+    Set<DriverTask> taskSet = new HashSet<>();
     taskSet.add(testTask);
     manager.getQueryMap().put(queryId, taskSet);
     manager.getTimeoutQueue().push(testTask);
@@ -295,7 +290,7 @@ public class DefaultTaskSchedulerTest {
     context.setCpuDuration(new CpuTimer.CpuDuration());
     defaultScheduler.runningToFinished(testTask, context);
     Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
-    Assert.assertEquals(FragmentInstanceTaskStatus.FINISHED, testTask.getStatus());
+    Assert.assertEquals(DriverTaskStatus.FINISHED, testTask.getStatus());
     Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
     Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
     Assert.assertNull(manager.getTimeoutQueue().get(testTask.getId()));
@@ -319,15 +314,14 @@ public class DefaultTaskSchedulerTest {
     FragmentInstanceId instanceId2 =
         new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-1");
     Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
-    FragmentInstanceTaskStatus[] invalidStates =
-        new FragmentInstanceTaskStatus[] {
-          FragmentInstanceTaskStatus.FINISHED, FragmentInstanceTaskStatus.ABORTED,
+    DriverTaskStatus[] invalidStates =
+        new DriverTaskStatus[] {
+          DriverTaskStatus.FINISHED, DriverTaskStatus.ABORTED,
         };
-    for (FragmentInstanceTaskStatus status : invalidStates) {
-      FragmentInstanceTask testTask1 = new FragmentInstanceTask(mockDriver1, 100L, status);
-      FragmentInstanceTask testTask2 =
-          new FragmentInstanceTask(mockDriver2, 100L, FragmentInstanceTaskStatus.BLOCKED);
-      Set<FragmentInstanceTask> taskSet = new HashSet<>();
+    for (DriverTaskStatus status : invalidStates) {
+      DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status);
+      DriverTask testTask2 = new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED);
+      Set<DriverTask> taskSet = new HashSet<>();
       taskSet.add(testTask1);
       taskSet.add(testTask2);
       manager.getQueryMap().put(queryId, taskSet);
@@ -337,7 +331,7 @@ public class DefaultTaskSchedulerTest {
       defaultScheduler.toAborted(testTask1);
 
       Assert.assertEquals(status, testTask1.getStatus());
-      Assert.assertEquals(FragmentInstanceTaskStatus.BLOCKED, testTask2.getStatus());
+      Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask2.getStatus());
       Assert.assertFalse(manager.getBlockedTasks().contains(testTask1));
       Assert.assertTrue(manager.getBlockedTasks().contains(testTask2));
       Assert.assertNull(manager.getReadyQueue().get(testTask1.getId()));
@@ -352,23 +346,20 @@ public class DefaultTaskSchedulerTest {
       Mockito.verify(mockDriver2, Mockito.never()).failed(Mockito.any());
       clear();
     }
-    FragmentInstanceTaskStatus[] validStates =
-        new FragmentInstanceTaskStatus[] {
-          FragmentInstanceTaskStatus.RUNNING,
-          FragmentInstanceTaskStatus.READY,
-          FragmentInstanceTaskStatus.BLOCKED,
+    DriverTaskStatus[] validStates =
+        new DriverTaskStatus[] {
+          DriverTaskStatus.RUNNING, DriverTaskStatus.READY, DriverTaskStatus.BLOCKED,
         };
-    for (FragmentInstanceTaskStatus status : validStates) {
+    for (DriverTaskStatus status : validStates) {
       Mockito.reset(mockDriver1);
       Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
       Mockito.reset(mockDriver2);
       Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
 
-      FragmentInstanceTask testTask1 = new FragmentInstanceTask(mockDriver1, 100L, status);
+      DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status);
 
-      FragmentInstanceTask testTask2 =
-          new FragmentInstanceTask(mockDriver2, 100L, FragmentInstanceTaskStatus.BLOCKED);
-      Set<FragmentInstanceTask> taskSet = new HashSet<>();
+      DriverTask testTask2 = new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED);
+      Set<DriverTask> taskSet = new HashSet<>();
       taskSet.add(testTask1);
       taskSet.add(testTask2);
       manager.getQueryMap().put(queryId, taskSet);
@@ -381,8 +372,8 @@ public class DefaultTaskSchedulerTest {
       Mockito.reset(mockDataBlockManager);
 
       // An aborted fragment may cause others in the same query aborted.
-      Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, testTask1.getStatus());
-      Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, testTask2.getStatus());
+      Assert.assertEquals(DriverTaskStatus.ABORTED, testTask1.getStatus());
+      Assert.assertEquals(DriverTaskStatus.ABORTED, testTask2.getStatus());
       Assert.assertFalse(manager.getBlockedTasks().contains(testTask1));
       Assert.assertFalse(manager.getBlockedTasks().contains(testTask2));
       Assert.assertNull(manager.getReadyQueue().get(testTask1.getId()));
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DriverSchedulerTest.java
similarity index 77%
rename from server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/schedule/DriverSchedulerTest.java
index f83d1213c6..2dc325fb30 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DriverSchedulerTest.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.IDriver;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskID;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTask;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTaskID;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTaskStatus;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -36,9 +36,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-public class FragmentInstanceSchedulerTest {
+public class DriverSchedulerTest {
 
-  private final FragmentInstanceScheduler manager = FragmentInstanceScheduler.getInstance();
+  private final DriverScheduler manager = DriverScheduler.getInstance();
 
   @After
   public void tearDown() {
@@ -62,40 +62,37 @@ public class FragmentInstanceSchedulerTest {
     IDriver mockDriver2 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
     List<IDriver> instances = Arrays.asList(mockDriver1, mockDriver2);
-    manager.submitFragmentInstances(queryId, instances);
+    manager.submitDrivers(queryId, instances);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
     Assert.assertEquals(1, manager.getQueryMap().size());
     Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
     Assert.assertEquals(2, manager.getQueryMap().get(queryId).size());
     Assert.assertEquals(2, manager.getTimeoutQueue().size());
     Assert.assertEquals(2, manager.getReadyQueue().size());
-    FragmentInstanceTask task1 =
-        manager.getTimeoutQueue().get(new FragmentInstanceTaskID(instanceId1));
+    DriverTask task1 = manager.getTimeoutQueue().get(new DriverTaskID(instanceId1));
     Assert.assertNotNull(task1);
-    FragmentInstanceTask task2 =
-        manager.getTimeoutQueue().get(new FragmentInstanceTaskID(instanceId2));
+    DriverTask task2 = manager.getTimeoutQueue().get(new DriverTaskID(instanceId2));
     Assert.assertNotNull(task2);
     Assert.assertTrue(manager.getQueryMap().get(queryId).contains(task1));
     Assert.assertTrue(manager.getQueryMap().get(queryId).contains(task2));
-    Assert.assertEquals(FragmentInstanceTaskStatus.READY, task1.getStatus());
-    Assert.assertEquals(FragmentInstanceTaskStatus.READY, task2.getStatus());
+    Assert.assertEquals(DriverTaskStatus.READY, task1.getStatus());
+    Assert.assertEquals(DriverTaskStatus.READY, task2.getStatus());
 
     // Submit another task of the same query
     IDriver mockDriver3 = Mockito.mock(IDriver.class);
     FragmentInstanceId instanceId3 = new FragmentInstanceId(fragmentId, "inst-2");
     Mockito.when(mockDriver3.getInfo()).thenReturn(instanceId3);
-    manager.submitFragmentInstances(queryId, Collections.singletonList(mockDriver3));
+    manager.submitDrivers(queryId, Collections.singletonList(mockDriver3));
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
     Assert.assertEquals(1, manager.getQueryMap().size());
     Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
     Assert.assertEquals(3, manager.getQueryMap().get(queryId).size());
     Assert.assertEquals(3, manager.getTimeoutQueue().size());
     Assert.assertEquals(3, manager.getReadyQueue().size());
-    FragmentInstanceTask task3 =
-        manager.getTimeoutQueue().get(new FragmentInstanceTaskID(instanceId3));
+    DriverTask task3 = manager.getTimeoutQueue().get(new DriverTaskID(instanceId3));
     Assert.assertNotNull(task3);
     Assert.assertTrue(manager.getQueryMap().get(queryId).contains(task3));
-    Assert.assertEquals(FragmentInstanceTaskStatus.READY, task3.getStatus());
+    Assert.assertEquals(DriverTaskStatus.READY, task3.getStatus());
 
     // Submit another task of the different query
     QueryId queryId2 = new QueryId("test2");
@@ -103,18 +100,17 @@ public class FragmentInstanceSchedulerTest {
     FragmentInstanceId instanceId4 = new FragmentInstanceId(fragmentId2, "inst-0");
     IDriver mockDriver4 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver4.getInfo()).thenReturn(instanceId4);
-    manager.submitFragmentInstances(queryId2, Collections.singletonList(mockDriver4));
+    manager.submitDrivers(queryId2, Collections.singletonList(mockDriver4));
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
     Assert.assertEquals(2, manager.getQueryMap().size());
     Assert.assertTrue(manager.getQueryMap().containsKey(queryId2));
     Assert.assertEquals(1, manager.getQueryMap().get(queryId2).size());
     Assert.assertEquals(4, manager.getTimeoutQueue().size());
     Assert.assertEquals(4, manager.getReadyQueue().size());
-    FragmentInstanceTask task4 =
-        manager.getTimeoutQueue().get(new FragmentInstanceTaskID(instanceId4));
+    DriverTask task4 = manager.getTimeoutQueue().get(new DriverTaskID(instanceId4));
     Assert.assertNotNull(task4);
     Assert.assertTrue(manager.getQueryMap().get(queryId2).contains(task4));
-    Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());
+    Assert.assertEquals(DriverTaskStatus.READY, task4.getStatus());
 
     // Abort one FragmentInstance
     Mockito.reset(mockDriver1);
@@ -127,10 +123,10 @@ public class FragmentInstanceSchedulerTest {
     Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
     Assert.assertEquals(3, manager.getTimeoutQueue().size());
     Assert.assertEquals(3, manager.getReadyQueue().size());
-    Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, task1.getStatus());
-    Assert.assertEquals(FragmentInstanceTaskStatus.READY, task2.getStatus());
-    Assert.assertEquals(FragmentInstanceTaskStatus.READY, task3.getStatus());
-    Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());
+    Assert.assertEquals(DriverTaskStatus.ABORTED, task1.getStatus());
+    Assert.assertEquals(DriverTaskStatus.READY, task2.getStatus());
+    Assert.assertEquals(DriverTaskStatus.READY, task3.getStatus());
+    Assert.assertEquals(DriverTaskStatus.READY, task4.getStatus());
     Mockito.verify(mockDriver1, Mockito.times(1)).failed(Mockito.any());
     Assert.assertEquals(
         FragmentInstanceAbortedException.BY_FRAGMENT_ABORT_CALLED, task1.getAbortCause());
@@ -151,10 +147,10 @@ public class FragmentInstanceSchedulerTest {
     Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
     Assert.assertEquals(1, manager.getTimeoutQueue().size());
     Assert.assertEquals(1, manager.getReadyQueue().size());
-    Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, task1.getStatus());
-    Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, task2.getStatus());
-    Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, task3.getStatus());
-    Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());
+    Assert.assertEquals(DriverTaskStatus.ABORTED, task1.getStatus());
+    Assert.assertEquals(DriverTaskStatus.ABORTED, task2.getStatus());
+    Assert.assertEquals(DriverTaskStatus.ABORTED, task3.getStatus());
+    Assert.assertEquals(DriverTaskStatus.READY, task4.getStatus());
     Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
     Mockito.verify(mockDriver2, Mockito.times(1)).failed(Mockito.any());
     Mockito.verify(mockDriver3, Mockito.times(1)).failed(Mockito.any());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DriverTaskTimeoutSentinelThreadTest.java
similarity index 71%
rename from server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/schedule/DriverTaskTimeoutSentinelThreadTest.java
index 28365962b8..9e6e7b3293 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DriverTaskTimeoutSentinelThreadTest.java
@@ -24,8 +24,8 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.IDriver;
 import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
 import org.apache.iotdb.db.mpp.schedule.queue.L1PriorityQueue;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTask;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTaskStatus;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -36,7 +36,7 @@ import org.mockito.Mockito;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 
-public class FragmentInstanceTimeoutSentinelTest {
+public class DriverTaskTimeoutSentinelThreadTest {
 
   @Test
   public void testHandleInvalidStateTask() throws ExecutionException, InterruptedException {
@@ -44,52 +44,49 @@ public class FragmentInstanceTimeoutSentinelTest {
     Mockito.when(mockScheduler.readyToRunning(Mockito.any()))
         .thenAnswer(
             ans -> {
-              FragmentInstanceTask task = ans.getArgument(0);
-              if (task.getStatus() != FragmentInstanceTaskStatus.READY) {
+              DriverTask task = ans.getArgument(0);
+              if (task.getStatus() != DriverTaskStatus.READY) {
                 return false;
               }
-              task.setStatus(FragmentInstanceTaskStatus.RUNNING);
+              task.setStatus(DriverTaskStatus.RUNNING);
               return true;
             });
     QueryId queryId = new QueryId("test");
     PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
     FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
-    IndexedBlockingQueue<FragmentInstanceTask> taskQueue =
-        new L1PriorityQueue<>(
-            100, new FragmentInstanceTask.TimeoutComparator(), new FragmentInstanceTask());
+    IndexedBlockingQueue<DriverTask> taskQueue =
+        new L1PriorityQueue<>(100, new DriverTask.TimeoutComparator(), new DriverTask());
     IDriver mockDriver = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
 
-    AbstractExecutor executor =
-        new FragmentInstanceTaskExecutor(
-            "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+    AbstractDriverThread executor =
+        new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
 
     // FINISHED status test
-    FragmentInstanceTask testTask =
-        new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.FINISHED);
+    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.FINISHED);
     executor.execute(testTask);
-    Assert.assertEquals(FragmentInstanceTaskStatus.FINISHED, testTask.getStatus());
+    Assert.assertEquals(DriverTaskStatus.FINISHED, testTask.getStatus());
     Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
     Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
 
     // ABORTED status test
-    testTask = new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.ABORTED);
+    testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.ABORTED);
     executor.execute(testTask);
-    Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, testTask.getStatus());
+    Assert.assertEquals(DriverTaskStatus.ABORTED, testTask.getStatus());
     Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
     Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
 
     // RUNNING status test
-    testTask = new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.RUNNING);
+    testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
     executor.execute(testTask);
-    Assert.assertEquals(FragmentInstanceTaskStatus.RUNNING, testTask.getStatus());
+    Assert.assertEquals(DriverTaskStatus.RUNNING, testTask.getStatus());
     Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
     Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
 
     // BLOCKED status test
-    testTask = new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.BLOCKED);
+    testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.BLOCKED);
     executor.execute(testTask);
-    Assert.assertEquals(FragmentInstanceTaskStatus.BLOCKED, testTask.getStatus());
+    Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask.getStatus());
     Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
     Assert.assertNull(testTask.getAbortCause());
     Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any());
@@ -104,16 +101,15 @@ public class FragmentInstanceTimeoutSentinelTest {
     Mockito.when(mockScheduler.readyToRunning(Mockito.any()))
         .thenAnswer(
             ans -> {
-              FragmentInstanceTask task = ans.getArgument(0);
-              if (task.getStatus() != FragmentInstanceTaskStatus.READY) {
+              DriverTask task = ans.getArgument(0);
+              if (task.getStatus() != DriverTaskStatus.READY) {
                 return false;
               }
-              task.setStatus(FragmentInstanceTaskStatus.RUNNING);
+              task.setStatus(DriverTaskStatus.RUNNING);
               return true;
             });
-    IndexedBlockingQueue<FragmentInstanceTask> taskQueue =
-        new L1PriorityQueue<>(
-            100, new FragmentInstanceTask.TimeoutComparator(), new FragmentInstanceTask());
+    IndexedBlockingQueue<DriverTask> taskQueue =
+        new L1PriorityQueue<>(100, new DriverTask.TimeoutComparator(), new DriverTask());
 
     // Mock the instance with a cancelled future
     IDriver mockDriver = Mockito.mock(IDriver.class);
@@ -124,11 +120,9 @@ public class FragmentInstanceTimeoutSentinelTest {
     Mockito.when(mockDriver.processFor(Mockito.any()))
         .thenReturn(Futures.immediateCancelledFuture());
 
-    AbstractExecutor executor =
-        new FragmentInstanceTaskExecutor(
-            "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
-    FragmentInstanceTask testTask =
-        new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.READY);
+    AbstractDriverThread executor =
+        new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
     executor.execute(testTask);
     Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
     Assert.assertEquals(
@@ -146,16 +140,15 @@ public class FragmentInstanceTimeoutSentinelTest {
     Mockito.when(mockScheduler.readyToRunning(Mockito.any()))
         .thenAnswer(
             ans -> {
-              FragmentInstanceTask task = ans.getArgument(0);
-              if (task.getStatus() != FragmentInstanceTaskStatus.READY) {
+              DriverTask task = ans.getArgument(0);
+              if (task.getStatus() != DriverTaskStatus.READY) {
                 return false;
               }
-              task.setStatus(FragmentInstanceTaskStatus.RUNNING);
+              task.setStatus(DriverTaskStatus.RUNNING);
               return true;
             });
-    IndexedBlockingQueue<FragmentInstanceTask> taskQueue =
-        new L1PriorityQueue<>(
-            100, new FragmentInstanceTask.TimeoutComparator(), new FragmentInstanceTask());
+    IndexedBlockingQueue<DriverTask> taskQueue =
+        new L1PriorityQueue<>(100, new DriverTask.TimeoutComparator(), new DriverTask());
 
     // Mock the instance with a cancelled future
     IDriver mockDriver = Mockito.mock(IDriver.class);
@@ -165,11 +158,9 @@ public class FragmentInstanceTimeoutSentinelTest {
     Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
     Mockito.when(mockDriver.processFor(Mockito.any())).thenReturn(Futures.immediateVoidFuture());
     Mockito.when(mockDriver.isFinished()).thenReturn(true);
-    AbstractExecutor executor =
-        new FragmentInstanceTaskExecutor(
-            "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
-    FragmentInstanceTask testTask =
-        new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.READY);
+    AbstractDriverThread executor =
+        new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
     executor.execute(testTask);
     Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
     Assert.assertNull(testTask.getAbortCause());
@@ -186,16 +177,15 @@ public class FragmentInstanceTimeoutSentinelTest {
     Mockito.when(mockScheduler.readyToRunning(Mockito.any()))
         .thenAnswer(
             ans -> {
-              FragmentInstanceTask task = ans.getArgument(0);
-              if (task.getStatus() != FragmentInstanceTaskStatus.READY) {
+              DriverTask task = ans.getArgument(0);
+              if (task.getStatus() != DriverTaskStatus.READY) {
                 return false;
               }
-              task.setStatus(FragmentInstanceTaskStatus.RUNNING);
+              task.setStatus(DriverTaskStatus.RUNNING);
               return true;
             });
-    IndexedBlockingQueue<FragmentInstanceTask> taskQueue =
-        new L1PriorityQueue<>(
-            100, new FragmentInstanceTask.TimeoutComparator(), new FragmentInstanceTask());
+    IndexedBlockingQueue<DriverTask> taskQueue =
+        new L1PriorityQueue<>(100, new DriverTask.TimeoutComparator(), new DriverTask());
 
     // Mock the instance with a blocked future
     ListenableFuture<Void> mockFuture = Mockito.mock(ListenableFuture.class);
@@ -216,11 +206,9 @@ public class FragmentInstanceTimeoutSentinelTest {
     Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
     Mockito.when(mockDriver.processFor(Mockito.any())).thenReturn(mockFuture);
     Mockito.when(mockDriver.isFinished()).thenReturn(false);
-    AbstractExecutor executor =
-        new FragmentInstanceTaskExecutor(
-            "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
-    FragmentInstanceTask testTask =
-        new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.READY);
+    AbstractDriverThread executor =
+        new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
     executor.execute(testTask);
     Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
     Assert.assertNull(testTask.getAbortCause());
@@ -237,16 +225,15 @@ public class FragmentInstanceTimeoutSentinelTest {
     Mockito.when(mockScheduler.readyToRunning(Mockito.any()))
         .thenAnswer(
             ans -> {
-              FragmentInstanceTask task = ans.getArgument(0);
-              if (task.getStatus() != FragmentInstanceTaskStatus.READY) {
+              DriverTask task = ans.getArgument(0);
+              if (task.getStatus() != DriverTaskStatus.READY) {
                 return false;
               }
-              task.setStatus(FragmentInstanceTaskStatus.RUNNING);
+              task.setStatus(DriverTaskStatus.RUNNING);
               return true;
             });
-    IndexedBlockingQueue<FragmentInstanceTask> taskQueue =
-        new L1PriorityQueue<>(
-            100, new FragmentInstanceTask.TimeoutComparator(), new FragmentInstanceTask());
+    IndexedBlockingQueue<DriverTask> taskQueue =
+        new L1PriorityQueue<>(100, new DriverTask.TimeoutComparator(), new DriverTask());
 
     // Mock the instance with a ready future
     ListenableFuture<Void> mockFuture = Mockito.mock(ListenableFuture.class);
@@ -267,11 +254,9 @@ public class FragmentInstanceTimeoutSentinelTest {
     Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
     Mockito.when(mockDriver.processFor(Mockito.any())).thenReturn(mockFuture);
     Mockito.when(mockDriver.isFinished()).thenReturn(false);
-    AbstractExecutor executor =
-        new FragmentInstanceTaskExecutor(
-            "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
-    FragmentInstanceTask testTask =
-        new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.READY);
+    AbstractDriverThread executor =
+        new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
     executor.execute(testTask);
     Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
     Assert.assertNull(testTask.getAbortCause());