You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/22 01:23:42 UTC

[iotdb] branch master updated: Add FragmentInstanceStateMachine for FragmentInstance State change (#5615)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f9226c380d Add FragmentInstanceStateMachine for FragmentInstance State change (#5615)
f9226c380d is described below

commit f9226c380dc262f9966717a53ae80a72885824d3
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Fri Apr 22 09:23:37 2022 +0800

    Add FragmentInstanceStateMachine for FragmentInstance State change (#5615)
---
 pom.xml                                            |   5 +
 server/pom.xml                                     |   4 +
 .../iotdb/db/mpp/buffer/DataBlockManager.java      |  12 +-
 .../apache/iotdb/db/mpp/buffer/ISinkHandle.java    |   7 +-
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java |  29 +-
 .../apache/iotdb/db/mpp/buffer/StubSinkHandle.java |   4 +-
 .../apache/iotdb/db/mpp/execution/DataDriver.java  | 189 ++-------
 .../org/apache/iotdb/db/mpp/execution/Driver.java  | 426 ++++++++++++++++++++-
 .../iotdb/db/mpp/execution/DriverContext.java      |  13 +-
 .../db/mpp/execution/FragmentInstanceContext.java  |  95 +++--
 .../mpp/execution/FragmentInstanceExecution.java   |  66 +++-
 ...t.java => FragmentInstanceFailureListener.java} |  29 +-
 .../db/mpp/execution/FragmentInstanceManager.java  |  57 ++-
 .../db/mpp/execution/FragmentInstanceState.java    |   2 +-
 .../execution/FragmentInstanceStateMachine.java    | 182 +++++++++
 .../execution/{DriverContext.java => IDriver.java} |  30 +-
 .../iotdb/db/mpp/execution/SchemaDriver.java       | 140 +------
 .../org/apache/iotdb/db/mpp/operator/Operator.java |   4 +-
 .../db/mpp/operator/process/LimitOperator.java     |   4 +-
 .../db/mpp/operator/process/TransformOperator.java |   2 +-
 .../mpp/operator/schema/SchemaFetchOperator.java   |   3 +-
 .../db/mpp/operator/source/ExchangeOperator.java   |   2 +-
 .../schedule/FragmentInstanceAbortedException.java |   4 +-
 .../db/mpp/schedule/FragmentInstanceScheduler.java |   4 +-
 .../mpp/schedule/FragmentInstanceTaskExecutor.java |   4 +-
 .../mpp/schedule/IFragmentInstanceScheduler.java   |   6 +-
 .../db/mpp/schedule/task/FragmentInstanceTask.java |  16 +-
 .../apache/iotdb/db/mpp/buffer/SinkHandleTest.java |  68 +---
 .../iotdb/db/mpp/execution/DataDriverTest.java     |  30 +-
 .../iotdb/db/mpp/operator/LimitOperatorTest.java   |  19 +-
 .../operator/SeriesAggregateScanOperatorTest.java  |  15 +-
 .../db/mpp/operator/SeriesScanOperatorTest.java    |  19 +-
 .../db/mpp/operator/TimeJoinOperatorTest.java      |  19 +-
 .../operator/schema/SchemaScanOperatorTest.java    |  32 +-
 .../db/mpp/schedule/DefaultTaskSchedulerTest.java  |  16 +-
 .../schedule/FragmentInstanceSchedulerTest.java    |  12 +-
 .../FragmentInstanceTimeoutSentinelTest.java       |  12 +-
 37 files changed, 995 insertions(+), 586 deletions(-)

diff --git a/pom.xml b/pom.xml
index 912e982c23..5abd10d608 100644
--- a/pom.xml
+++ b/pom.xml
@@ -498,6 +498,11 @@
                 <artifactId>slice</artifactId>
                 <version>0.41</version>
             </dependency>
+            <dependency>
+                <groupId>io.airlift</groupId>
+                <artifactId>stats</artifactId>
+                <version>200</version>
+            </dependency>
             <dependency>
                 <groupId>org.openjdk.jol</groupId>
                 <artifactId>jol-core</artifactId>
diff --git a/server/pom.xml b/server/pom.xml
index cf18fa5054..791dd03a5a 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -89,6 +89,10 @@
             <groupId>io.airlift</groupId>
             <artifactId>units</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>stats</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.airlift</groupId>
             <artifactId>airline</artifactId>
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index e09acf28cb..7a38ef6d09 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -61,6 +61,8 @@ public class DataBlockManager implements IDataBlockManager {
     void onClosed(SinkHandle sinkHandle);
 
     void onAborted(SinkHandle sinkHandle);
+
+    void onFailure(Throwable t);
   }
 
   /** Handle thrift communications. */
@@ -166,6 +168,7 @@ public class DataBlockManager implements IDataBlockManager {
 
   /** Listen to the state changes of a source handle. */
   class SourceHandleListenerImpl implements SourceHandleListener {
+
     @Override
     public void onFinished(SourceHandle sourceHandle) {
       logger.info("Release resources of finished source handle {}", sourceHandle);
@@ -208,12 +211,12 @@ public class DataBlockManager implements IDataBlockManager {
         logger.info("Resources of finished sink handle {} has already been released", sinkHandle);
       }
       sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
-      context.finish();
+      context.finished();
     }
 
     @Override
     public void onClosed(SinkHandle sinkHandle) {
-      context.flushing();
+      context.transitionToFlushing();
     }
 
     @Override
@@ -224,6 +227,11 @@ public class DataBlockManager implements IDataBlockManager {
       }
       sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
     }
+
+    @Override
+    public void onFailure(Throwable t) {
+      context.failed(t);
+    }
   }
 
   private final LocalMemoryManager localMemoryManager;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
index b27e86186b..99f92e3f45 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
@@ -25,7 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import java.io.IOException;
 import java.util.List;
 
-public interface ISinkHandle extends AutoCloseable {
+public interface ISinkHandle {
 
   /** Get the total amount of memory used by buffered tsblocks. */
   long getBufferRetainedSizeInBytes();
@@ -41,7 +41,7 @@ public interface ISinkHandle extends AutoCloseable {
    * the invocation will be ignored. This can happen with limit queries. A {@link RuntimeException}
    * will be thrown if any exception happened during the data transmission.
    */
-  void send(List<TsBlock> tsBlocks) throws IOException;
+  void send(List<TsBlock> tsBlocks);
 
   /**
    * Send a {@link TsBlock} to a specific partition. If no-more-tsblocks has been set, the send
@@ -70,8 +70,7 @@ public interface ISinkHandle extends AutoCloseable {
    * downstream instances. A {@link RuntimeException} will be thrown if any exception happened
    * during the data transmission.
    */
-  @Override
-  void close() throws IOException;
+  void close();
 
   /**
    * Abort the sink handle. Discard all tsblocks which may still be in the memory buffer and cancel
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index f66d8b49f3..c540103126 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -74,7 +74,6 @@ public class SinkHandle implements ISinkHandle {
   private long bufferRetainedSizeInBytes;
   private boolean closed;
   private boolean noMoreTsBlocks;
-  private Throwable throwable;
 
   public SinkHandle(
       TEndPoint remoteEndpoint,
@@ -114,11 +113,8 @@ public class SinkHandle implements ISinkHandle {
   }
 
   @Override
-  public void send(List<TsBlock> tsBlocks) throws IOException {
+  public void send(List<TsBlock> tsBlocks) {
     Validate.notNull(tsBlocks, "tsBlocks is null");
-    if (throwable != null) {
-      throw new IOException(throwable);
-    }
     if (closed) {
       throw new IllegalStateException("Sink handle is closed.");
     }
@@ -194,24 +190,21 @@ public class SinkHandle implements ISinkHandle {
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() {
     logger.info("Sink handle {} is being closed.", this);
-    if (throwable != null) {
-      throw new IOException(throwable);
-    }
     if (closed) {
       return;
     }
+    try {
+      sendEndOfDataBlockEvent();
+    } catch (TException e) {
+      throw new RuntimeException("Send EndOfDataBlockEvent failed", e);
+    }
     synchronized (this) {
       closed = true;
       noMoreTsBlocks = true;
     }
     sinkHandleListener.onClosed(this);
-    try {
-      sendEndOfDataBlockEvent();
-    } catch (TException e) {
-      throw new IOException(e);
-    }
     logger.info("Sink handle {} is closed.", this);
   }
 
@@ -247,7 +240,7 @@ public class SinkHandle implements ISinkHandle {
 
   @Override
   public boolean isFinished() {
-    return throwable == null && noMoreTsBlocks && sequenceIdToTsBlock.isEmpty();
+    return noMoreTsBlocks && sequenceIdToTsBlock.isEmpty();
   }
 
   @Override
@@ -362,7 +355,7 @@ public class SinkHandle implements ISinkHandle {
         try {
           client.onNewDataBlockEvent(newDataBlockEvent);
           break;
-        } catch (TException e) {
+        } catch (Throwable e) {
           logger.error(
               "Failed to send new data block event to plan node {} of {} due to {}, attempt times: {}",
               remotePlanNodeId,
@@ -371,9 +364,7 @@ public class SinkHandle implements ISinkHandle {
               attempt,
               e);
           if (attempt == MAX_ATTEMPT_TIMES) {
-            synchronized (this) {
-              throwable = e;
-            }
+            sinkHandleListener.onFailure(e);
           }
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
index 9f3b9240c2..49a13eddd7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
@@ -59,7 +59,7 @@ public class StubSinkHandle implements ISinkHandle {
   }
 
   @Override
-  public void send(List<TsBlock> tsBlocks) throws IOException {
+  public void send(List<TsBlock> tsBlocks) {
     this.tsBlocks.addAll(tsBlocks);
   }
 
@@ -87,7 +87,7 @@ public class StubSinkHandle implements ISinkHandle {
       return;
     }
     closed = true;
-    instanceContext.flushing();
+    instanceContext.transitionToFlushing();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index 5ae66b14d7..e6e8e8083a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
-import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -26,160 +25,70 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.operator.source.DataSourceOperator;
 import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import io.airlift.units.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
-import java.io.IOException;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
-import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
-import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
-
 /**
  * One dataDriver is responsible for one FragmentInstance which is for data query, which may
  * contains several series.
  */
 @NotThreadSafe
-public class DataDriver implements Driver {
-
-  private static final Logger logger = LoggerFactory.getLogger(DataDriver.class);
-
-  private final Operator root;
-  private final ISinkHandle sinkHandle;
-  private final DataDriverContext driverContext;
+public class DataDriver extends Driver {
 
   private boolean init;
-  private boolean closed;
 
   /** closed tsfile used in this fragment instance */
   private Set<TsFileResource> closedFilePaths;
   /** unClosed tsfile used in this fragment instance */
   private Set<TsFileResource> unClosedFilePaths;
 
-  private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
-
   public DataDriver(Operator root, ISinkHandle sinkHandle, DataDriverContext driverContext) {
-    this.root = root;
-    this.sinkHandle = sinkHandle;
-    this.driverContext = driverContext;
+    super(root, sinkHandle, driverContext);
     this.closedFilePaths = new HashSet<>();
     this.unClosedFilePaths = new HashSet<>();
-    // initially the driverBlockedFuture is not blocked (it is completed)
-    SettableFuture<Void> future = SettableFuture.create();
-    future.set(null);
-    driverBlockedFuture.set(future);
-  }
-
-  @Override
-  public boolean isFinished() {
-    try {
-      boolean isFinished =
-          closed || (driverBlockedFuture.get().isDone() && root != null && root.isFinished());
-      if (isFinished) {
-        close();
-      }
-      return isFinished;
-    } catch (Throwable t) {
-      logger.error(
-          "Failed to query whether the data driver {} is finished", driverContext.getId(), t);
-      driverContext.failed(t);
-      close();
-      return true;
-    }
   }
 
   @Override
-  public ListenableFuture<Void> processFor(Duration duration) {
-
-    SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
-    // initialization may be time-consuming, so we keep it in the processFor method
-    // in normal case, it won't cause deadlock and should finish soon, otherwise it will be a
-    // critical bug
+  protected boolean init(SettableFuture<Void> blockedFuture) {
     if (!init) {
       try {
         initialize();
       } catch (Throwable t) {
-        logger.error(
+        LOGGER.error(
             "Failed to do the initialization for fragment instance {} ", driverContext.getId(), t);
         driverContext.failed(t);
-        close();
         blockedFuture.setException(t);
-        return blockedFuture;
+        return false;
       }
     }
-
-    // if the driver is blocked we don't need to continue
-    if (!blockedFuture.isDone()) {
-      return blockedFuture;
-    }
-
-    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
-
-    long start = System.nanoTime();
-    try {
-      do {
-        ListenableFuture<Void> future = processInternal();
-        if (!future.isDone()) {
-          return updateDriverBlockedFuture(future);
-        }
-      } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
-    } catch (Throwable t) {
-      logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
-      driverContext.failed(t);
-      close();
-      blockedFuture.setException(t);
-      return blockedFuture;
-    }
-    return NOT_BLOCKED;
-  }
-
-  @Override
-  public FragmentInstanceId getInfo() {
-    return driverContext.getId();
+    return true;
   }
 
+  /**
+   * All file paths used by this fragment instance must be cleared and thus the usage reference must
+   * be decreased.
+   */
   @Override
-  public void close() {
-    if (closed) {
-      return;
+  protected void releaseResource() {
+    for (TsFileResource tsFile : closedFilePaths) {
+      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
     }
-    closed = true;
-    try {
-      if (root != null) {
-        root.close();
-      }
-      if (sinkHandle != null) {
-        sinkHandle.close();
-      }
-    } catch (Throwable t) {
-      logger.error("Failed to closed driver {}", driverContext.getId(), t);
-      driverContext.failed(t);
-    } finally {
-      removeUsedFilesForQuery();
+    closedFilePaths = null;
+    for (TsFileResource tsFile : unClosedFilePaths) {
+      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
     }
-  }
-
-  @Override
-  public void failed(Throwable t) {
-    driverContext.failed(t);
+    unClosedFilePaths = null;
   }
 
   /**
@@ -187,7 +96,8 @@ public class DataDriver implements Driver {
    * we should change all the blocked lock operation into tryLock
    */
   private void initialize() throws QueryProcessException {
-    List<DataSourceOperator> sourceOperators = driverContext.getSourceOperators();
+    List<DataSourceOperator> sourceOperators =
+        ((DataDriverContext) driverContext).getSourceOperators();
     if (sourceOperators != null && !sourceOperators.isEmpty()) {
       QueryDataSource dataSource = initQueryDataSourceCache();
       sourceOperators.forEach(
@@ -210,13 +120,12 @@ public class DataDriver implements Driver {
    * QueryDataSource needed for this query
    */
   public QueryDataSource initQueryDataSourceCache() throws QueryProcessException {
-    DataRegion dataRegion = driverContext.getDataRegion();
+    DataDriverContext context = (DataDriverContext) driverContext;
+    DataRegion dataRegion = context.getDataRegion();
     dataRegion.readLock();
     try {
       List<PartialPath> pathList =
-          driverContext.getPaths().stream()
-              .map(IDTable::translateQueryPath)
-              .collect(Collectors.toList());
+          context.getPaths().stream().map(IDTable::translateQueryPath).collect(Collectors.toList());
       // when all the selected series are under the same device, the QueryDataSource will be
       // filtered according to timeIndex
       Set<String> selectedDeviceIdSet =
@@ -227,7 +136,7 @@ public class DataDriver implements Driver {
               pathList,
               selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null,
               driverContext.getFragmentInstanceContext(),
-              driverContext.getTimeFilter());
+              context.getTimeFilter());
 
       // used files should be added before mergeLock is unlocked, or they may be deleted by
       // running merge
@@ -268,67 +177,17 @@ public class DataDriver implements Driver {
     }
   }
 
-  /**
-   * All file paths used by this fragment instance must be cleared and thus the usage reference must
-   * be decreased.
-   */
-  private void removeUsedFilesForQuery() {
-    for (TsFileResource tsFile : closedFilePaths) {
-      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
-    }
-    closedFilePaths = null;
-    for (TsFileResource tsFile : unClosedFilePaths) {
-      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
-    }
-    unClosedFilePaths = null;
-  }
-
   /**
    * Increase the usage reference of filePath of job id. Before the invoking of this method, <code>
    * this.setqueryIdForCurrentRequestThread</code> has been invoked, so <code>
    * sealedFilePathsMap.get(queryId)</code> or <code>unsealedFilePathsMap.get(queryId)</code> must
    * not return null.
    */
-  void addFilePathToMap(TsFileResource tsFile, boolean isClosed) {
+  private void addFilePathToMap(TsFileResource tsFile, boolean isClosed) {
     Set<TsFileResource> pathSet = isClosed ? closedFilePaths : unClosedFilePaths;
     if (!pathSet.contains(tsFile)) {
       pathSet.add(tsFile);
       FileReaderManager.getInstance().increaseFileReaderReference(tsFile, isClosed);
     }
   }
-
-  private ListenableFuture<Void> processInternal() throws IOException, IoTDBException {
-    ListenableFuture<Void> blocked = root.isBlocked();
-    if (!blocked.isDone()) {
-      return blocked;
-    }
-    blocked = sinkHandle.isFull();
-    if (!blocked.isDone()) {
-      return blocked;
-    }
-    if (root.hasNext()) {
-      TsBlock tsBlock = root.next();
-      if (tsBlock != null && !tsBlock.isEmpty()) {
-        sinkHandle.send(Collections.singletonList(tsBlock));
-      }
-    }
-    return NOT_BLOCKED;
-  }
-
-  private ListenableFuture<Void> updateDriverBlockedFuture(
-      ListenableFuture<Void> sourceBlockedFuture) {
-    // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
-    // or any of the operators gets a memory revocation request
-    SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
-    driverBlockedFuture.set(newDriverBlockedFuture);
-    sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
-
-    // TODO Although we don't have memory management for operator now, we should consider it for
-    // future
-    // it's possible that memory revoking is requested for some operator
-    // before we update driverBlockedFuture above and we don't want to miss that
-    // notification, so we check to see whether that's the case before returning.
-
-    return newDriverBlockedFuture;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
index f211ce593c..40da49ff15 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
@@ -18,25 +18,94 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import io.airlift.units.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Throwables.throwIfUnchecked;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static java.lang.Boolean.TRUE;
+import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
 
 /**
  * Driver encapsulates some methods which are necessary for execution scheduler to run a fragment
  * instance
  */
-public interface Driver extends Closeable {
+public abstract class Driver implements IDriver {
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(Driver.class);
+
+  protected final Operator root;
+  protected final ISinkHandle sinkHandle;
+  protected final DriverContext driverContext;
+  protected final AtomicReference<SettableFuture<Void>> driverBlockedFuture =
+      new AtomicReference<>();
+  protected final AtomicReference<State> state = new AtomicReference<>(State.ALIVE);
+
+  protected final DriverLock exclusiveLock = new DriverLock();
+
+  protected enum State {
+    ALIVE,
+    NEED_DESTRUCTION,
+    DESTROYED
+  }
+
+  public Driver(Operator root, ISinkHandle sinkHandle, DriverContext driverContext) {
+    checkNotNull(root, "root Operator should not be null");
+    checkNotNull(sinkHandle, "SinkHandle should not be null");
+    this.root = root;
+    this.sinkHandle = sinkHandle;
+    this.driverContext = driverContext;
+
+    // initially the driverBlockedFuture is not blocked (it is completed)
+    SettableFuture<Void> future = SettableFuture.create();
+    future.set(null);
+    driverBlockedFuture.set(future);
+  }
 
   /**
    * Used to judge whether this fragment instance should be scheduled for execution anymore
    *
    * @return true if the FragmentInstance is done or terminated due to failure, otherwise false.
    */
-  boolean isFinished();
+  @Override
+  public boolean isFinished() {
+    checkLockNotHeld("Cannot check finished status while holding the driver lock");
+
+    // if we can get the lock, attempt a clean shutdown; otherwise someone else will shut down
+    Optional<Boolean> result = tryWithLockUnInterruptibly(this::isFinishedInternal);
+    return result.orElseGet(() -> state.get() != State.ALIVE || driverContext.isDone());
+  }
+
+  /**
+   * do initialization
+   *
+   * @return true if init succeeds, false otherwise
+   */
+  protected abstract boolean init(SettableFuture<Void> blockedFuture);
+
+  /** release resource this driver used */
+  protected abstract void releaseResource();
 
   /**
    * run the fragment instance for {@param duration} time slice, the time of this run is likely not
@@ -48,23 +117,366 @@ public interface Driver extends Closeable {
    *     next processing otherwise, meaning that this fragment instance is blocked and not ready for
    *     next processing.
    */
-  ListenableFuture<Void> processFor(Duration duration);
+  @Override
+  public ListenableFuture<Void> processFor(Duration duration) {
+
+    SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
+    // initialization may be time-consuming, so we keep it in the processFor method
+    // in normal case, it won't cause deadlock and should finish soon, otherwise it will be a
+    // critical bug
+    if (!init(blockedFuture)) {
+      return blockedFuture;
+    }
+
+    // if the driver is blocked we don't need to continue
+    if (!blockedFuture.isDone()) {
+      return blockedFuture;
+    }
+
+    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
+
+    Optional<ListenableFuture<Void>> result =
+        tryWithLock(
+            100,
+            TimeUnit.MILLISECONDS,
+            true,
+            () -> {
+              long start = System.nanoTime();
+              do {
+                ListenableFuture<Void> future = processInternal();
+                if (!future.isDone()) {
+                  return updateDriverBlockedFuture(future);
+                }
+              } while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
+              return NOT_BLOCKED;
+            });
+
+    return result.orElse(NOT_BLOCKED);
+  }
 
   /**
    * the id information about this Fragment Instance.
    *
    * @return a {@link FragmentInstanceId} instance.
    */
-  FragmentInstanceId getInfo();
+  @Override
+  public FragmentInstanceId getInfo() {
+    return driverContext.getId();
+  }
 
   /** clear resource used by this fragment instance */
   @Override
-  void close();
+  public void close() {
+    // mark the service for destruction
+    if (!state.compareAndSet(State.ALIVE, State.NEED_DESTRUCTION)) {
+      return;
+    }
+
+    exclusiveLock.interruptCurrentOwner();
+
+    // if we can get the lock, attempt a clean shutdown; otherwise someone else will shut down
+    tryWithLockUnInterruptibly(() -> TRUE);
+  }
 
   /**
    * fail current driver
    *
    * @param t reason cause this failure
    */
-  void failed(Throwable t);
+  @Override
+  public void failed(Throwable t) {
+    driverContext.failed(t);
+  }
+
+  @Override
+  public ISinkHandle getSinkHandle() {
+    return sinkHandle;
+  }
+
+  @GuardedBy("exclusiveLock")
+  private boolean isFinishedInternal() {
+    checkLockHeld("Lock must be held to call isFinishedInternal");
+
+    boolean finished = state.get() != State.ALIVE || driverContext.isDone() || root.isFinished();
+    if (finished) {
+      state.compareAndSet(State.ALIVE, State.NEED_DESTRUCTION);
+    }
+    return finished;
+  }
+
+  private ListenableFuture<Void> processInternal() {
+    try {
+      ListenableFuture<Void> blocked = root.isBlocked();
+      if (!blocked.isDone()) {
+        return blocked;
+      }
+      blocked = sinkHandle.isFull();
+      if (!blocked.isDone()) {
+        return blocked;
+      }
+      if (root.hasNext()) {
+        TsBlock tsBlock = root.next();
+        if (tsBlock != null && !tsBlock.isEmpty()) {
+          sinkHandle.send(Collections.singletonList(tsBlock));
+        }
+      }
+      return NOT_BLOCKED;
+    } catch (Throwable t) {
+      LOGGER.error("Failed to execute fragment instance {}", driverContext.getId(), t);
+      List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
+      if (interrupterStack == null) {
+        driverContext.failed(t);
+        throw t;
+      }
+
+      // Driver thread was interrupted which should only happen if the task is already finished.
+      // If this becomes the actual cause of a failed query there is a bug in the task state
+      // machine.
+      Exception exception = new Exception("Interrupted By");
+      exception.setStackTrace(interrupterStack.toArray(new StackTraceElement[0]));
+      RuntimeException newException = new RuntimeException("Driver was interrupted", exception);
+      newException.addSuppressed(t);
+      driverContext.failed(newException);
+      throw newException;
+    }
+  }
+
+  private ListenableFuture<Void> updateDriverBlockedFuture(
+      ListenableFuture<Void> sourceBlockedFuture) {
+    // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
+    // or any of the operators gets a memory revocation request
+    SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
+    driverBlockedFuture.set(newDriverBlockedFuture);
+    sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
+
+    // TODO Although we don't have memory management for operator now, we should consider it for
+    // future
+    // it's possible that memory revoking is requested for some operator
+    // before we update driverBlockedFuture above and we don't want to miss that
+    // notification, so we check to see whether that's the case before returning.
+
+    return newDriverBlockedFuture;
+  }
+
+  private synchronized void checkLockNotHeld(String message) {
+    checkState(!exclusiveLock.isHeldByCurrentThread(), message);
+  }
+
+  @GuardedBy("exclusiveLock")
+  private synchronized void checkLockHeld(String message) {
+    checkState(exclusiveLock.isHeldByCurrentThread(), message);
+  }
+
+  /**
+   * Try to acquire the {@code exclusiveLock} immediately and run a {@code task}. The task will not
+   * be interrupted if the {@code Driver} is closed.
+   *
+   * <p>Note: task cannot return null
+   */
+  private <T> Optional<T> tryWithLockUnInterruptibly(Supplier<T> task) {
+    return tryWithLock(0, TimeUnit.MILLISECONDS, false, task);
+  }
+
+  /**
+   * Try to acquire the {@code exclusiveLock} with {@code timeout} and run a {@code task}. If the
+   * {@code interruptOnClose} flag is set to {@code true} the {@code task} will be interrupted if
+   * the {@code Driver} is closed.
+   *
+   * <p>Note: task cannot return null
+   */
+  private <T> Optional<T> tryWithLock(
+      long timeout, TimeUnit unit, boolean interruptOnClose, Supplier<T> task) {
+    checkLockNotHeld("Lock cannot be reacquired");
+
+    boolean acquired = false;
+    try {
+      acquired = exclusiveLock.tryLock(timeout, unit, interruptOnClose);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
+    if (!acquired) {
+      return Optional.empty();
+    }
+
+    Optional<T> result;
+    try {
+      result = Optional.of(task.get());
+    } finally {
+      try {
+        destroyIfNecessary();
+      } finally {
+        exclusiveLock.unlock();
+      }
+    }
+
+    // We need to recheck whether the state is NEED_DESTRUCTION; if so, destroy the driver.
+    // Suppose another concurrent Thread-A calls close() and successfully CASes the state from
+    // ALIVE to NEED_DESTRUCTION just after the current Thread-B has run destroyIfNecessary() but
+    // before it has called exclusiveLock.unlock().
+    // Thread-A then calls this method to acquire the lock and destroy the driver, but it fails
+    // because the lock is still held by Thread-B, so Thread-A exits.
+    // Without this recheck, Thread-B would exit too and nobody would destroy the driver.
+    if (state.get() == State.NEED_DESTRUCTION && exclusiveLock.tryLock(interruptOnClose)) {
+      try {
+        destroyIfNecessary();
+      } finally {
+        exclusiveLock.unlock();
+      }
+    }
+
+    return result;
+  }
+
+  @GuardedBy("exclusiveLock")
+  private void destroyIfNecessary() {
+    checkLockHeld("Lock must be held to call destroyIfNecessary");
+
+    if (!state.compareAndSet(State.NEED_DESTRUCTION, State.DESTROYED)) {
+      return;
+    }
+
+    // if we get an error while closing a driver, record it and we will throw it at the end
+    Throwable inFlightException = null;
+    try {
+      inFlightException = closeAndDestroyOperators();
+      driverContext.finished();
+    } catch (Throwable t) {
+      // this shouldn't happen but be safe; use {} placeholders like closeAndDestroyOperators
+      inFlightException =
+          addSuppressedException(
+              inFlightException, t, "Error destroying driver for task {}", driverContext.getId());
+    } finally {
+      releaseResource();
+    }
+
+    if (inFlightException != null) {
+      // this will always be an Error or Runtime
+      throwIfUnchecked(inFlightException);
+      throw new RuntimeException(inFlightException);
+    }
+  }
+
+  private Throwable closeAndDestroyOperators() {
+    // record the current interrupted status (and clear the flag); we'll reset it later
+    boolean wasInterrupted = Thread.interrupted();
+
+    Throwable inFlightException = null;
+
+    try {
+      root.close();
+      sinkHandle.close();
+    } catch (InterruptedException t) {
+      // don't record the stack
+      wasInterrupted = true;
+    } catch (Throwable t) {
+      // TODO currently, we won't know exact operator which is failed in closing
+      inFlightException =
+          addSuppressedException(
+              inFlightException,
+              t,
+              "Error closing operator {} for fragment instance {}",
+              root.getOperatorContext().getOperatorId(),
+              driverContext.getId());
+    } finally {
+      // reset the interrupted flag
+      if (wasInterrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    return inFlightException;
+  }
+
+  private static Throwable addSuppressedException(
+      Throwable inFlightException, Throwable newException, String message, Object... args) {
+    if (!(newException instanceof Error)) {
+      // Log normal exceptions instead of rethrowing them. The throwable must be the LAST element
+      // of the argument array: passing (args, newException) would bind args as a single value.
+      Object[] logArgs = new Object[args.length + 1];
+      System.arraycopy(args, 0, logArgs, 0, args.length);
+      logArgs[args.length] = newException;
+      LOGGER.error(message, logArgs);
+    } else if (inFlightException == null) {
+      inFlightException = newException;
+    } else if (inFlightException != newException) {
+      // Self-suppression not permitted
+      inFlightException.addSuppressed(newException);
+    }
+    return inFlightException;
+  }
+
+  private static class DriverLock {
+    private final ReentrantLock lock = new ReentrantLock();
+
+    @GuardedBy("this")
+    private Thread currentOwner;
+
+    @GuardedBy("this")
+    private boolean currentOwnerInterruptionAllowed;
+
+    @GuardedBy("this")
+    private List<StackTraceElement> interrupterStack;
+
+    public boolean isHeldByCurrentThread() {
+      return lock.isHeldByCurrentThread();
+    }
+
+    public boolean tryLock(boolean currentThreadInterruptionAllowed) {
+      checkState(!lock.isHeldByCurrentThread(), "Lock is not reentrant");
+      boolean acquired = lock.tryLock();
+      if (acquired) {
+        setOwner(currentThreadInterruptionAllowed);
+      }
+      return acquired;
+    }
+
+    public boolean tryLock(long timeout, TimeUnit unit, boolean currentThreadInterruptionAllowed)
+        throws InterruptedException {
+      checkState(!lock.isHeldByCurrentThread(), "Lock is not reentrant");
+      boolean acquired = lock.tryLock(timeout, unit);
+      if (acquired) {
+        setOwner(currentThreadInterruptionAllowed);
+      }
+      return acquired;
+    }
+
+    private synchronized void setOwner(boolean interruptionAllowed) {
+      checkState(lock.isHeldByCurrentThread(), "Current thread does not hold lock");
+      currentOwner = Thread.currentThread();
+      currentOwnerInterruptionAllowed = interruptionAllowed;
+      // NOTE: We do not use interrupted stack information to know that another
+      // thread has attempted to interrupt the driver, and interrupt this new lock
+      // owner.  The interrupted stack information is for debugging purposes only.
+      // In the case of interruption, the caller should (and does) have a separate
+      // state to prevent further processing in the Driver.
+    }
+
+    public synchronized void unlock() {
+      checkState(lock.isHeldByCurrentThread(), "Current thread does not hold lock");
+      currentOwner = null;
+      currentOwnerInterruptionAllowed = false;
+      lock.unlock();
+    }
+
+    public synchronized List<StackTraceElement> getInterrupterStack() {
+      return interrupterStack;
+    }
+
+    public synchronized void interruptCurrentOwner() {
+      if (!currentOwnerInterruptionAllowed) {
+        return;
+      }
+      // there is a benign race condition here where the lock holder
+      // can change between attempting to get the lock and grabbing
+      // the synchronized lock here, but in either case we want to
+      // interrupt the lock holder thread
+      if (interrupterStack == null) {
+        interrupterStack = ImmutableList.copyOf(Thread.currentThread().getStackTrace());
+      }
+
+      if (currentOwner != null) {
+        currentOwner.interrupt();
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
index 8c20a2c334..8985508b0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
@@ -20,10 +20,14 @@ package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 public class DriverContext {
 
   private final FragmentInstanceContext fragmentInstanceContext;
 
+  private final AtomicBoolean finished = new AtomicBoolean();
+
   public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
     this.fragmentInstanceContext = fragmentInstanceContext;
   }
@@ -38,13 +42,14 @@ public class DriverContext {
 
   public void failed(Throwable cause) {
     fragmentInstanceContext.failed(cause);
+    finished.set(true);
   }
 
-  public void finish() {
-    fragmentInstanceContext.finish();
+  public void finished() {
+    finished.compareAndSet(false, true);
   }
 
-  public void flushing() {
-    fragmentInstanceContext.flushing();
+  public boolean isDone() {
+    return finished.get();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
index b2e51be2bc..c5a3232c18 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -41,14 +42,19 @@ public class FragmentInstanceContext extends QueryContext {
   // TODO if we split one fragment instance into multiple pipelines to run, we need to replace it
   // with CopyOnWriteArrayList or some other thread safe data structure
   private final List<OperatorContext> operatorContexts = new ArrayList<>();
-  private final long createNanos = System.nanoTime();
 
   private DriverContext driverContext;
 
-  // TODO we may use StateMachine<FragmentInstanceState> to replace it
-  private final AtomicReference<FragmentInstanceState> state;
+  private final FragmentInstanceStateMachine stateMachine;
+
+  private final long createNanos = System.nanoTime();
+
+  private final AtomicLong startNanos = new AtomicLong();
+  private final AtomicLong endNanos = new AtomicLong();
 
-  private long endTime = -1;
+  private final AtomicReference<Long> executionStartTime = new AtomicReference<>();
+  private final AtomicReference<Long> lastExecutionStartTime = new AtomicReference<>();
+  private final AtomicReference<Long> executionEndTime = new AtomicReference<>();
 
   //    private final GcMonitor gcMonitor;
   //    private final AtomicLong startNanos = new AtomicLong();
@@ -58,10 +64,51 @@ public class FragmentInstanceContext extends QueryContext {
   //    private final AtomicLong endFullGcCount = new AtomicLong(-1);
   //    private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
 
-  public FragmentInstanceContext(
-      FragmentInstanceId id, AtomicReference<FragmentInstanceState> state) {
+  public static FragmentInstanceContext createFragmentInstanceContext(
+      FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
+    FragmentInstanceContext instanceContext = new FragmentInstanceContext(id, stateMachine);
+    instanceContext.initialize();
+    return instanceContext;
+  }
+
+  private FragmentInstanceContext(
+      FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
     this.id = id;
-    this.state = state;
+    this.stateMachine = stateMachine;
+  }
+
+  public void start() {
+    long now = System.currentTimeMillis();
+    executionStartTime.compareAndSet(null, now);
+    startNanos.compareAndSet(0, System.nanoTime());
+
+    // always update last execution start time
+    lastExecutionStartTime.set(now);
+  }
+
+  // the state change listener is added here in a separate initialize() method
+  // instead of the constructor to prevent leaking the "this" reference to
+  // another thread, which will cause unsafe publication of this instance.
+  private void initialize() {
+    stateMachine.addStateChangeListener(this::updateStatsIfDone);
+  }
+
+  private void updateStatsIfDone(FragmentInstanceState newState) {
+    if (newState.isDone()) {
+      long now = System.currentTimeMillis();
+
+      // before setting the end times, make sure a start has been recorded
+      executionStartTime.compareAndSet(null, now);
+      startNanos.compareAndSet(0, System.nanoTime());
+
+      // Only update the last start time if nothing was started
+      lastExecutionStartTime.compareAndSet(null, now);
+
+      // use compare and set from initial value to avoid overwriting if there
+      // were a duplicate notification, which shouldn't happen
+      executionEndTime.compareAndSet(null, now);
+      endNanos.compareAndSet(0, System.nanoTime());
+    }
   }
 
   public OperatorContext addOperatorContext(
@@ -98,40 +145,18 @@ public class FragmentInstanceContext extends QueryContext {
   }
 
   public void failed(Throwable cause) {
-    LOGGER.warn("Fragment Instance {} failed.", id, cause);
-    state.set(FragmentInstanceState.FAILED);
-  }
-
-  public void cancel() {
-    state.set(FragmentInstanceState.CANCELED);
-    this.endTime = System.currentTimeMillis();
+    stateMachine.failed(cause);
   }
 
-  public void abort() {
-    state.set(FragmentInstanceState.ABORTED);
-    this.endTime = System.currentTimeMillis();
+  public void finished() {
+    stateMachine.finished();
   }
 
-  public void finish() {
-    if (state.get().isDone()) {
-      return;
-    }
-    state.set(FragmentInstanceState.FINISHED);
-    this.endTime = System.currentTimeMillis();
-  }
-
-  public void flushing() {
-    if (state.get().isDone()) {
-      return;
-    }
-    state.set(FragmentInstanceState.FLUSHING);
+  public void transitionToFlushing() {
+    stateMachine.transitionToFlushing();
   }
 
   public long getEndTime() {
-    return endTime;
-  }
-
-  public void setEndTime(long endTime) {
-    this.endTime = endTime;
+    return executionEndTime.get() == null ? -1L : executionEndTime.get(); // -1 until finished
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
index 21e0cc3d50..15c85c4d99 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
@@ -18,14 +18,15 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
 
 import com.google.common.collect.ImmutableList;
-
-import java.util.concurrent.atomic.AtomicReference;
+import io.airlift.stats.CounterStat;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FAILED;
 
 public class FragmentInstanceExecution {
 
@@ -34,25 +35,39 @@ public class FragmentInstanceExecution {
   private final FragmentInstanceId instanceId;
   private final FragmentInstanceContext context;
 
-  private final Driver driver;
+  private final IDriver driver;
+
+  private final ISinkHandle sinkHandle;
 
-  // TODO we may use StateMachine<FragmentInstanceState> to replace it
-  private final AtomicReference<FragmentInstanceState> state;
+  private final FragmentInstanceStateMachine stateMachine;
 
   private long lastHeartbeat;
 
-  public FragmentInstanceExecution(
+  public static FragmentInstanceExecution createFragmentInstanceExecution(
       IFragmentInstanceScheduler scheduler,
       FragmentInstanceId instanceId,
       FragmentInstanceContext context,
-      Driver driver,
-      AtomicReference<FragmentInstanceState> state) {
+      IDriver driver,
+      FragmentInstanceStateMachine stateMachine,
+      CounterStat failedInstances) {
+    FragmentInstanceExecution execution =
+        new FragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine);
+    execution.initialize(failedInstances);
+    return execution;
+  }
+
+  private FragmentInstanceExecution(
+      IFragmentInstanceScheduler scheduler,
+      FragmentInstanceId instanceId,
+      FragmentInstanceContext context,
+      IDriver driver,
+      FragmentInstanceStateMachine stateMachine) {
     this.scheduler = scheduler;
     this.instanceId = instanceId;
     this.context = context;
     this.driver = driver;
-    this.state = state;
-    state.set(FragmentInstanceState.RUNNING);
+    this.sinkHandle = driver.getSinkHandle();
+    this.stateMachine = stateMachine;
     scheduler.submitFragmentInstances(instanceId.getQueryId(), ImmutableList.of(driver));
   }
 
@@ -65,24 +80,43 @@ public class FragmentInstanceExecution {
   }
 
   public FragmentInstanceState getInstanceState() {
-    return state.get();
+    return stateMachine.getState();
   }
 
   public FragmentInstanceInfo getInstanceInfo() {
-    return new FragmentInstanceInfo(state.get(), context.getEndTime());
+    return new FragmentInstanceInfo(stateMachine.getState(), context.getEndTime());
   }
 
   public void failed(Throwable cause) {
     requireNonNull(cause, "cause is null");
-    context.failed(cause);
+    stateMachine.failed(cause);
   }
 
   public void cancel() {
-    context.cancel();
+    stateMachine.cancel();
   }
 
   public void abort() {
-    scheduler.abortFragmentInstance(instanceId);
-    context.abort();
+    stateMachine.abort();
+  }
+
+  // this is a separate method to ensure that the `this` reference is not leaked during construction
+  private void initialize(CounterStat failedInstances) {
+    requireNonNull(failedInstances, "failedInstances is null");
+    stateMachine.addStateChangeListener(
+        newState -> {
+          if (!newState.isDone()) {
+            return;
+          }
+
+          // Update failed tasks counter
+          if (newState == FAILED) {
+            failedInstances.update(1);
+          }
+
+          driver.close();
+          sinkHandle.abort();
+          scheduler.abortFragmentInstance(instanceId);
+        });
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceFailureListener.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceFailureListener.java
index 8c20a2c334..3f99848c29 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceFailureListener.java
@@ -20,31 +20,6 @@ package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 
-public class DriverContext {
-
-  private final FragmentInstanceContext fragmentInstanceContext;
-
-  public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
-    this.fragmentInstanceContext = fragmentInstanceContext;
-  }
-
-  public FragmentInstanceId getId() {
-    return fragmentInstanceContext.getId();
-  }
-
-  public FragmentInstanceContext getFragmentInstanceContext() {
-    return fragmentInstanceContext;
-  }
-
-  public void failed(Throwable cause) {
-    fragmentInstanceContext.failed(cause);
-  }
-
-  public void finish() {
-    fragmentInstanceContext.finish();
-  }
-
-  public void flushing() {
-    fragmentInstanceContext.flushing();
-  }
+public interface FragmentInstanceFailureListener {
+  void onTaskFailed(FragmentInstanceId taskId, Throwable failure);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
index 537f330f04..438e1bea0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@ -27,17 +27,20 @@ import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
 import org.apache.iotdb.db.mpp.sql.planner.LocalExecutionPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 
+import io.airlift.stats.CounterStat;
 import io.airlift.units.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceExecution.createFragmentInstanceExecution;
 
 public class FragmentInstanceManager {
 
@@ -49,9 +52,13 @@ public class FragmentInstanceManager {
   private final IFragmentInstanceScheduler scheduler = FragmentInstanceScheduler.getInstance();
 
   private final ScheduledExecutorService instanceManagementExecutor;
+  private final ExecutorService instanceNotificationExecutor;
 
   private final Duration infoCacheTime;
 
+  // record failed instances count
+  private final CounterStat failedInstances = new CounterStat();
+
   public static FragmentInstanceManager getInstance() {
     return FragmentInstanceManager.InstanceHolder.INSTANCE;
   }
@@ -61,13 +68,15 @@ public class FragmentInstanceManager {
     this.instanceExecution = new ConcurrentHashMap<>();
     this.instanceManagementExecutor =
         IoTDBThreadPoolFactory.newScheduledThreadPool(1, "instance-management");
+    this.instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification");
 
     this.infoCacheTime = new Duration(15, TimeUnit.MINUTES);
 
     instanceManagementExecutor.scheduleWithFixedDelay(
         () -> {
           try {
-            removeOldTasks();
+            removeOldInstances();
           } catch (Throwable e) {
             logger.warn("Error removing old tasks", e);
           }
@@ -85,13 +94,14 @@ public class FragmentInstanceManager {
         instanceExecution.computeIfAbsent(
             instanceId,
             id -> {
-              AtomicReference<FragmentInstanceState> state = new AtomicReference<>();
-              state.set(FragmentInstanceState.PLANNED);
+              FragmentInstanceStateMachine stateMachine =
+                  new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 
               FragmentInstanceContext context =
                   instanceContext.computeIfAbsent(
                       instanceId,
-                      fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
+                      fragmentInstanceId ->
+                          createFragmentInstanceContext(fragmentInstanceId, stateMachine));
 
               try {
                 DataDriver driver =
@@ -100,9 +110,10 @@ public class FragmentInstanceManager {
                         context,
                         instance.getTimeFilter(),
                         dataRegion);
-                return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+                return createFragmentInstanceExecution(
+                    scheduler, instanceId, context, driver, stateMachine, failedInstances);
               } catch (Throwable t) {
-                context.failed(t);
+                stateMachine.failed(t);
                 return null;
               }
             });
@@ -118,26 +129,29 @@ public class FragmentInstanceManager {
         instanceExecution.computeIfAbsent(
             instanceId,
             id -> {
-              AtomicReference<FragmentInstanceState> state = new AtomicReference<>();
-              state.set(FragmentInstanceState.PLANNED);
+              FragmentInstanceStateMachine stateMachine =
+                  new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 
               FragmentInstanceContext context =
                   instanceContext.computeIfAbsent(
                       instanceId,
-                      fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
+                      fragmentInstanceId ->
+                          createFragmentInstanceContext(fragmentInstanceId, stateMachine));
 
               try {
                 SchemaDriver driver =
                     planner.plan(instance.getFragment().getRoot(), context, schemaRegion);
-                return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+                return createFragmentInstanceExecution(
+                    scheduler, instanceId, context, driver, stateMachine, failedInstances);
               } catch (Throwable t) {
-                context.failed(t);
+                stateMachine.failed(t);
                 return null;
               }
             });
     return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
   }
 
+  /** Aborts a FragmentInstance. */
   public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId fragmentInstanceId) {
     FragmentInstanceExecution execution = instanceExecution.remove(fragmentInstanceId);
     if (execution != null) {
@@ -148,6 +162,19 @@ public class FragmentInstanceManager {
     return null;
   }
 
+  /** Cancels a FragmentInstance; returns its info, or null if it is not registered. */
+  public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) {
+    requireNonNull(instanceId, "instanceId is null");
+
+    FragmentInstanceExecution execution = instanceExecution.remove(instanceId);
+    if (execution != null) {
+      instanceContext.remove(instanceId);
+      execution.cancel();
+      return execution.getInstanceInfo();
+    }
+    return null;
+  }
+
   /**
    * Gets the info for the specified fragment instance.
    *
@@ -163,12 +190,16 @@ public class FragmentInstanceManager {
     return execution.getInstanceInfo();
   }
 
+  public CounterStat getFailedInstances() {
+    return failedInstances;
+  }
+
   private FragmentInstanceInfo createFailedInstanceInfo(FragmentInstanceId instanceId) {
     return new FragmentInstanceInfo(
         FragmentInstanceState.FAILED, instanceContext.get(instanceId).getEndTime());
   }
 
-  private void removeOldTasks() {
+  private void removeOldInstances() {
     long oldestAllowedInstance = System.currentTimeMillis() - infoCacheTime.toMillis();
     instanceContext
         .entrySet()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
index d55af83e98..d1a182bcbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
@@ -40,7 +40,7 @@ public enum FragmentInstanceState {
   /** Instance has finished executing and all output has been consumed. */
   FINISHED(true, false),
   /** Instance was canceled by a user. */
-  CANCELED(true, true),
+  CANCELLED(true, true),
   /** Instance was aborted due to a failure in the query. The failure was not in this instance. */
   ABORTED(true, true),
   /** Instance execution failed. */
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java
new file mode 100644
index 0000000000..febc9e9e09
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.StateMachine.StateChangeListener;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.ABORTED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.CANCELLED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FAILED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FINISHED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FLUSHING;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.RUNNING;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.TERMINAL_INSTANCE_STATES;
+
+@ThreadSafe
+public class FragmentInstanceStateMachine {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceStateMachine.class);
+
+  private final long createdTime = System.currentTimeMillis();
+
+  private final FragmentInstanceId instanceId;
+  private final Executor executor;
+  private final StateMachine<FragmentInstanceState> instanceState;
+  private final LinkedBlockingQueue<Throwable> failureCauses = new LinkedBlockingQueue<>();
+
+  @GuardedBy("this")
+  private final Map<FragmentInstanceId, Throwable> sourceInstanceFailures = new HashMap<>();
+
+  @GuardedBy("this")
+  private final List<FragmentInstanceFailureListener> sourceInstanceFailureListeners =
+      new ArrayList<>();
+
+  public FragmentInstanceStateMachine(FragmentInstanceId fragmentInstanceId, Executor executor) {
+    this.instanceId = requireNonNull(fragmentInstanceId, "fragmentInstanceId is null");
+    this.executor = requireNonNull(executor, "executor is null");
+    instanceState =
+        new StateMachine<>(
+            "FragmentInstance " + fragmentInstanceId, executor, RUNNING, TERMINAL_INSTANCE_STATES);
+    instanceState.addStateChangeListener(
+        newState -> LOGGER.debug("Fragment Instance {} is {}", fragmentInstanceId, newState));
+  }
+
+  public long getCreatedTime() {
+    return createdTime;
+  }
+
+  public FragmentInstanceId getFragmentInstanceId() {
+    return instanceId;
+  }
+
+  public FragmentInstanceState getState() {
+    return instanceState.get();
+  }
+
+  public ListenableFuture<FragmentInstanceState> getStateChange(
+      FragmentInstanceState currentState) {
+    requireNonNull(currentState, "currentState is null");
+    checkArgument(!currentState.isDone(), "Current state is already done");
+
+    ListenableFuture<FragmentInstanceState> future = instanceState.getStateChange(currentState);
+    FragmentInstanceState state = instanceState.get();
+    if (state.isDone()) {
+      return immediateFuture(state);
+    }
+    return future;
+  }
+
+  public LinkedBlockingQueue<Throwable> getFailureCauses() {
+    return failureCauses;
+  }
+
+  public void transitionToFlushing() {
+    instanceState.setIf(FLUSHING, currentState -> currentState == RUNNING);
+  }
+
+  public void finished() {
+    transitionToDoneState(FINISHED);
+  }
+
+  public void cancel() {
+    transitionToDoneState(CANCELLED);
+  }
+
+  public void abort() {
+    transitionToDoneState(ABORTED);
+  }
+
+  public void failed(Throwable cause) {
+    failureCauses.add(cause);
+    transitionToDoneState(FAILED);
+  }
+
+  private void transitionToDoneState(FragmentInstanceState doneState) {
+    requireNonNull(doneState, "doneState is null");
+    checkArgument(doneState.isDone(), "doneState %s is not a done state", doneState);
+
+    instanceState.setIf(doneState, currentState -> !currentState.isDone());
+  }
+
+  /**
+   * Listener is always notified asynchronously using a dedicated notification thread pool, so care
+   * should be taken to avoid leaking {@code this} when adding a listener in a constructor.
+   * Additionally, it is possible notifications are observed out of order due to the asynchronous
+   * execution.
+   */
+  public void addStateChangeListener(
+      StateChangeListener<FragmentInstanceState> stateChangeListener) {
+    instanceState.addStateChangeListener(stateChangeListener);
+  }
+
+  public void addSourceTaskFailureListener(FragmentInstanceFailureListener listener) {
+    Map<FragmentInstanceId, Throwable> failures;
+    synchronized (this) {
+      sourceInstanceFailureListeners.add(listener);
+      failures = ImmutableMap.copyOf(sourceInstanceFailures);
+    }
+    executor.execute(
+        () -> {
+          failures.forEach(listener::onTaskFailed);
+        });
+  }
+
+  public void sourceTaskFailed(FragmentInstanceId instanceId, Throwable failure) {
+    List<FragmentInstanceFailureListener> listeners;
+    synchronized (this) {
+      sourceInstanceFailures.putIfAbsent(instanceId, failure);
+      listeners = ImmutableList.copyOf(sourceInstanceFailureListeners);
+    }
+    executor.execute(
+        () -> {
+          for (FragmentInstanceFailureListener listener : listeners) {
+            listener.onTaskFailed(instanceId, failure);
+          }
+        });
+  }
+
+  @Override
+  public String toString() {
+    return toStringHelper(this)
+        .add("FragmentInstanceId", instanceId)
+        .add("FragmentInstanceState", instanceState)
+        .add("failureCauses", failureCauses)
+        .toString();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IDriver.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/IDriver.java
index 8c20a2c334..54c31901a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IDriver.java
@@ -18,33 +18,23 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 
-public class DriverContext {
+import com.google.common.util.concurrent.ListenableFuture;
+import io.airlift.units.Duration;
 
-  private final FragmentInstanceContext fragmentInstanceContext;
+public interface IDriver {
 
-  public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
-    this.fragmentInstanceContext = fragmentInstanceContext;
-  }
+  boolean isFinished();
 
-  public FragmentInstanceId getId() {
-    return fragmentInstanceContext.getId();
-  }
+  ListenableFuture<Void> processFor(Duration duration);
 
-  public FragmentInstanceContext getFragmentInstanceContext() {
-    return fragmentInstanceContext;
-  }
+  FragmentInstanceId getInfo();
 
-  public void failed(Throwable cause) {
-    fragmentInstanceContext.failed(cause);
-  }
+  void close();
 
-  public void finish() {
-    fragmentInstanceContext.finish();
-  }
+  void failed(Throwable t);
 
-  public void flushing() {
-    fragmentInstanceContext.flushing();
-  }
+  ISinkHandle getSinkHandle();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
index 844fe417ba..258065cd80 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
@@ -19,155 +19,27 @@
 package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.operator.Operator;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import io.airlift.units.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
-import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
-
 /** One SchemaDriver is used to execute one FragmentInstance which is for metadata query. */
 @NotThreadSafe
-public class SchemaDriver implements Driver {
-
-  private static final Logger logger = LoggerFactory.getLogger(SchemaDriver.class);
-
-  private final Operator root;
-  private final ISinkHandle sinkHandle;
-  private final SchemaDriverContext driverContext;
-
-  private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
-
-  private boolean closed = false;
+public class SchemaDriver extends Driver {
 
   public SchemaDriver(Operator root, ISinkHandle sinkHandle, SchemaDriverContext driverContext) {
-    this.root = root;
-    this.sinkHandle = sinkHandle;
-    this.driverContext = driverContext;
-    // initially the driverBlockedFuture is not blocked (it is completed)
-    SettableFuture<Void> future = SettableFuture.create();
-    future.set(null);
-    driverBlockedFuture.set(future);
-  }
-
-  @Override
-  public boolean isFinished() {
-    try {
-      boolean isFinished = driverBlockedFuture.get().isDone() && root != null && root.isFinished();
-      if (isFinished) {
-        close();
-      }
-      return isFinished;
-    } catch (Throwable t) {
-      logger.error(
-          "Failed to query whether the schema driver {} is finished", driverContext.getId(), t);
-      driverContext.failed(t);
-      return true;
-    }
-  }
-
-  @Override
-  public ListenableFuture<Void> processFor(Duration duration) {
-    // if the driver is blocked we don't need to continue
-    SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
-    if (!blockedFuture.isDone()) {
-      return blockedFuture;
-    }
-
-    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
-
-    long start = System.nanoTime();
-    try {
-      do {
-        ListenableFuture<Void> future = processInternal();
-        if (!future.isDone()) {
-          return updateDriverBlockedFuture(future);
-        }
-      } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
-    } catch (Throwable t) {
-      logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
-      driverContext.failed(t);
-      close();
-      blockedFuture.setException(t);
-      return blockedFuture;
-    }
-    return NOT_BLOCKED;
-  }
-
-  private ListenableFuture<Void> processInternal() throws IOException {
-    ListenableFuture<Void> blocked = root.isBlocked();
-    if (!blocked.isDone()) {
-      return blocked;
-    }
-    blocked = sinkHandle.isFull();
-    if (!blocked.isDone()) {
-      return blocked;
-    }
-    if (root.hasNext()) {
-      TsBlock tsBlock = root.next();
-      if (tsBlock != null && !tsBlock.isEmpty()) {
-        sinkHandle.send(Collections.singletonList(tsBlock));
-      }
-    }
-    return NOT_BLOCKED;
-  }
-
-  private ListenableFuture<Void> updateDriverBlockedFuture(
-      ListenableFuture<Void> sourceBlockedFuture) {
-    // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
-    // or any of the operators gets a memory revocation request
-    SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
-    driverBlockedFuture.set(newDriverBlockedFuture);
-    sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
-
-    // TODO Although we don't have memory management for operator now, we should consider it for
-    // future
-    // it's possible that memory revoking is requested for some operator
-    // before we update driverBlockedFuture above and we don't want to miss that
-    // notification, so we check to see whether that's the case before returning.
-
-    return newDriverBlockedFuture;
-  }
-
-  @Override
-  public FragmentInstanceId getInfo() {
-    return driverContext.getId();
+    super(root, sinkHandle, driverContext);
   }
 
   @Override
-  public void close() {
-    if (closed) {
-      return;
-    }
-    closed = true;
-    try {
-      if (root != null) {
-        root.close();
-      }
-      if (sinkHandle != null) {
-        sinkHandle.close();
-      }
-    } catch (Throwable t) {
-      logger.error("Failed to closed driver {}", driverContext.getId(), t);
-      driverContext.failed(t);
-    }
+  protected boolean init(SettableFuture<Void> blockedFuture) {
+    return true;
   }
 
   @Override
-  public void failed(Throwable t) {
-    driverContext.failed(t);
+  protected void releaseResource() {
+    // do nothing
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
index c8cccc0520..398b2f03c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
@@ -22,8 +22,6 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import java.io.IOException;
-
 import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
 
 public interface Operator extends AutoCloseable {
@@ -53,5 +51,5 @@ public interface Operator extends AutoCloseable {
   /**
    * Is this operator completely finished processing and no more output TsBlock will be produced.
    */
-  boolean isFinished() throws IOException;
+  boolean isFinished();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
index a226637146..24711217c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import java.io.IOException;
-
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
@@ -76,7 +74,7 @@ public class LimitOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() throws IOException {
+  public boolean isFinished() {
     return remainingLimit == 0 || child.isFinished();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
index 48d0c71185..cd3355d395 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
@@ -217,7 +217,7 @@ public class TransformOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() throws IOException {
+  public boolean isFinished() {
     return !hasNext();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
index 003da76f2d..34fb44373e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
@@ -35,7 +35,6 @@ import org.apache.iotdb.tsfile.utils.Binary;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -92,7 +91,7 @@ public class SchemaFetchOperator implements SourceOperator {
   }
 
   @Override
-  public boolean isFinished() throws IOException {
+  public boolean isFinished() {
     return isFinished;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
index 722f63543d..397b7dc461 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
@@ -65,7 +65,7 @@ public class ExchangeOperator implements SourceOperator {
   }
 
   @Override
-  public boolean isFinished() throws IOException {
+  public boolean isFinished() {
     return sourceHandle.isFinished();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceAbortedException.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceAbortedException.java
index 20017340f6..7a25e4209a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceAbortedException.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceAbortedException.java
@@ -19,9 +19,9 @@
 package org.apache.iotdb.db.mpp.schedule;
 
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.IDriver;
 
-/** A common exception to pass to {@link Driver#failed(Throwable)} */
+/** A common exception to pass to {@link IDriver#failed(Throwable)} */
 public class FragmentInstanceAbortedException extends Exception {
 
   public static final String BY_TIMEOUT = "timeout";
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index 4422e81ba8..880041924a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.mpp.buffer.DataBlockService;
 import org.apache.iotdb.db.mpp.buffer.IDataBlockManager;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.IDriver;
 import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
 import org.apache.iotdb.db.mpp.schedule.queue.L1PriorityQueue;
 import org.apache.iotdb.db.mpp.schedule.queue.L2PriorityQueue;
@@ -121,7 +121,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
   }
 
   @Override
-  public void submitFragmentInstances(QueryId queryId, List<Driver> instances) {
+  public void submitFragmentInstances(QueryId queryId, List<IDriver> instances) {
     List<FragmentInstanceTask> tasks =
         instances.stream()
             .map(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
index fd19b67ee0..25b4e29a0a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.schedule;
 
-import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.IDriver;
 import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
 import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
 import org.apache.iotdb.db.utils.stats.CpuTimer;
@@ -52,7 +52,7 @@ public class FragmentInstanceTaskExecutor extends AbstractExecutor {
     if (!scheduler.readyToRunning(task)) {
       return;
     }
-    Driver instance = task.getFragmentInstance();
+    IDriver instance = task.getFragmentInstance();
     CpuTimer timer = new CpuTimer();
     ListenableFuture<Void> future = instance.processFor(EXECUTION_TIME_SLICE);
     CpuTimer.CpuDuration duration = timer.elapsedTime();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
index 2c597c15e7..b8d5970ee9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.schedule;
 
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.IDriver;
 
 import java.util.List;
 
@@ -28,13 +28,13 @@ import java.util.List;
 public interface IFragmentInstanceScheduler {
 
   /**
-   * Submit one or more {@link org.apache.iotdb.db.mpp.execution.Driver} in one query for later
+   * Submit one or more {@link org.apache.iotdb.db.mpp.execution.IDriver} in one query for later
    * scheduling.
    *
    * @param queryId the queryId these instances belong to.
    * @param instances the submitted instances.
    */
-  void submitFragmentInstances(QueryId queryId, List<Driver> instances);
+  void submitFragmentInstances(QueryId queryId, List<IDriver> instances);
 
   /**
    * Abort all the instances in this query.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
index abebdaf30d..fc322fee28 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
@@ -18,10 +18,11 @@
  */
 package org.apache.iotdb.db.mpp.schedule.task;
 
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.IDriver;
 import org.apache.iotdb.db.mpp.schedule.ExecutionContext;
 import org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor;
 import org.apache.iotdb.db.mpp.schedule.queue.ID;
@@ -43,7 +44,7 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
 
   private FragmentInstanceTaskID id;
   private FragmentInstanceTaskStatus status;
-  private final Driver fragmentInstance;
+  private final IDriver fragmentInstance;
 
   // the higher this field is, the higher probability it will be scheduled.
   private volatile double schedulePriority;
@@ -60,7 +61,7 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
     this(new StubFragmentInstance(), 0L, null);
   }
 
-  public FragmentInstanceTask(Driver instance, long timeoutMs, FragmentInstanceTaskStatus status) {
+  public FragmentInstanceTask(IDriver instance, long timeoutMs, FragmentInstanceTaskStatus status) {
     this.fragmentInstance = instance;
     this.id = new FragmentInstanceTaskID(instance.getInfo());
     this.setStatus(status);
@@ -87,7 +88,7 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
         || status == FragmentInstanceTaskStatus.FINISHED;
   }
 
-  public Driver getFragmentInstance() {
+  public IDriver getFragmentInstance() {
     return fragmentInstance;
   }
 
@@ -185,7 +186,7 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
     }
   }
 
-  private static class StubFragmentInstance implements Driver {
+  private static class StubFragmentInstance implements IDriver {
 
     private static final QueryId stubQueryId = new QueryId("stub_query");
     private static final FragmentInstanceId stubInstance =
@@ -211,5 +212,10 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
 
     @Override
     public void failed(Throwable t) {}
+
+    @Override
+    public ISinkHandle getSinkHandle() {
+      return null;
+    }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
index e8a7767100..126faaeef5 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
@@ -36,7 +36,6 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -96,12 +95,7 @@ public class SinkHandleTest {
     Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
 
     // Send tsblocks.
-    try {
-      sinkHandle.send(mockTsBlocks);
-    } catch (IOException e) {
-      e.printStackTrace();
-      Assert.fail();
-    }
+    sinkHandle.send(mockTsBlocks);
     sinkHandle.setNoMoreTsBlocks();
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
@@ -150,12 +144,7 @@ public class SinkHandleTest {
     Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFinish(sinkHandle);
 
     // Close the SinkHandle.
-    try {
-      sinkHandle.close();
-    } catch (IOException e) {
-      e.printStackTrace();
-      Assert.fail();
-    }
+    sinkHandle.close();
     Assert.assertTrue(sinkHandle.isClosed());
     Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onClosed(sinkHandle);
     try {
@@ -229,12 +218,7 @@ public class SinkHandleTest {
     Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
 
     // Send tsblocks.
-    try {
-      sinkHandle.send(mockTsBlocks);
-    } catch (IOException e) {
-      e.printStackTrace();
-      Assert.fail();
-    }
+    sinkHandle.send(mockTsBlocks);
     Assert.assertFalse(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isClosed());
@@ -281,12 +265,7 @@ public class SinkHandleTest {
         .free(queryId, numOfMockTsBlock * mockTsBlockSize);
 
     // Send tsblocks.
-    try {
-      sinkHandle.send(mockTsBlocks);
-    } catch (IOException e) {
-      e.printStackTrace();
-      Assert.fail();
-    }
+    sinkHandle.send(mockTsBlocks);
     Assert.assertFalse(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isClosed());
@@ -314,12 +293,7 @@ public class SinkHandleTest {
     // Close the SinkHandle.
     sinkHandle.setNoMoreTsBlocks();
     Assert.assertFalse(sinkHandle.isFinished());
-    try {
-      sinkHandle.close();
-    } catch (IOException e) {
-      e.printStackTrace();
-      Assert.fail();
-    }
+    sinkHandle.close();
     Assert.assertTrue(sinkHandle.isClosed());
     Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onClosed(sinkHandle);
     try {
@@ -381,11 +355,12 @@ public class SinkHandleTest {
     List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(numOfMockTsBlock, mockTsBlockSize);
     // Construct a mock client.
     Client mockClient = Mockito.mock(Client.class);
+    TException exception = new TException("Mock exception");
     try {
-      Mockito.doThrow(new TException("Mock exception"))
+      Mockito.doThrow(exception)
           .when(mockClient)
           .onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class));
-      Mockito.doThrow(new TException("Mock exception"))
+      Mockito.doThrow(exception)
           .when(mockClient)
           .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
     } catch (TException e) {
@@ -412,12 +387,7 @@ public class SinkHandleTest {
     Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
 
     // Send tsblocks.
-    try {
-      sinkHandle.send(mockTsBlocks);
-    } catch (IOException e) {
-      e.printStackTrace();
-      Assert.fail();
-    }
+    sinkHandle.send(mockTsBlocks);
     sinkHandle.setNoMoreTsBlocks();
     Assert.assertFalse(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
@@ -443,19 +413,14 @@ public class SinkHandleTest {
       Assert.fail();
     }
 
-    try {
-      sinkHandle.send(Collections.singletonList(Mockito.mock(TsBlock.class)));
-      Assert.fail("Expect an IOException.");
-    } catch (IOException e) {
-      Assert.assertEquals("org.apache.thrift.TException: Mock exception", e.getMessage());
-    }
+    Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFailure(exception);
 
     // Close the SinkHandle.
     try {
       sinkHandle.close();
-      Assert.fail("Expect an IOException.");
-    } catch (IOException e) {
-      Assert.assertEquals("org.apache.thrift.TException: Mock exception", e.getMessage());
+      Assert.fail("Expect an RuntimeException.");
+    } catch (RuntimeException e) {
+      Assert.assertEquals("Send EndOfDataBlockEvent failed", e.getMessage());
     }
     Assert.assertFalse(sinkHandle.isClosed());
     Mockito.verify(mockSinkHandleListener, Mockito.times(0)).onClosed(sinkHandle);
@@ -522,12 +487,7 @@ public class SinkHandleTest {
     Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
 
     // Send tsblocks.
-    try {
-      sinkHandle.send(mockTsBlocks);
-    } catch (IOException e) {
-      e.printStackTrace();
-      Assert.fail();
-    }
+    sinkHandle.send(mockTsBlocks);
     Future<?> blocked = sinkHandle.isFull();
     Assert.assertFalse(blocked.isDone());
     Assert.assertFalse(blocked.isCancelled());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index 2703d0aa37..88b5e57a54 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -55,8 +56,9 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ExecutorService;
 
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor.EXECUTION_TIME_SLICE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -85,6 +87,8 @@ public class DataDriverTest {
 
   @Test
   public void batchTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
       MeasurementPath measurementPath1 =
           new MeasurementPath(DATA_DRIVER_TEST_SG + ".device0.sensor0", TSDataType.INT32);
@@ -92,11 +96,12 @@ public class DataDriverTest {
       allSensors.add("sensor0");
       allSensors.add("sensor1");
       QueryId queryId = new QueryId("stub_query");
-      AtomicReference<FragmentInstanceState> state =
-          new AtomicReference<>(FragmentInstanceState.RUNNING);
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
       FragmentInstanceContext fragmentInstanceContext =
-          new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+          createFragmentInstanceContext(instanceId, stateMachine);
       PlanNodeId planNodeId1 = new PlanNodeId("1");
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId1, SeriesScanOperator.class.getSimpleName());
@@ -159,19 +164,20 @@ public class DataDriverTest {
               ImmutableList.of(seriesScanOperator1, seriesScanOperator2));
 
       StubSinkHandle sinkHandle = new StubSinkHandle(fragmentInstanceContext);
-
-      try (Driver dataDriver = new DataDriver(limitOperator, sinkHandle, driverContext)) {
+      IDriver dataDriver = null;
+      try {
+        dataDriver = new DataDriver(limitOperator, sinkHandle, driverContext);
         assertEquals(fragmentInstanceContext.getId(), dataDriver.getInfo());
 
         assertFalse(dataDriver.isFinished());
 
         while (!dataDriver.isFinished()) {
-          assertEquals(FragmentInstanceState.RUNNING, state.get());
+          assertEquals(FragmentInstanceState.RUNNING, stateMachine.getState());
           ListenableFuture<Void> blocked = dataDriver.processFor(EXECUTION_TIME_SLICE);
           assertTrue(blocked.isDone());
         }
 
-        assertEquals(FragmentInstanceState.FLUSHING, state.get());
+        assertEquals(FragmentInstanceState.FLUSHING, stateMachine.getState());
 
         List<TsBlock> result = sinkHandle.getTsBlocks();
         assertEquals(13, result.size());
@@ -204,10 +210,16 @@ public class DataDriverTest {
             }
           }
         }
+      } finally {
+        if (dataDriver != null) {
+          dataDriver.close();
+        }
       }
     } catch (IllegalPathException | QueryProcessException e) {
       e.printStackTrace();
       fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
     }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
index 6ad74bd110..a2516c0ab0 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -27,7 +28,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
-import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
@@ -50,8 +51,9 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ExecutorService;
 
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -78,6 +80,8 @@ public class LimitOperatorTest {
 
   @Test
   public void batchTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
       MeasurementPath measurementPath1 =
           new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
@@ -85,11 +89,12 @@ public class LimitOperatorTest {
       allSensors.add("sensor0");
       allSensors.add("sensor1");
       QueryId queryId = new QueryId("stub_query");
-      AtomicReference<FragmentInstanceState> state =
-          new AtomicReference<>(FragmentInstanceState.RUNNING);
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
       FragmentInstanceContext fragmentInstanceContext =
-          new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+          createFragmentInstanceContext(instanceId, stateMachine);
       PlanNodeId planNodeId1 = new PlanNodeId("1");
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId1, SeriesScanOperator.class.getSimpleName());
@@ -169,6 +174,8 @@ public class LimitOperatorTest {
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
     }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
index d1c71f2eb4..d2045e0d16 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -28,6 +29,7 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.operator.source.SeriesAggregateScanOperator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -52,8 +54,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.junit.Assert.assertEquals;
 
 public class SeriesAggregateScanOperatorTest {
@@ -64,16 +68,20 @@ public class SeriesAggregateScanOperatorTest {
 
   private final List<TsFileResource> seqResources = new ArrayList<>();
   private final List<TsFileResource> unSeqResources = new ArrayList<>();
+  private ExecutorService instanceNotificationExecutor;
 
   @Before
   public void setUp() throws MetadataException, IOException, WriteProcessException {
     SeriesReaderTestUtil.setUp(
         measurementSchemas, deviceIds, seqResources, unSeqResources, SERIES_SCAN_OPERATOR_TEST_SG);
+    this.instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
   }
 
   @After
   public void tearDown() throws IOException {
     SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+    instanceNotificationExecutor.shutdown();
   }
 
   @Test
@@ -349,9 +357,12 @@ public class SeriesAggregateScanOperatorTest {
     QueryId queryId = new QueryId("stub_query");
     AtomicReference<FragmentInstanceState> state =
         new AtomicReference<>(FragmentInstanceState.RUNNING);
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
     FragmentInstanceContext fragmentInstanceContext =
-        new FragmentInstanceContext(
-            new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+        createFragmentInstanceContext(instanceId, stateMachine);
     PlanNodeId planNodeId = new PlanNodeId("1");
     fragmentInstanceContext.addOperatorContext(
         1, planNodeId, SeriesScanOperator.class.getSimpleName());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
index 78e6821bcd..a9b4b452c5 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -27,7 +28,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
-import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
@@ -46,8 +47,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ExecutorService;
 
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -73,16 +75,19 @@ public class SeriesScanOperatorTest {
 
   @Test
   public void batchTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
       MeasurementPath measurementPath =
           new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
       Set<String> allSensors = Sets.newHashSet("sensor0");
       QueryId queryId = new QueryId("stub_query");
-      AtomicReference<FragmentInstanceState> state =
-          new AtomicReference<>(FragmentInstanceState.RUNNING);
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
       FragmentInstanceContext fragmentInstanceContext =
-          new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+          createFragmentInstanceContext(instanceId, stateMachine);
       PlanNodeId planNodeId = new PlanNodeId("1");
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId, SeriesScanOperator.class.getSimpleName());
@@ -123,6 +128,8 @@ public class SeriesScanOperatorTest {
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
     }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
index 5534418b84..4ee1ed95d3 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -27,7 +28,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
-import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -49,8 +50,9 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ExecutorService;
 
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.junit.Assert.*;
 
 public class TimeJoinOperatorTest {
@@ -74,6 +76,8 @@ public class TimeJoinOperatorTest {
 
   @Test
   public void batchTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
       MeasurementPath measurementPath1 =
           new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
@@ -81,11 +85,12 @@ public class TimeJoinOperatorTest {
       allSensors.add("sensor0");
       allSensors.add("sensor1");
       QueryId queryId = new QueryId("stub_query");
-      AtomicReference<FragmentInstanceState> state =
-          new AtomicReference<>(FragmentInstanceState.RUNNING);
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
       FragmentInstanceContext fragmentInstanceContext =
-          new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+          createFragmentInstanceContext(instanceId, stateMachine);
       PlanNodeId planNodeId1 = new PlanNodeId("1");
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId1, SeriesScanOperator.class.getSimpleName());
@@ -155,6 +160,8 @@ public class TimeJoinOperatorTest {
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
     }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
index 609dc9befa..d4532ff167 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator.schema;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.LocalConfigNode;
@@ -28,7 +29,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
-import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -52,7 +53,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ExecutorService;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_ATTRIBUTES;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_DEVICES;
@@ -64,6 +65,7 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_ALIA
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_ENCODING;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -90,13 +92,16 @@ public class SchemaScanOperatorTest {
 
   @Test
   public void testDeviceMetaScanOperator() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
       QueryId queryId = new QueryId("stub_query");
-      AtomicReference<FragmentInstanceState> state =
-          new AtomicReference<>(FragmentInstanceState.RUNNING);
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
       FragmentInstanceContext fragmentInstanceContext =
-          new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+          createFragmentInstanceContext(instanceId, stateMachine);
       PlanNodeId planNodeId = queryId.genPlanNodeId();
       OperatorContext operatorContext =
           fragmentInstanceContext.addOperatorContext(
@@ -149,18 +154,23 @@ public class SchemaScanOperatorTest {
     } catch (MetadataException e) {
       e.printStackTrace();
       fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
     }
   }
 
   @Test
   public void testTimeSeriesMetaScanOperator() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
       QueryId queryId = new QueryId("stub_query");
-      AtomicReference<FragmentInstanceState> state =
-          new AtomicReference<>(FragmentInstanceState.RUNNING);
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
       FragmentInstanceContext fragmentInstanceContext =
-          new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+          createFragmentInstanceContext(instanceId, stateMachine);
       PlanNodeId planNodeId = queryId.genPlanNodeId();
       OperatorContext operatorContext =
           fragmentInstanceContext.addOperatorContext(
@@ -237,6 +247,8 @@ public class SchemaScanOperatorTest {
     } catch (MetadataException e) {
       e.printStackTrace();
       fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
     }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
index 82ca5dfd2c..a93ab988ea 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.mpp.buffer.IDataBlockManager;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.IDriver;
 import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
 import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
 import org.apache.iotdb.db.utils.stats.CpuTimer;
@@ -52,7 +52,7 @@ public class DefaultTaskSchedulerTest {
     IDataBlockManager mockDataBlockManager = Mockito.mock(IDataBlockManager.class);
     manager.setBlockManager(mockDataBlockManager);
     ITaskScheduler defaultScheduler = manager.getScheduler();
-    Driver mockDriver = Mockito.mock(Driver.class);
+    IDriver mockDriver = Mockito.mock(IDriver.class);
     QueryId queryId = new QueryId("test");
     FragmentInstanceId instanceId =
         new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
@@ -103,7 +103,7 @@ public class DefaultTaskSchedulerTest {
     IDataBlockManager mockDataBlockManager = Mockito.mock(IDataBlockManager.class);
     manager.setBlockManager(mockDataBlockManager);
     ITaskScheduler defaultScheduler = manager.getScheduler();
-    Driver mockDriver = Mockito.mock(Driver.class);
+    IDriver mockDriver = Mockito.mock(IDriver.class);
 
     QueryId queryId = new QueryId("test");
     FragmentInstanceId instanceId =
@@ -151,7 +151,7 @@ public class DefaultTaskSchedulerTest {
     IDataBlockManager mockDataBlockManager = Mockito.mock(IDataBlockManager.class);
     manager.setBlockManager(mockDataBlockManager);
     ITaskScheduler defaultScheduler = manager.getScheduler();
-    Driver mockDriver = Mockito.mock(Driver.class);
+    IDriver mockDriver = Mockito.mock(IDriver.class);
     QueryId queryId = new QueryId("test");
     FragmentInstanceId instanceId =
         new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
@@ -204,7 +204,7 @@ public class DefaultTaskSchedulerTest {
     IDataBlockManager mockDataBlockManager = Mockito.mock(IDataBlockManager.class);
     manager.setBlockManager(mockDataBlockManager);
     ITaskScheduler defaultScheduler = manager.getScheduler();
-    Driver mockDriver = Mockito.mock(Driver.class);
+    IDriver mockDriver = Mockito.mock(IDriver.class);
     QueryId queryId = new QueryId("test");
     FragmentInstanceId instanceId =
         new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
@@ -257,7 +257,7 @@ public class DefaultTaskSchedulerTest {
     IDataBlockManager mockDataBlockManager = Mockito.mock(IDataBlockManager.class);
     manager.setBlockManager(mockDataBlockManager);
     ITaskScheduler defaultScheduler = manager.getScheduler();
-    Driver mockDriver = Mockito.mock(Driver.class);
+    IDriver mockDriver = Mockito.mock(IDriver.class);
     QueryId queryId = new QueryId("test");
     FragmentInstanceId instanceId =
         new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
@@ -313,9 +313,9 @@ public class DefaultTaskSchedulerTest {
     QueryId queryId = new QueryId("test");
     FragmentInstanceId instanceId1 =
         new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
-    Driver mockDriver1 = Mockito.mock(Driver.class);
+    IDriver mockDriver1 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
-    Driver mockDriver2 = Mockito.mock(Driver.class);
+    IDriver mockDriver2 = Mockito.mock(IDriver.class);
     FragmentInstanceId instanceId2 =
         new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-1");
     Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
index 56435496ab..f83d1213c6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.mpp.buffer.IDataBlockManager;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.IDriver;
 import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
 import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskID;
 import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
@@ -56,12 +56,12 @@ public class FragmentInstanceSchedulerTest {
     QueryId queryId = new QueryId("test");
     PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
     FragmentInstanceId instanceId1 = new FragmentInstanceId(fragmentId, "inst-0");
-    Driver mockDriver1 = Mockito.mock(Driver.class);
+    IDriver mockDriver1 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
     FragmentInstanceId instanceId2 = new FragmentInstanceId(fragmentId, "inst-1");
-    Driver mockDriver2 = Mockito.mock(Driver.class);
+    IDriver mockDriver2 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
-    List<Driver> instances = Arrays.asList(mockDriver1, mockDriver2);
+    List<IDriver> instances = Arrays.asList(mockDriver1, mockDriver2);
     manager.submitFragmentInstances(queryId, instances);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
     Assert.assertEquals(1, manager.getQueryMap().size());
@@ -81,7 +81,7 @@ public class FragmentInstanceSchedulerTest {
     Assert.assertEquals(FragmentInstanceTaskStatus.READY, task2.getStatus());
 
     // Submit another task of the same query
-    Driver mockDriver3 = Mockito.mock(Driver.class);
+    IDriver mockDriver3 = Mockito.mock(IDriver.class);
     FragmentInstanceId instanceId3 = new FragmentInstanceId(fragmentId, "inst-2");
     Mockito.when(mockDriver3.getInfo()).thenReturn(instanceId3);
     manager.submitFragmentInstances(queryId, Collections.singletonList(mockDriver3));
@@ -101,7 +101,7 @@ public class FragmentInstanceSchedulerTest {
     QueryId queryId2 = new QueryId("test2");
     PlanFragmentId fragmentId2 = new PlanFragmentId(queryId2, 0);
     FragmentInstanceId instanceId4 = new FragmentInstanceId(fragmentId2, "inst-0");
-    Driver mockDriver4 = Mockito.mock(Driver.class);
+    IDriver mockDriver4 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver4.getInfo()).thenReturn(instanceId4);
     manager.submitFragmentInstances(queryId2, Collections.singletonList(mockDriver4));
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
index 862f4ca207..28365962b8 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.mpp.schedule;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.IDriver;
 import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
 import org.apache.iotdb.db.mpp.schedule.queue.L1PriorityQueue;
 import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
@@ -57,7 +57,7 @@ public class FragmentInstanceTimeoutSentinelTest {
     IndexedBlockingQueue<FragmentInstanceTask> taskQueue =
         new L1PriorityQueue<>(
             100, new FragmentInstanceTask.TimeoutComparator(), new FragmentInstanceTask());
-    Driver mockDriver = Mockito.mock(Driver.class);
+    IDriver mockDriver = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
 
     AbstractExecutor executor =
@@ -116,7 +116,7 @@ public class FragmentInstanceTimeoutSentinelTest {
             100, new FragmentInstanceTask.TimeoutComparator(), new FragmentInstanceTask());
 
     // Mock the instance with a cancelled future
-    Driver mockDriver = Mockito.mock(Driver.class);
+    IDriver mockDriver = Mockito.mock(IDriver.class);
     QueryId queryId = new QueryId("test");
     PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
     FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
@@ -158,7 +158,7 @@ public class FragmentInstanceTimeoutSentinelTest {
             100, new FragmentInstanceTask.TimeoutComparator(), new FragmentInstanceTask());
 
     // Mock the instance with a cancelled future
-    Driver mockDriver = Mockito.mock(Driver.class);
+    IDriver mockDriver = Mockito.mock(IDriver.class);
     QueryId queryId = new QueryId("test");
     PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
     FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
@@ -209,7 +209,7 @@ public class FragmentInstanceTimeoutSentinelTest {
             })
         .when(mockFuture)
         .addListener(Mockito.any(), Mockito.any());
-    Driver mockDriver = Mockito.mock(Driver.class);
+    IDriver mockDriver = Mockito.mock(IDriver.class);
     QueryId queryId = new QueryId("test");
     PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
     FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
@@ -260,7 +260,7 @@ public class FragmentInstanceTimeoutSentinelTest {
             })
         .when(mockFuture)
         .addListener(Mockito.any(), Mockito.any());
-    Driver mockDriver = Mockito.mock(Driver.class);
+    IDriver mockDriver = Mockito.mock(IDriver.class);
     QueryId queryId = new QueryId("test");
     PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
     FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");