You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/20 09:55:13 UTC

[iotdb] 01/02: Add FragmentInstanceStateMachine

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fb52130d2bc1d0ea95b36055049f1c98c728dbb2
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Apr 20 17:24:28 2022 +0800

    Add FragmentInstanceStateMachine
---
 pom.xml                                            |   5 +
 server/pom.xml                                     |   4 +
 .../apache/iotdb/db/mpp/buffer/ISinkHandle.java    |  21 +-
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java |  16 +-
 .../apache/iotdb/db/mpp/buffer/StubSinkHandle.java |  13 +-
 .../apache/iotdb/db/mpp/execution/DataDriver.java  | 186 ++-------
 .../org/apache/iotdb/db/mpp/execution/Driver.java  | 415 ++++++++++++++++++++-
 .../iotdb/db/mpp/execution/DriverContext.java      |  15 +-
 .../db/mpp/execution/FragmentInstanceContext.java  |  87 +++--
 .../mpp/execution/FragmentInstanceExecution.java   |  85 ++++-
 ...t.java => FragmentInstanceFailureListener.java} |  29 +-
 .../db/mpp/execution/FragmentInstanceManager.java  |  56 ++-
 .../db/mpp/execution/FragmentInstanceState.java    |   2 +-
 .../execution/FragmentInstanceStateMachine.java    | 171 +++++++++
 .../iotdb/db/mpp/execution/SchemaDriver.java       | 142 +------
 .../org/apache/iotdb/db/mpp/operator/Operator.java |   2 +-
 .../db/mpp/operator/process/TransformOperator.java |   2 +-
 .../mpp/operator/schema/SchemaFetchOperator.java   |   2 +-
 .../db/mpp/operator/source/ExchangeOperator.java   |   2 +-
 .../db/mpp/schedule/task/FragmentInstanceTask.java |  25 +-
 .../iotdb/db/mpp/execution/DataDriverTest.java     |  25 +-
 .../iotdb/db/mpp/operator/LimitOperatorTest.java   |  13 +-
 .../db/mpp/operator/SeriesScanOperatorTest.java    |  13 +-
 .../db/mpp/operator/TimeJoinOperatorTest.java      |  13 +-
 .../operator/schema/SchemaScanOperatorTest.java    |  23 +-
 25 files changed, 916 insertions(+), 451 deletions(-)

diff --git a/pom.xml b/pom.xml
index 90a8dbccb4..a54d70cdb4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -495,6 +495,11 @@
                 <artifactId>slice</artifactId>
                 <version>0.41</version>
             </dependency>
+            <dependency>
+                <groupId>io.airlift</groupId>
+                <artifactId>stats</artifactId>
+                <version>214</version>
+            </dependency>
             <dependency>
                 <groupId>org.openjdk.jol</groupId>
                 <artifactId>jol-core</artifactId>
diff --git a/server/pom.xml b/server/pom.xml
index e6c9c29adf..fdc67576bc 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -99,6 +99,10 @@
             <groupId>io.airlift</groupId>
             <artifactId>units</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>stats</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.airlift</groupId>
             <artifactId>airline</artifactId>
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
index 6300c5beef..1d037678ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
@@ -24,8 +24,9 @@ import com.google.common.util.concurrent.ListenableFuture;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
 
-public interface ISinkHandle extends AutoCloseable {
+public interface ISinkHandle {
 
   /** Get the total amount of memory used by buffered tsblocks. */
   long getBufferRetainedSizeInBytes();
@@ -41,7 +42,7 @@ public interface ISinkHandle extends AutoCloseable {
    * the send tsblock call is ignored. This can happen with limit queries. A {@link
    * RuntimeException} will be thrown if any exception happened * during the data transmission.
    */
-  void send(List<TsBlock> tsBlocks) throws IOException;
+  void send(List<TsBlock> tsBlocks);
 
   /**
    * Send a {@link TsBlock} to a specific partition. If no-more-tsblocks has been set, the send
@@ -57,22 +58,32 @@ public interface ISinkHandle extends AutoCloseable {
   void setNoMoreTsBlocks();
 
   /** If the handle is closed. */
-  public boolean isClosed();
+  boolean isClosed();
 
   /**
    * If no more tsblocks will be sent and all the tsblocks have been fetched by downstream fragment
    * instances.
    */
-  public boolean isFinished();
+  boolean isFinished();
 
   /**
    * Close the handle. The output buffer will not be cleared until all tsblocks are fetched by
    * downstream instances. A {@link RuntimeException} will be thrown if any exception happened
    * during the data transmission.
    */
-  @Override
   void close() throws IOException;
 
   /** Abort the sink handle, discarding all tsblocks which may still be in memory buffer. */
   void abort();
+
+  /**
+   * @return whether this SinkHandle is failed
+   */
+  boolean isFailed();
+
+
+  /**
+   * Returns non empty failure cause if the sink handle is failed
+   */
+  Optional<Throwable> getFailureCause();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index af4ecf089d..b2fd70459d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -41,6 +41,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.StringJoiner;
 import java.util.concurrent.ExecutorService;
 
@@ -113,11 +114,8 @@ public class SinkHandle implements ISinkHandle {
   }
 
   @Override
-  public void send(List<TsBlock> tsBlocks) throws IOException {
+  public void send(List<TsBlock> tsBlocks) {
     Validate.notNull(tsBlocks, "tsBlocks is null");
-    if (throwable != null) {
-      throw new IOException(throwable);
-    }
     if (closed) {
       throw new IllegalStateException("Sink handle is closed.");
     }
@@ -229,6 +227,16 @@ public class SinkHandle implements ISinkHandle {
     logger.info("Sink handle {} is aborted", this);
   }
 
+  @Override
+  public boolean isFailed() {
+    return throwable != null;
+  }
+
+  @Override
+  public Optional<Throwable> getFailureCause() {
+    return Optional.ofNullable(throwable);
+  }
+
   @Override
   public synchronized void setNoMoreTsBlocks() {
     noMoreTsBlocks = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
index 9f3b9240c2..c4bc0e377a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
@@ -26,6 +26,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
 
@@ -59,7 +60,7 @@ public class StubSinkHandle implements ISinkHandle {
   }
 
   @Override
-  public void send(List<TsBlock> tsBlocks) throws IOException {
+  public void send(List<TsBlock> tsBlocks) {
     this.tsBlocks.addAll(tsBlocks);
   }
 
@@ -95,6 +96,16 @@ public class StubSinkHandle implements ISinkHandle {
     tsBlocks.clear();
   }
 
+  @Override
+  public boolean isFailed() {
+    return false;
+  }
+
+  @Override
+  public Optional<Throwable> getFailureCause() {
+    return Optional.empty();
+  }
+
   public List<TsBlock> getTsBlocks() {
     return tsBlocks;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index 21aebe0214..2921c9d889 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
-import org.apache.iotdb.commons.exception.IoTDBException;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -26,164 +26,74 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.operator.source.DataSourceOperator;
 import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import io.airlift.units.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.NotThreadSafe;
-
-import java.io.IOException;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
-import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
-import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
 
 @NotThreadSafe
-public class DataDriver implements Driver {
-
-  private static final Logger logger = LoggerFactory.getLogger(DataDriver.class);
-
-  private final Operator root;
-  private final ISinkHandle sinkHandle;
-  private final DataDriverContext driverContext;
+public class DataDriver extends Driver {
 
   private boolean init;
-  private boolean closed;
 
   /** closed tsfile used in this fragment instance */
   private Set<TsFileResource> closedFilePaths;
   /** unClosed tsfile used in this fragment instance */
   private Set<TsFileResource> unClosedFilePaths;
 
-  private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
 
   public DataDriver(Operator root, ISinkHandle sinkHandle, DataDriverContext driverContext) {
-    this.root = root;
-    this.sinkHandle = sinkHandle;
-    this.driverContext = driverContext;
+    super(root, sinkHandle, driverContext);
     this.closedFilePaths = new HashSet<>();
     this.unClosedFilePaths = new HashSet<>();
-    // initially the driverBlockedFuture is not blocked (it is completed)
-    SettableFuture<Void> future = SettableFuture.create();
-    future.set(null);
-    driverBlockedFuture.set(future);
-  }
-
-  @Override
-  public boolean isFinished() {
-    try {
-      boolean isFinished =
-          closed || (driverBlockedFuture.get().isDone() && root != null && root.isFinished());
-      if (isFinished) {
-        close();
-      }
-      return isFinished;
-    } catch (Throwable t) {
-      logger.error(
-          "Failed to query whether the data driver {} is finished", driverContext.getId(), t);
-      driverContext.failed(t);
-      close();
-      return true;
-    }
   }
 
   @Override
-  public ListenableFuture<Void> processFor(Duration duration) {
-
-    SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
-    // initialization may be time-consuming, so we keep it in the processFor method
-    // in normal case, it won't cause deadlock and should finish soon, otherwise it will be a
-    // critical bug
+  protected boolean init(SettableFuture<Void> blockedFuture) {
     if (!init) {
       try {
         initialize();
       } catch (Throwable t) {
-        logger.error(
+        LOGGER.error(
             "Failed to do the initialization for fragment instance {} ", driverContext.getId(), t);
         driverContext.failed(t);
-        close();
         blockedFuture.setException(t);
-        return blockedFuture;
+        return false;
       }
     }
-
-    // if the driver is blocked we don't need to continue
-    if (!blockedFuture.isDone()) {
-      return blockedFuture;
-    }
-
-    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
-
-    long start = System.nanoTime();
-    try {
-      do {
-        ListenableFuture<Void> future = processInternal();
-        if (!future.isDone()) {
-          return updateDriverBlockedFuture(future);
-        }
-      } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
-    } catch (Throwable t) {
-      logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
-      driverContext.failed(t);
-      close();
-      blockedFuture.setException(t);
-      return blockedFuture;
-    }
-    return NOT_BLOCKED;
-  }
-
-  @Override
-  public FragmentInstanceId getInfo() {
-    return driverContext.getId();
+    return true;
   }
 
+  /**
+   * All file paths used by this fragment instance must be cleared and thus the usage reference must
+   * be decreased.
+   */
   @Override
-  public void close() {
-    if (closed) {
-      return;
+  protected void releaseResource() {
+    for (TsFileResource tsFile : closedFilePaths) {
+      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
     }
-    closed = true;
-    try {
-      if (root != null) {
-        root.close();
-      }
-      if (sinkHandle != null) {
-        sinkHandle.close();
-      }
-    } catch (Throwable t) {
-      logger.error("Failed to closed driver {}", driverContext.getId(), t);
-      driverContext.failed(t);
-    } finally {
-      removeUsedFilesForQuery();
+    closedFilePaths = null;
+    for (TsFileResource tsFile : unClosedFilePaths) {
+      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
     }
+    unClosedFilePaths = null;
   }
 
-  @Override
-  public void failed(Throwable t) {
-    driverContext.failed(t);
-  }
 
   /**
    * init seq file list and unseq file list in QueryDataSource and set it into each SourceNode TODO
    * we should change all the blocked lock operation into tryLock
    */
   private void initialize() throws QueryProcessException {
-    List<DataSourceOperator> sourceOperators = driverContext.getSourceOperators();
+    List<DataSourceOperator> sourceOperators = ((DataDriverContext)driverContext).getSourceOperators();
     if (sourceOperators != null && !sourceOperators.isEmpty()) {
       QueryDataSource dataSource = initQueryDataSourceCache();
       sourceOperators.forEach(
@@ -206,11 +116,12 @@ public class DataDriver implements Driver {
    * QueryDataSource needed for this query
    */
   public QueryDataSource initQueryDataSourceCache() throws QueryProcessException {
-    DataRegion dataRegion = driverContext.getDataRegion();
+    DataDriverContext context = (DataDriverContext) driverContext;
+    DataRegion dataRegion = context.getDataRegion();
     dataRegion.readLock();
     try {
       List<PartialPath> pathList =
-          driverContext.getPaths().stream()
+          context.getPaths().stream()
               .map(IDTable::translateQueryPath)
               .collect(Collectors.toList());
       // when all the selected series are under the same device, the QueryDataSource will be
@@ -223,7 +134,7 @@ public class DataDriver implements Driver {
               pathList,
               selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null,
               driverContext.getFragmentInstanceContext(),
-              driverContext.getTimeFilter());
+              context.getTimeFilter());
 
       // used files should be added before mergeLock is unlocked, or they may be deleted by
       // running merge
@@ -264,28 +175,13 @@ public class DataDriver implements Driver {
     }
   }
 
-  /**
-   * All file paths used by this fragment instance must be cleared and thus the usage reference must
-   * be decreased.
-   */
-  private void removeUsedFilesForQuery() {
-    for (TsFileResource tsFile : closedFilePaths) {
-      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
-    }
-    closedFilePaths = null;
-    for (TsFileResource tsFile : unClosedFilePaths) {
-      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
-    }
-    unClosedFilePaths = null;
-  }
-
   /**
    * Increase the usage reference of filePath of job id. Before the invoking of this method, <code>
    * this.setqueryIdForCurrentRequestThread</code> has been invoked, so <code>
    * sealedFilePathsMap.get(queryId)</code> or <code>unsealedFilePathsMap.get(queryId)</code> must
    * not return null.
    */
-  void addFilePathToMap(TsFileResource tsFile, boolean isClosed) {
+  private void addFilePathToMap(TsFileResource tsFile, boolean isClosed) {
     Set<TsFileResource> pathSet = isClosed ? closedFilePaths : unClosedFilePaths;
     if (!pathSet.contains(tsFile)) {
       pathSet.add(tsFile);
@@ -293,38 +189,4 @@ public class DataDriver implements Driver {
     }
   }
 
-  private ListenableFuture<Void> processInternal() throws IOException, IoTDBException {
-    ListenableFuture<Void> blocked = root.isBlocked();
-    if (!blocked.isDone()) {
-      return blocked;
-    }
-    blocked = sinkHandle.isFull();
-    if (!blocked.isDone()) {
-      return blocked;
-    }
-    if (root.hasNext()) {
-      TsBlock tsBlock = root.next();
-      if (tsBlock != null && !tsBlock.isEmpty()) {
-        sinkHandle.send(Collections.singletonList(tsBlock));
-      }
-    }
-    return NOT_BLOCKED;
-  }
-
-  private ListenableFuture<Void> updateDriverBlockedFuture(
-      ListenableFuture<Void> sourceBlockedFuture) {
-    // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
-    // or any of the operators gets a memory revocation request
-    SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
-    driverBlockedFuture.set(newDriverBlockedFuture);
-    sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
-
-    // TODO Although we don't have memory management for operator now, we should consider it for
-    // future
-    // it's possible that memory revoking is requested for some operator
-    // before we update driverBlockedFuture above and we don't want to miss that
-    // notification, so we check to see whether that's the case before returning.
-
-    return newDriverBlockedFuture;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
index f211ce593c..fd2fef8b3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
@@ -18,25 +18,88 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-
+import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import io.airlift.units.Duration;
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Throwables.throwIfUnchecked;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static java.lang.Boolean.TRUE;
+import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
 
 /**
  * Driver encapsulates some methods which are necessary for execution scheduler to run a fragment
  * instance
  */
-public interface Driver extends Closeable {
+public abstract class Driver {
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(Driver.class);
+
+
+  protected final Operator root;
+  protected final ISinkHandle sinkHandle;
+  protected final DriverContext driverContext;
+  protected final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
+  protected final AtomicReference<State> state = new AtomicReference<>(State.ALIVE);
+
+  protected final DriverLock exclusiveLock = new DriverLock();
+
+  protected enum State {
+    ALIVE, NEED_DESTRUCTION, DESTROYED
+  }
+
+  public Driver(Operator root, ISinkHandle sinkHandle, DriverContext driverContext) {
+    this.root = root;
+    this.sinkHandle = sinkHandle;
+    this.driverContext = driverContext;
+
+    // initially the driverBlockedFuture is not blocked (it is completed)
+    SettableFuture<Void> future = SettableFuture.create();
+    future.set(null);
+    driverBlockedFuture.set(future);
+  }
 
   /**
    * Used to judge whether this fragment instance should be scheduled for execution anymore
    *
    * @return true if the FragmentInstance is done or terminated due to failure, otherwise false.
    */
-  boolean isFinished();
+  public boolean isFinished() {
+    checkLockNotHeld("Cannot check finished status while holding the driver lock");
+
+    // if we can get the lock, attempt a clean shutdown; otherwise someone else will shutdown
+    Optional<Boolean> result = tryWithLockUnInterruptibly(this::isFinishedInternal);
+    return result.orElseGet(() -> state.get() != State.ALIVE || driverContext.isDone());
+  }
+
+  /**
+   * do initialization
+   *
+   * @return true if init succeed, false otherwise
+   */
+  protected abstract boolean init(SettableFuture<Void> blockedFuture);
+
+  /**
+   * release resource this driver used
+   */
+  protected abstract void releaseResource();
 
   /**
    * run the fragment instance for {@param duration} time slice, the time of this run is likely not
@@ -44,27 +107,351 @@ public interface Driver extends Closeable {
    *
    * @param duration how long should this fragment instance run
    * @return the returned ListenableFuture<Void> is used to represent status of this processing if
-   *     isDone() return true, meaning that this fragment instance is not blocked and is ready for
-   *     next processing otherwise, meaning that this fragment instance is blocked and not ready for
-   *     next processing.
+   * isDone() return true, meaning that this fragment instance is not blocked and is ready for
+   * next processing otherwise, meaning that this fragment instance is blocked and not ready for
+   * next processing.
    */
-  ListenableFuture<Void> processFor(Duration duration);
+  public ListenableFuture<Void> processFor(Duration duration) {
+
+    SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
+    // initialization may be time-consuming, so we keep it in the processFor method
+    // in normal case, it won't cause deadlock and should finish soon, otherwise it will be a
+    // critical bug
+    if (!init(blockedFuture)) {
+      return blockedFuture;
+    }
+
+    // if the driver is blocked we don't need to continue
+    if (!blockedFuture.isDone()) {
+      return blockedFuture;
+    }
+
+    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
+
+    Optional<ListenableFuture<Void>> result = tryWithLock(100, TimeUnit.MILLISECONDS, true, () -> {
+      long start = System.nanoTime();
+      do {
+        ListenableFuture<Void> future = processInternal();
+        if (!future.isDone()) {
+          return updateDriverBlockedFuture(future);
+        }
+      }
+      while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
+      return NOT_BLOCKED;
+    });
+
+    return result.orElse(NOT_BLOCKED);
+  }
 
   /**
    * the id information about this Fragment Instance.
    *
    * @return a {@link FragmentInstanceId} instance.
    */
-  FragmentInstanceId getInfo();
+  public FragmentInstanceId getInfo() {
+    return driverContext.getId();
+  }
+
+  /**
+   * clear resource used by this fragment instance
+   */
+  public void close() {
+    // mark the service for destruction
+    if (!state.compareAndSet(State.ALIVE, State.NEED_DESTRUCTION)) {
+      return;
+    }
 
-  /** clear resource used by this fragment instance */
-  @Override
-  void close();
+    exclusiveLock.interruptCurrentOwner();
+
+    // if we can get the lock, attempt a clean shutdown; otherwise someone else will shutdown
+    tryWithLockUnInterruptibly(() -> TRUE);
+  }
 
   /**
    * fail current driver
    *
    * @param t reason cause this failure
    */
-  void failed(Throwable t);
+  public void failed(Throwable t) {
+    driverContext.failed(t);
+  }
+
+  public ISinkHandle getSinkHandle() {
+    return sinkHandle;
+  }
+
+  @GuardedBy("exclusiveLock")
+  private boolean isFinishedInternal() {
+    checkLockHeld("Lock must be held to call isFinishedInternal");
+
+    boolean finished = state.get() != State.ALIVE || driverContext.isDone() || root == null || root.isFinished();
+    if (finished) {
+      state.compareAndSet(State.ALIVE, State.NEED_DESTRUCTION);
+    }
+    return finished;
+  }
+
+
+  private ListenableFuture<Void> processInternal() {
+    try {
+      ListenableFuture<Void> blocked = root.isBlocked();
+      if (!blocked.isDone()) {
+        return blocked;
+      }
+      blocked = sinkHandle.isFull();
+      if (!blocked.isDone()) {
+        return blocked;
+      }
+      if (root.hasNext()) {
+        TsBlock tsBlock = root.next();
+        if (tsBlock != null && !tsBlock.isEmpty()) {
+          sinkHandle.send(Collections.singletonList(tsBlock));
+        }
+      }
+      return NOT_BLOCKED;
+    } catch (Throwable t) {
+      LOGGER.error("Failed to execute fragment instance {}", driverContext.getId(), t);
+      List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
+      if (interrupterStack == null) {
+        driverContext.failed(t);
+        throw t;
+      }
+
+      // Driver thread was interrupted which should only happen if the task is already finished.
+      // If this becomes the actual cause of a failed query there is a bug in the task state machine.
+      Exception exception = new Exception("Interrupted By");
+      exception.setStackTrace(interrupterStack.toArray(new StackTraceElement[0]));
+      RuntimeException newException = new RuntimeException("Driver was interrupted", exception);
+      newException.addSuppressed(t);
+      driverContext.failed(newException);
+      throw newException;
+    }
+  }
+
+  private ListenableFuture<Void> updateDriverBlockedFuture(
+      ListenableFuture<Void> sourceBlockedFuture) {
+    // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
+    // or any of the operators gets a memory revocation request
+    SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
+    driverBlockedFuture.set(newDriverBlockedFuture);
+    sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
+
+    // TODO Although we don't have memory management for operator now, we should consider it for
+    // future
+    // it's possible that memory revoking is requested for some operator
+    // before we update driverBlockedFuture above and we don't want to miss that
+    // notification, so we check to see whether that's the case before returning.
+
+    return newDriverBlockedFuture;
+  }
+
+
+  private synchronized void checkLockNotHeld(String message) {
+    checkState(!exclusiveLock.isHeldByCurrentThread(), message);
+  }
+
+  @GuardedBy("exclusiveLock")
+  private synchronized void checkLockHeld(String message) {
+    checkState(exclusiveLock.isHeldByCurrentThread(), message);
+  }
+
+  /**
+   * Try to acquire the {@code exclusiveLock} immediately and run a {@code task}
+   * The task will not be interrupted if the {@code Driver} is closed.
+   * <p>
+   * Note: task cannot return null
+   */
+  private <T> Optional<T> tryWithLockUnInterruptibly(Supplier<T> task) {
+    return tryWithLock(0, TimeUnit.MILLISECONDS, false, task);
+  }
+
+  /**
+   * Try to acquire the {@code exclusiveLock} with {@code timeout} and run a {@code task}.
+   * If the {@code interruptOnClose} flag is set to {@code true} the {@code task} will be
+   * interrupted if the {@code Driver} is closed.
+   * <p>
+   * Note: task cannot return null
+   */
+  private <T> Optional<T> tryWithLock(long timeout, TimeUnit unit, boolean interruptOnClose, Supplier<T> task) {
+    checkLockNotHeld("Lock cannot be reacquired");
+
+    boolean acquired = false;
+    try {
+      acquired = exclusiveLock.tryLock(timeout, unit, interruptOnClose);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
+    if (!acquired) {
+      return Optional.empty();
+    }
+
+    Optional<T> result;
+    try {
+      result = Optional.of(task.get());
+    } finally {
+      try {
+        destroyIfNecessary();
+      } finally {
+        exclusiveLock.unlock();
+      }
+    }
+
+    return result;
+  }
+
+  @GuardedBy("exclusiveLock")
+  private void destroyIfNecessary() {
+    checkLockHeld("Lock must be held to call destroyIfNecessary");
+
+    if (!state.compareAndSet(State.NEED_DESTRUCTION, State.DESTROYED)) {
+      return;
+    }
+
+    // if we get an error while closing a driver, record it and we will throw it at the end
+    Throwable inFlightException = null;
+    try {
+      inFlightException = closeAndDestroyOperators();
+      driverContext.finished();
+    } catch (Throwable t) {
+      // this shouldn't happen but be safe
+      inFlightException = addSuppressedException(
+          inFlightException,
+          t,
+          "Error destroying driver for task %s",
+          driverContext.getId());
+    } finally {
+      releaseResource();
+    }
+
+    if (inFlightException != null) {
+      // this will always be an Error or Runtime
+      throwIfUnchecked(inFlightException);
+      throw new RuntimeException(inFlightException);
+    }
+  }
+
+  private Throwable closeAndDestroyOperators() {
+    // record the current interrupted status (and clear the flag); we'll reset it later
+    boolean wasInterrupted = Thread.interrupted();
+
+    Throwable inFlightException = null;
+
+    try {
+      if (root != null) {
+        root.close();
+      }
+      if (sinkHandle != null) {
+        sinkHandle.close();
+      }
+    } catch (InterruptedException t) {
+      // don't record the stack
+      wasInterrupted = true;
+    } catch (Throwable t) {
+      // TODO currently, we won't know exact operator which is failed in closing
+      inFlightException = addSuppressedException(
+          inFlightException,
+          t,
+          "Error closing operator {} for fragment instance {}",
+          root.getOperatorContext().getOperatorId(),
+          driverContext.getId());
+    } finally {
+      // reset the interrupted flag
+      if (wasInterrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    return inFlightException;
+  }
+
+  private static Throwable addSuppressedException(Throwable inFlightException, Throwable newException, String message, Object... args) {
+    if (newException instanceof Error) {
+      if (inFlightException == null) {
+        inFlightException = newException;
+      } else {
+        // Self-suppression not permitted
+        if (inFlightException != newException) {
+          inFlightException.addSuppressed(newException);
+        }
+      }
+    } else {
+      // log normal exceptions instead of rethrowing them
+      LOGGER.error(message, args, newException);
+    }
+    return inFlightException;
+  }
+
+  private static class DriverLock {
+    private final ReentrantLock lock = new ReentrantLock();
+
+    @GuardedBy("this")
+    private Thread currentOwner;
+    @GuardedBy("this")
+    private boolean currentOwnerInterruptionAllowed;
+
+    @GuardedBy("this")
+    private List<StackTraceElement> interrupterStack;
+
+    public boolean isHeldByCurrentThread() {
+      return lock.isHeldByCurrentThread();
+    }
+
+    public boolean tryLock(boolean currentThreadInterruptionAllowed) {
+      checkState(!lock.isHeldByCurrentThread(), "Lock is not reentrant");
+      boolean acquired = lock.tryLock();
+      if (acquired) {
+        setOwner(currentThreadInterruptionAllowed);
+      }
+      return acquired;
+    }
+
+    public boolean tryLock(long timeout, TimeUnit unit, boolean currentThreadInterruptionAllowed)
+        throws InterruptedException {
+      checkState(!lock.isHeldByCurrentThread(), "Lock is not reentrant");
+      boolean acquired = lock.tryLock(timeout, unit);
+      if (acquired) {
+        setOwner(currentThreadInterruptionAllowed);
+      }
+      return acquired;
+    }
+
+    private synchronized void setOwner(boolean interruptionAllowed) {
+      checkState(lock.isHeldByCurrentThread(), "Current thread does not hold lock");
+      currentOwner = Thread.currentThread();
+      currentOwnerInterruptionAllowed = interruptionAllowed;
+      // NOTE: We do not use interrupted stack information to know that another
+      // thread has attempted to interrupt the driver, and interrupt this new lock
+      // owner.  The interrupted stack information is for debugging purposes only.
+      // In the case of interruption, the caller should (and does) have a separate
+      // state to prevent further processing in the Driver.
+    }
+
+    public synchronized void unlock() {
+      checkState(lock.isHeldByCurrentThread(), "Current thread does not hold lock");
+      currentOwner = null;
+      currentOwnerInterruptionAllowed = false;
+      lock.unlock();
+    }
+
+    public synchronized List<StackTraceElement> getInterrupterStack() {
+      return interrupterStack;
+    }
+
+    public synchronized void interruptCurrentOwner() {
+      if (!currentOwnerInterruptionAllowed) {
+        return;
+      }
+      // there is a benign race condition here were the lock holder
+      // can be change between attempting to get lock and grabbing
+      // the synchronized lock here, but in either case we want to
+      // interrupt the lock holder thread
+      if (interrupterStack == null) {
+        interrupterStack = ImmutableList.copyOf(Thread.currentThread().getStackTrace());
+      }
+
+      if (currentOwner != null) {
+        currentOwner.interrupt();
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
index 8c20a2c334..d0bc3bb232 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
@@ -20,10 +20,15 @@ package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 public class DriverContext {
 
   private final FragmentInstanceContext fragmentInstanceContext;
 
+  private final AtomicBoolean finished = new AtomicBoolean();
+
+
   public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
     this.fragmentInstanceContext = fragmentInstanceContext;
   }
@@ -38,13 +43,15 @@ public class DriverContext {
 
   public void failed(Throwable cause) {
     fragmentInstanceContext.failed(cause);
+    finished.set(true);
   }
 
-  public void finish() {
-    fragmentInstanceContext.finish();
+  public void finished() {
+    finished.compareAndSet(false, true);
   }
 
-  public void flushing() {
-    fragmentInstanceContext.flushing();
+
+  public boolean isDone() {
+    return finished.get();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
index b2e51be2bc..c3d33dbd74 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
@@ -28,9 +28,11 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 public class FragmentInstanceContext extends QueryContext {
 
@@ -41,14 +43,19 @@ public class FragmentInstanceContext extends QueryContext {
   // TODO if we split one fragment instance into multiple pipelines to run, we need to replace it
   // with CopyOnWriteArrayList or some other thread safe data structure
   private final List<OperatorContext> operatorContexts = new ArrayList<>();
-  private final long createNanos = System.nanoTime();
 
   private DriverContext driverContext;
 
-  // TODO we may use StateMachine<FragmentInstanceState> to replace it
-  private final AtomicReference<FragmentInstanceState> state;
+  private final FragmentInstanceStateMachine stateMachine;
+
+  private final long createNanos = System.nanoTime();
+
+  private final AtomicLong startNanos = new AtomicLong();
+  private final AtomicLong endNanos = new AtomicLong();
 
-  private long endTime = -1;
+  private final AtomicReference<Long> executionStartTime = new AtomicReference<>();
+  private final AtomicReference<Long> lastExecutionStartTime = new AtomicReference<>();
+  private final AtomicReference<Long> executionEndTime = new AtomicReference<>();
 
   //    private final GcMonitor gcMonitor;
   //    private final AtomicLong startNanos = new AtomicLong();
@@ -59,9 +66,43 @@ public class FragmentInstanceContext extends QueryContext {
   //    private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
 
   public FragmentInstanceContext(
-      FragmentInstanceId id, AtomicReference<FragmentInstanceState> state) {
+      FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
     this.id = id;
-    this.state = state;
+    this.stateMachine = stateMachine;
+  }
+
+  public void start() {
+    long now = System.currentTimeMillis();
+    executionStartTime.compareAndSet(null, now);
+    startNanos.compareAndSet(0, System.nanoTime());
+
+    // always update last execution start time
+    lastExecutionStartTime.set(now);
+  }
+
+  // Registers updateStatsIfDone as a state change listener. Kept out of the constructor so the
+  // "this" reference is not leaked to another thread (unsafe publication). NOTE(review): no call
+  // to this private method appears in this change, so the listener may never be added — confirm.
+  private void initialize() {
+    stateMachine.addStateChangeListener(this::updateStatsIfDone);
+  }
+
+  private void updateStatsIfDone(FragmentInstanceState newState) {
+    if (newState.isDone()) {
+      long now = System.currentTimeMillis();
+
+      // before setting the end times, make sure a start has been recorded
+      executionStartTime.compareAndSet(null, now);
+      startNanos.compareAndSet(0, System.nanoTime());
+
+      // only set the last start time if nothing has been started yet
+      lastExecutionStartTime.compareAndSet(null, now);
+
+      // compare-and-set from the initial value so a duplicate done notification (which
+      // shouldn't happen) cannot overwrite the recorded end time
+      executionEndTime.compareAndSet(null, now);
+      endNanos.compareAndSet(0, System.nanoTime());
+    }
   }
 
   public OperatorContext addOperatorContext(
@@ -98,40 +139,13 @@ public class FragmentInstanceContext extends QueryContext {
   }
 
   public void failed(Throwable cause) {
-    LOGGER.warn("Fragment Instance {} failed.", id, cause);
-    state.set(FragmentInstanceState.FAILED);
-  }
-
-  public void cancel() {
-    state.set(FragmentInstanceState.CANCELED);
-    this.endTime = System.currentTimeMillis();
-  }
-
-  public void abort() {
-    state.set(FragmentInstanceState.ABORTED);
-    this.endTime = System.currentTimeMillis();
-  }
-
-  public void finish() {
-    if (state.get().isDone()) {
-      return;
-    }
-    state.set(FragmentInstanceState.FINISHED);
-    this.endTime = System.currentTimeMillis();
-  }
-
-  public void flushing() {
-    if (state.get().isDone()) {
-      return;
-    }
-    state.set(FragmentInstanceState.FLUSHING);
+    stateMachine.failed(cause);
   }
 
   public long getEndTime() {
-    return endTime;
-  }
-
-  public void setEndTime(long endTime) {
-    this.endTime = endTime;
+    // executionEndTime is only set by updateStatsIfDone() once the instance is done; unboxing the
+    // null it holds before that would throw an NPE, so report -1 (the previous default) instead
+    Long endTime = executionEndTime.get();
+    return endTime == null ? -1 : endTime;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
index 21e0cc3d50..b00ddcadb3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
@@ -18,14 +18,17 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import io.airlift.stats.CounterStat;
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
 
 import com.google.common.collect.ImmutableList;
 
-import java.util.concurrent.atomic.AtomicReference;
-
 import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.ABORTED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.CANCELLED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FAILED;
 
 public class FragmentInstanceExecution {
 
@@ -36,23 +39,35 @@ public class FragmentInstanceExecution {
 
   private final Driver driver;
 
-  // TODO we may use StateMachine<FragmentInstanceState> to replace it
-  private final AtomicReference<FragmentInstanceState> state;
+  private final ISinkHandle sinkHandle;
+
+  private final FragmentInstanceStateMachine stateMachine;
 
   private long lastHeartbeat;
 
-  public FragmentInstanceExecution(
+  public static FragmentInstanceExecution createFragmentInstanceExecution(IFragmentInstanceScheduler scheduler,
+                                                                          FragmentInstanceId instanceId,
+                                                                          FragmentInstanceContext context,
+                                                                          Driver driver,
+                                                                          FragmentInstanceStateMachine stateMachine,
+                                                                          CounterStat failedInstances) {
+    FragmentInstanceExecution execution = new FragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine);
+    execution.initialize(failedInstances, scheduler, instanceId, driver);
+    return execution;
+  }
+
+  private FragmentInstanceExecution(
       IFragmentInstanceScheduler scheduler,
       FragmentInstanceId instanceId,
       FragmentInstanceContext context,
       Driver driver,
-      AtomicReference<FragmentInstanceState> state) {
+      FragmentInstanceStateMachine stateMachine) {
     this.scheduler = scheduler;
     this.instanceId = instanceId;
     this.context = context;
     this.driver = driver;
-    this.state = state;
-    state.set(FragmentInstanceState.RUNNING);
+    this.sinkHandle = driver.getSinkHandle();
+    this.stateMachine = stateMachine;
     scheduler.submitFragmentInstances(instanceId.getQueryId(), ImmutableList.of(driver));
   }
 
@@ -65,24 +80,66 @@ public class FragmentInstanceExecution {
   }
 
   public FragmentInstanceState getInstanceState() {
-    return state.get();
+    return stateMachine.getState();
   }
 
   public FragmentInstanceInfo getInstanceInfo() {
-    return new FragmentInstanceInfo(state.get(), context.getEndTime());
+    return new FragmentInstanceInfo(stateMachine.getState(), context.getEndTime());
   }
 
   public void failed(Throwable cause) {
     requireNonNull(cause, "cause is null");
-    context.failed(cause);
+    stateMachine.failed(cause);
   }
 
   public void cancel() {
-    context.cancel();
+    stateMachine.cancel();
   }
 
   public void abort() {
-    scheduler.abortFragmentInstance(instanceId);
-    context.abort();
+    stateMachine.abort();
+  }
+
+  // kept out of the constructor so that the `this` reference is not leaked during construction
+  private void initialize(CounterStat failedInstances, IFragmentInstanceScheduler scheduler, FragmentInstanceId instanceId, Driver driver) {
+    requireNonNull(failedInstances, "failedInstances is null");
+    stateMachine.addStateChangeListener(newState -> {
+      // ignore non-terminal transitions; every done state, FINISHED included, runs the cleanup
+      if (!newState.isDone()) {
+        return;
+      }
+
+      // Update failed tasks counter
+      if (newState == FAILED) {
+        failedInstances.update(1);
+      }
+
+      driver.close();
+      sinkHandle.abort();
+      scheduler.abortFragmentInstance(instanceId);
+    });
+  }
+
+  private synchronized void instanceCompletion() {
+    if (stateMachine.getState().isDone()) {
+      return;
+    }
+
+    // check for failure first so a failed sink handle fails the instance instead of flushing
+    if (sinkHandle.isFailed()) {
+      Throwable failureCause = sinkHandle.getFailureCause()
+          .orElseGet(() -> new RuntimeException("Fragment Instance " + instanceId + " 's SinkHandle is failed but the failure cause is missing"));
+      stateMachine.failed(failureCause);
+      return;
+    }
+
+    if (sinkHandle.isFinished()) {
+      // Cool! All done!
+      stateMachine.finished();
+      return;
+    }
+
+    // the sink handle has not finished yet: move from RUNNING to FLUSHING
+    stateMachine.transitionToFlushing();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceFailureListener.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceFailureListener.java
index 8c20a2c334..3f99848c29 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceFailureListener.java
@@ -20,31 +20,6 @@ package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 
-public class DriverContext {
-
-  private final FragmentInstanceContext fragmentInstanceContext;
-
-  public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
-    this.fragmentInstanceContext = fragmentInstanceContext;
-  }
-
-  public FragmentInstanceId getId() {
-    return fragmentInstanceContext.getId();
-  }
-
-  public FragmentInstanceContext getFragmentInstanceContext() {
-    return fragmentInstanceContext;
-  }
-
-  public void failed(Throwable cause) {
-    fragmentInstanceContext.failed(cause);
-  }
-
-  public void finish() {
-    fragmentInstanceContext.finish();
-  }
-
-  public void flushing() {
-    fragmentInstanceContext.flushing();
-  }
+public interface FragmentInstanceFailureListener {
+  void onTaskFailed(FragmentInstanceId taskId, Throwable failure);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
index 537f330f04..5f49fa47ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import io.airlift.stats.CounterStat;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
@@ -33,11 +34,12 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceExecution.createFragmentInstanceExecution;
 
 public class FragmentInstanceManager {
 
@@ -49,9 +51,14 @@ public class FragmentInstanceManager {
   private final IFragmentInstanceScheduler scheduler = FragmentInstanceScheduler.getInstance();
 
   private final ScheduledExecutorService instanceManagementExecutor;
+  private final ExecutorService instanceNotificationExecutor;
+
 
   private final Duration infoCacheTime;
 
+  // record failed instances count
+  private final CounterStat failedInstances = new CounterStat();
+
   public static FragmentInstanceManager getInstance() {
     return FragmentInstanceManager.InstanceHolder.INSTANCE;
   }
@@ -61,13 +68,14 @@ public class FragmentInstanceManager {
     this.instanceExecution = new ConcurrentHashMap<>();
     this.instanceManagementExecutor =
         IoTDBThreadPoolFactory.newScheduledThreadPool(1, "instance-management");
+    this.instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification");
 
     this.infoCacheTime = new Duration(15, TimeUnit.MINUTES);
 
     instanceManagementExecutor.scheduleWithFixedDelay(
         () -> {
           try {
-            removeOldTasks();
+            removeOldInstances();
           } catch (Throwable e) {
             logger.warn("Error removing old tasks", e);
           }
@@ -85,13 +93,13 @@ public class FragmentInstanceManager {
         instanceExecution.computeIfAbsent(
             instanceId,
             id -> {
-              AtomicReference<FragmentInstanceState> state = new AtomicReference<>();
-              state.set(FragmentInstanceState.PLANNED);
+
+              FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 
               FragmentInstanceContext context =
                   instanceContext.computeIfAbsent(
                       instanceId,
-                      fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
+                      fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, stateMachine));
 
               try {
                 DataDriver driver =
@@ -100,9 +108,9 @@ public class FragmentInstanceManager {
                         context,
                         instance.getTimeFilter(),
                         dataRegion);
-                return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+                return createFragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine, failedInstances);
               } catch (Throwable t) {
-                context.failed(t);
+                stateMachine.failed(t);
                 return null;
               }
             });
@@ -118,26 +126,29 @@ public class FragmentInstanceManager {
         instanceExecution.computeIfAbsent(
             instanceId,
             id -> {
-              AtomicReference<FragmentInstanceState> state = new AtomicReference<>();
-              state.set(FragmentInstanceState.PLANNED);
+              FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+
 
               FragmentInstanceContext context =
                   instanceContext.computeIfAbsent(
                       instanceId,
-                      fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
+                      fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, stateMachine));
 
               try {
                 SchemaDriver driver =
                     planner.plan(instance.getFragment().getRoot(), context, schemaRegion);
-                return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+                return createFragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine, failedInstances);
               } catch (Throwable t) {
-                context.failed(t);
+                stateMachine.failed(t);
                 return null;
               }
             });
     return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
   }
 
+  /**
+   * Aborts a FragmentInstance.
+   */
   public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId fragmentInstanceId) {
     FragmentInstanceExecution execution = instanceExecution.remove(fragmentInstanceId);
     if (execution != null) {
@@ -148,6 +159,21 @@ public class FragmentInstanceManager {
     return null;
   }
 
+  /**
+   * Cancels a FragmentInstance and drops its execution/context entries; returns null if unknown.
+   */
+  public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) {
+    requireNonNull(instanceId, "instanceId is null");
+
+    FragmentInstanceExecution execution = instanceExecution.remove(instanceId);
+    if (execution != null) {
+      instanceContext.remove(instanceId);
+      execution.cancel();
+      return execution.getInstanceInfo();
+    }
+    return null;
+  }
+
   /**
    * Gets the info for the specified fragment instance.
    *
@@ -163,12 +189,16 @@ public class FragmentInstanceManager {
     return execution.getInstanceInfo();
   }
 
+  public CounterStat getFailedInstances() {
+    return failedInstances;
+  }
+
   private FragmentInstanceInfo createFailedInstanceInfo(FragmentInstanceId instanceId) {
     return new FragmentInstanceInfo(
         FragmentInstanceState.FAILED, instanceContext.get(instanceId).getEndTime());
   }
 
-  private void removeOldTasks() {
+  private void removeOldInstances() {
     long oldestAllowedInstance = System.currentTimeMillis() - infoCacheTime.toMillis();
     instanceContext
         .entrySet()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
index d55af83e98..d1a182bcbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
@@ -40,7 +40,7 @@ public enum FragmentInstanceState {
   /** Instance has finished executing and all output has been consumed. */
   FINISHED(true, false),
   /** Instance was canceled by a user. */
-  CANCELED(true, true),
+  CANCELLED(true, true),
   /** Instance was aborted due to a failure in the query. The failure was not in this instance. */
   ABORTED(true, true),
   /** Instance execution failed. */
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java
new file mode 100644
index 0000000000..7f66d1a321
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.StateMachine.StateChangeListener;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.ABORTED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.CANCELLED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FAILED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FINISHED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FLUSHING;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.RUNNING;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.TERMINAL_INSTANCE_STATES;
+
+/**
+ * Tracks the {@link FragmentInstanceState} of a single fragment instance together with the causes
+ * passed to {@link #failed(Throwable)} and the failures reported for its source instances.
+ *
+ * <p>The state starts as {@code RUNNING}. {@link #transitionToFlushing()} only moves {@code
+ * RUNNING} to {@code FLUSHING}, and a done state is only applied while the current state is not
+ * yet done, so the first done state reached is final.
+ */
+@ThreadSafe
+public class FragmentInstanceStateMachine {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceStateMachine.class);
+
+  // wall-clock time (ms) at which this state machine was created
+  private final long createdTime = System.currentTimeMillis();
+
+  private final FragmentInstanceId instanceId;
+  // handed to the underlying StateMachine and used to dispatch source-failure notifications
+  private final Executor executor;
+  private final StateMachine<FragmentInstanceState> instanceState;
+  // every cause passed to failed(), in arrival order
+  private final LinkedBlockingQueue<Throwable> failureCauses = new LinkedBlockingQueue<>();
+
+  // first failure reported per source instance; replayed to listeners registered later
+  @GuardedBy("this")
+  private final Map<FragmentInstanceId, Throwable> sourceInstanceFailures = new HashMap<>();
+
+  @GuardedBy("this")
+  private final List<FragmentInstanceFailureListener> sourceInstanceFailureListeners =
+      new ArrayList<>();
+
+  public FragmentInstanceStateMachine(FragmentInstanceId fragmentInstanceId, Executor executor) {
+    this.instanceId = requireNonNull(fragmentInstanceId, "fragmentInstanceId is null");
+    this.executor = requireNonNull(executor, "executor is null");
+    instanceState =
+        new StateMachine<>(
+            "FragmentInstance " + fragmentInstanceId, executor, RUNNING, TERMINAL_INSTANCE_STATES);
+    instanceState.addStateChangeListener(
+        newState -> LOGGER.debug("Fragment Instance {} is {}", fragmentInstanceId, newState));
+  }
+
+  /** Returns the {@link System#currentTimeMillis()} value captured at construction. */
+  public long getCreatedTime() {
+    return createdTime;
+  }
+
+  /** Returns the id of the fragment instance tracked by this state machine. */
+  public FragmentInstanceId getFragmentInstanceId() {
+    return instanceId;
+  }
+
+  /** Returns the current state of the fragment instance. */
+  public FragmentInstanceState getState() {
+    return instanceState.get();
+  }
+
+  /**
+   * Returns a future for the next change away from {@code currentState}. If the instance is
+   * already done when the state is re-read, an already-completed future holding that done state
+   * is returned instead.
+   *
+   * @throws IllegalArgumentException if {@code currentState} is itself a done state
+   */
+  public ListenableFuture<FragmentInstanceState> getStateChange(FragmentInstanceState currentState) {
+    requireNonNull(currentState, "currentState is null");
+    checkArgument(!currentState.isDone(), "Current state is already done");
+
+    ListenableFuture<FragmentInstanceState> future = instanceState.getStateChange(currentState);
+    // read the state after obtaining the future; if it is already done, return it directly
+    FragmentInstanceState state = instanceState.get();
+    if (state.isDone()) {
+      return immediateFuture(state);
+    }
+    return future;
+  }
+
+  /**
+   * Returns the queue of causes recorded by {@link #failed(Throwable)}. This is the live internal
+   * queue, not a copy.
+   */
+  public LinkedBlockingQueue<Throwable> getFailureCauses() {
+    return failureCauses;
+  }
+
+  /** Moves the instance from {@code RUNNING} to {@code FLUSHING}; other states are unchanged. */
+  public void transitionToFlushing() {
+    instanceState.setIf(FLUSHING, currentState -> currentState == RUNNING);
+  }
+
+  /** Transitions to {@code FINISHED} unless the instance is already done. */
+  public void finished() {
+    transitionToDoneState(FINISHED);
+  }
+
+  /** Transitions to {@code CANCELLED} unless the instance is already done. */
+  public void cancel() {
+    transitionToDoneState(CANCELLED);
+  }
+
+  /** Transitions to {@code ABORTED} unless the instance is already done. */
+  public void abort() {
+    transitionToDoneState(ABORTED);
+  }
+
+  /**
+   * Records {@code cause} and transitions to {@code FAILED} unless the instance is already done.
+   * The cause is recorded even when the state no longer changes.
+   */
+  public void failed(Throwable cause) {
+    requireNonNull(cause, "cause is null");
+    failureCauses.add(cause);
+    transitionToDoneState(FAILED);
+  }
+
+  // applies doneState only if the current state is not already a done state
+  private void transitionToDoneState(FragmentInstanceState doneState) {
+    requireNonNull(doneState, "doneState is null");
+    checkArgument(doneState.isDone(), "doneState %s is not a done state", doneState);
+
+    instanceState.setIf(doneState, currentState -> !currentState.isDone());
+  }
+
+  /**
+   * Registers a listener for state changes. Listeners are dispatched by the underlying {@link
+   * StateMachine}, which was given this instance's executor, so notifications may run on another
+   * thread and may be observed out of order; avoid leaking {@code this} by adding a listener from
+   * a constructor.
+   */
+  public void addStateChangeListener(StateChangeListener<FragmentInstanceState> stateChangeListener) {
+    instanceState.addStateChangeListener(stateChangeListener);
+  }
+
+  /**
+   * Registers a listener for source-instance failures. Failures already reported via {@link
+   * #sourceTaskFailed} are replayed to the new listener on the executor.
+   */
+  public void addSourceTaskFailureListener(FragmentInstanceFailureListener listener) {
+    requireNonNull(listener, "listener is null");
+    Map<FragmentInstanceId, Throwable> failures;
+    synchronized (this) {
+      sourceInstanceFailureListeners.add(listener);
+      failures = ImmutableMap.copyOf(sourceInstanceFailures);
+    }
+    executor.execute(() -> failures.forEach(listener::onTaskFailed));
+  }
+
+  /**
+   * Reports that a source instance failed. Only the first failure per source instance is kept for
+   * replay, but every call notifies the currently registered listeners on the executor.
+   */
+  public void sourceTaskFailed(FragmentInstanceId sourceInstanceId, Throwable failure) {
+    // a stored null key/value would make ImmutableMap.copyOf in addSourceTaskFailureListener throw
+    requireNonNull(sourceInstanceId, "sourceInstanceId is null");
+    requireNonNull(failure, "failure is null");
+    List<FragmentInstanceFailureListener> listeners;
+    synchronized (this) {
+      sourceInstanceFailures.putIfAbsent(sourceInstanceId, failure);
+      listeners = ImmutableList.copyOf(sourceInstanceFailureListeners);
+    }
+    executor.execute(
+        () -> {
+          for (FragmentInstanceFailureListener listener : listeners) {
+            listener.onTaskFailed(sourceInstanceId, failure);
+          }
+        });
+  }
+
+  @Override
+  public String toString() {
+    return toStringHelper(this)
+        .add("FragmentInstanceId", instanceId)
+        .add("FragmentInstanceState", instanceState)
+        .add("failureCauses", failureCauses)
+        .toString();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
index 4020db4593..6c45eb529a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
@@ -18,155 +18,27 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.operator.Operator;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import io.airlift.units.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
-import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
-
 @NotThreadSafe
-public class SchemaDriver implements Driver {
-
-  private static final Logger logger = LoggerFactory.getLogger(SchemaDriver.class);
-
-  private final Operator root;
-  private final ISinkHandle sinkHandle;
-  private final SchemaDriverContext driverContext;
-
-  private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
-
-  private boolean closed = false;
+public class SchemaDriver extends Driver {
 
   public SchemaDriver(Operator root, ISinkHandle sinkHandle, SchemaDriverContext driverContext) {
-    this.root = root;
-    this.sinkHandle = sinkHandle;
-    this.driverContext = driverContext;
-    // initially the driverBlockedFuture is not blocked (it is completed)
-    SettableFuture<Void> future = SettableFuture.create();
-    future.set(null);
-    driverBlockedFuture.set(future);
-  }
-
-  @Override
-  public boolean isFinished() {
-    try {
-      boolean isFinished = driverBlockedFuture.get().isDone() && root != null && root.isFinished();
-      if (isFinished) {
-        close();
-      }
-      return isFinished;
-    } catch (Throwable t) {
-      logger.error(
-          "Failed to query whether the schema driver {} is finished", driverContext.getId(), t);
-      driverContext.failed(t);
-      return true;
-    }
+    super(root, sinkHandle, driverContext);
   }
 
-  @Override
-  public ListenableFuture<Void> processFor(Duration duration) {
-    // if the driver is blocked we don't need to continue
-    SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
-    if (!blockedFuture.isDone()) {
-      return blockedFuture;
-    }
-
-    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
-
-    long start = System.nanoTime();
-    try {
-      do {
-        ListenableFuture<Void> future = processInternal();
-        if (!future.isDone()) {
-          return updateDriverBlockedFuture(future);
-        }
-      } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
-    } catch (Throwable t) {
-      logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
-      driverContext.failed(t);
-      close();
-      blockedFuture.setException(t);
-      return blockedFuture;
-    }
-    return NOT_BLOCKED;
-  }
-
-  private ListenableFuture<Void> processInternal() throws IOException {
-    ListenableFuture<Void> blocked = root.isBlocked();
-    if (!blocked.isDone()) {
-      return blocked;
-    }
-    blocked = sinkHandle.isFull();
-    if (!blocked.isDone()) {
-      return blocked;
-    }
-    if (root.hasNext()) {
-      TsBlock tsBlock = root.next();
-      if (tsBlock != null && !tsBlock.isEmpty()) {
-        sinkHandle.send(Collections.singletonList(tsBlock));
-      }
-    }
-    return NOT_BLOCKED;
-  }
-
-  private ListenableFuture<Void> updateDriverBlockedFuture(
-      ListenableFuture<Void> sourceBlockedFuture) {
-    // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
-    // or any of the operators gets a memory revocation request
-    SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
-    driverBlockedFuture.set(newDriverBlockedFuture);
-    sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
-
-    // TODO Although we don't have memory management for operator now, we should consider it for
-    // future
-    // it's possible that memory revoking is requested for some operator
-    // before we update driverBlockedFuture above and we don't want to miss that
-    // notification, so we check to see whether that's the case before returning.
-
-    return newDriverBlockedFuture;
-  }
-
-  @Override
-  public FragmentInstanceId getInfo() {
-    return driverContext.getId();
-  }
 
   @Override
-  public void close() {
-    if (closed) {
-      return;
-    }
-    closed = true;
-    try {
-      if (root != null) {
-        root.close();
-      }
-      if (sinkHandle != null) {
-        sinkHandle.close();
-      }
-    } catch (Throwable t) {
-      logger.error("Failed to closed driver {}", driverContext.getId(), t);
-      driverContext.failed(t);
-    }
+  protected boolean init(SettableFuture<Void> blockedFuture) {
+    return true;
   }
 
   @Override
-  public void failed(Throwable t) {
-    driverContext.failed(t);
+  protected void releaseResource() {
+    // do nothing
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
index c8cccc0520..7305feee83 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
@@ -53,5 +53,5 @@ public interface Operator extends AutoCloseable {
   /**
    * Is this operator completely finished processing and no more output TsBlock will be produced.
    */
-  boolean isFinished() throws IOException;
+  boolean isFinished();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
index 48d0c71185..cd3355d395 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
@@ -217,7 +217,7 @@ public class TransformOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() throws IOException {
+  public boolean isFinished() {
     return !hasNext();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
index 003da76f2d..97da0a4935 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
@@ -92,7 +92,7 @@ public class SchemaFetchOperator implements SourceOperator {
   }
 
   @Override
-  public boolean isFinished() throws IOException {
+  public boolean isFinished() {
     return isFinished;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
index 722f63543d..397b7dc461 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
@@ -65,7 +65,7 @@ public class ExchangeOperator implements SourceOperator {
   }
 
   @Override
-  public boolean isFinished() throws IOException {
+  public boolean isFinished() {
     return sourceHandle.isFinished();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
index e30b1f15bc..888f4ce5e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
@@ -18,10 +18,14 @@
  */
 package org.apache.iotdb.db.mpp.schedule.task;
 
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.DriverContext;
+import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.schedule.ExecutionContext;
 import org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor;
 import org.apache.iotdb.db.mpp.schedule.queue.ID;
@@ -175,17 +179,31 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
     }
   }
 
-  private static class StubFragmentInstance implements Driver {
+  private static class StubFragmentInstance extends Driver {
 
     private static final QueryId stubQueryId = new QueryId("stub_query");
     private static final FragmentInstanceId stubInstance =
         new FragmentInstanceId(new PlanFragmentId(stubQueryId, 0), "stub-instance");
 
+    public StubFragmentInstance() {
+      super(null, null, null);
+    }
+
     @Override
     public boolean isFinished() {
       return false;
     }
 
+    @Override
+    protected boolean init(SettableFuture<Void> blockedFuture) {
+      return true;
+    }
+
+    @Override
+    protected void releaseResource() {
+
+    }
+
     @Override
     public ListenableFuture<Void> processFor(Duration duration) {
       return null;
@@ -201,5 +219,10 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
 
     @Override
     public void failed(Throwable t) {}
+
+    @Override
+    public ISinkHandle getSinkHandle() {
+      return null;
+    }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index 2703d0aa37..db060ac405 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -55,6 +56,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor.EXECUTION_TIME_SLICE;
@@ -85,6 +87,7 @@ public class DataDriverTest {
 
   @Test
   public void batchTest() {
+    ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
       MeasurementPath measurementPath1 =
           new MeasurementPath(DATA_DRIVER_TEST_SG + ".device0.sensor0", TSDataType.INT32);
@@ -92,11 +95,10 @@ public class DataDriverTest {
       allSensors.add("sensor0");
       allSensors.add("sensor1");
       QueryId queryId = new QueryId("stub_query");
-      AtomicReference<FragmentInstanceState> state =
-          new AtomicReference<>(FragmentInstanceState.RUNNING);
+      FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
       FragmentInstanceContext fragmentInstanceContext =
-          new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+          new FragmentInstanceContext(instanceId, stateMachine);
       PlanNodeId planNodeId1 = new PlanNodeId("1");
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId1, SeriesScanOperator.class.getSimpleName());
@@ -159,19 +161,20 @@ public class DataDriverTest {
               ImmutableList.of(seriesScanOperator1, seriesScanOperator2));
 
       StubSinkHandle sinkHandle = new StubSinkHandle(fragmentInstanceContext);
-
-      try (Driver dataDriver = new DataDriver(limitOperator, sinkHandle, driverContext)) {
+      Driver dataDriver = null;
+      try {
+        dataDriver = new DataDriver(limitOperator, sinkHandle, driverContext);
         assertEquals(fragmentInstanceContext.getId(), dataDriver.getInfo());
 
         assertFalse(dataDriver.isFinished());
 
         while (!dataDriver.isFinished()) {
-          assertEquals(FragmentInstanceState.RUNNING, state.get());
+          assertEquals(FragmentInstanceState.RUNNING, stateMachine.getState());
           ListenableFuture<Void> blocked = dataDriver.processFor(EXECUTION_TIME_SLICE);
           assertTrue(blocked.isDone());
         }
 
-        assertEquals(FragmentInstanceState.FLUSHING, state.get());
+        assertEquals(FragmentInstanceState.FLUSHING, stateMachine.getState());
 
         List<TsBlock> result = sinkHandle.getTsBlocks();
         assertEquals(13, result.size());
@@ -204,10 +207,16 @@ public class DataDriverTest {
             }
           }
         }
+      } finally {
+        if (dataDriver != null) {
+          dataDriver.close();
+        }
       }
     } catch (IllegalPathException | QueryProcessException e) {
       e.printStackTrace();
       fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
     }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
index 6ad74bd110..ab0849ddf2 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -28,6 +29,7 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
@@ -50,6 +52,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
@@ -78,6 +81,7 @@ public class LimitOperatorTest {
 
   @Test
   public void batchTest() {
+    ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
       MeasurementPath measurementPath1 =
           new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
@@ -85,11 +89,10 @@ public class LimitOperatorTest {
       allSensors.add("sensor0");
       allSensors.add("sensor1");
       QueryId queryId = new QueryId("stub_query");
-      AtomicReference<FragmentInstanceState> state =
-          new AtomicReference<>(FragmentInstanceState.RUNNING);
+      FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
       FragmentInstanceContext fragmentInstanceContext =
-          new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+          new FragmentInstanceContext(instanceId, stateMachine);
       PlanNodeId planNodeId1 = new PlanNodeId("1");
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId1, SeriesScanOperator.class.getSimpleName());
@@ -169,6 +172,8 @@ public class LimitOperatorTest {
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
     }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
index 3793504970..dd28a90263 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -28,6 +29,7 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
@@ -46,6 +48,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
@@ -73,17 +76,17 @@ public class SeriesScanOperatorTest {
 
   @Test
   public void batchTest() {
+    ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
       MeasurementPath measurementPath =
           new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
       Set<String> allSensors = new HashSet<>();
       allSensors.add("sensor0");
       QueryId queryId = new QueryId("stub_query");
-      AtomicReference<FragmentInstanceState> state =
-          new AtomicReference<>(FragmentInstanceState.RUNNING);
+      FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
       FragmentInstanceContext fragmentInstanceContext =
-          new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+          new FragmentInstanceContext(instanceId, stateMachine);
       PlanNodeId planNodeId = new PlanNodeId("1");
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId, SeriesScanOperator.class.getSimpleName());
@@ -123,6 +126,8 @@ public class SeriesScanOperatorTest {
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
     }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
index 5534418b84..70fe4def02 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -28,6 +29,7 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -49,6 +51,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.*;
@@ -74,6 +77,7 @@ public class TimeJoinOperatorTest {
 
   @Test
   public void batchTest() {
+    ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
       MeasurementPath measurementPath1 =
           new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
@@ -81,11 +85,10 @@ public class TimeJoinOperatorTest {
       allSensors.add("sensor0");
       allSensors.add("sensor1");
       QueryId queryId = new QueryId("stub_query");
-      AtomicReference<FragmentInstanceState> state =
-          new AtomicReference<>(FragmentInstanceState.RUNNING);
+      FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
       FragmentInstanceContext fragmentInstanceContext =
-          new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+          new FragmentInstanceContext(instanceId, stateMachine);
       PlanNodeId planNodeId1 = new PlanNodeId("1");
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId1, SeriesScanOperator.class.getSimpleName());
@@ -155,6 +158,8 @@ public class TimeJoinOperatorTest {
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
     }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
index 609dc9befa..320be3be21 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator.schema;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.LocalConfigNode;
@@ -29,6 +30,7 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -52,6 +54,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_ATTRIBUTES;
@@ -90,13 +93,13 @@ public class SchemaScanOperatorTest {
 
   @Test
   public void testDeviceMetaScanOperator() {
+    ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
       QueryId queryId = new QueryId("stub_query");
-      AtomicReference<FragmentInstanceState> state =
-          new AtomicReference<>(FragmentInstanceState.RUNNING);
+      FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
       FragmentInstanceContext fragmentInstanceContext =
-          new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+          new FragmentInstanceContext(instanceId, stateMachine);
       PlanNodeId planNodeId = queryId.genPlanNodeId();
       OperatorContext operatorContext =
           fragmentInstanceContext.addOperatorContext(
@@ -149,18 +152,20 @@ public class SchemaScanOperatorTest {
     } catch (MetadataException e) {
       e.printStackTrace();
       fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
     }
   }
 
   @Test
   public void testTimeSeriesMetaScanOperator() {
+    ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
       QueryId queryId = new QueryId("stub_query");
-      AtomicReference<FragmentInstanceState> state =
-          new AtomicReference<>(FragmentInstanceState.RUNNING);
+      FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
       FragmentInstanceContext fragmentInstanceContext =
-          new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+          new FragmentInstanceContext(instanceId, stateMachine);
       PlanNodeId planNodeId = queryId.genPlanNodeId();
       OperatorContext operatorContext =
           fragmentInstanceContext.addOperatorContext(
@@ -237,6 +242,8 @@ public class SchemaScanOperatorTest {
     } catch (MetadataException e) {
       e.printStackTrace();
       fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
     }
   }
 }