You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/20 09:55:12 UTC

[iotdb] branch stable-mpp created (now 86690f209e)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 86690f209e Add FragmentInstanceStateMachine

This branch includes the following new commits:

     new fb52130d2b Add FragmentInstanceStateMachine
     new 86690f209e Add FragmentInstanceStateMachine

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 02/02: Add FragmentInstanceStateMachine

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 86690f209e67a5f7270c7e3018aa32042b4a113a
Merge: fb52130d2b 67dafed0e6
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Apr 20 17:54:57 2022 +0800

    Add FragmentInstanceStateMachine

 .../iotdb/cluster/coordinator/Coordinator.java     |    6 +-
 .../log/manage/PartitionedSnapshotLogManager.java  |    2 +-
 .../iotdb/cluster/metadata/CSchemaProcessor.java   |   37 -
 .../apache/iotdb/cluster/metadata/MetaPuller.java  |    3 +-
 .../iotdb/cluster/query/LocalQueryExecutor.java    |    8 +-
 .../cluster/server/member/DataGroupMember.java     |   11 +-
 .../cluster/server/member/MetaGroupMember.java     |    6 +-
 .../cluster/server/service/BaseAsyncService.java   |    4 +-
 .../apache/iotdb/cluster/utils/StatusUtils.java    |    6 +-
 .../FilePartitionedSnapshotLogManagerTest.java     |    2 +
 .../cluster/server/member/DataGroupMemberTest.java |    2 +
 .../cluster/server/member/MetaGroupMemberTest.java |    5 +-
 .../consensus/response/DataNodesInfoDataSet.java   |    4 +-
 .../statemachine/PartitionRegionStateMachine.java  |   17 +
 .../confignode/consensus/RatisConsensusDemo.java   |    6 +-
 .../manager/ConfigManagerManualTest.java           |    6 +-
 .../server/ConfigNodeRPCServerProcessorTest.java   |   26 +-
 .../iotdb/consensus/common/SnapshotMeta.java       |   40 +-
 .../iotdb/consensus/ratis/RatisConsensus.java      |    4 +-
 .../consensus/standalone/StandAloneServerImpl.java |   18 +
 .../consensus/statemachine/EmptyStateMachine.java  |   18 +
 .../consensus/statemachine/IStateMachine.java      |   49 +
 .../iotdb/consensus/ratis/RatisConsensusTest.java  |   15 +
 .../standalone/StandAloneConsensusTest.java        |   15 +
 distribution/src/assembly/all.xml                  |    4 +
 distribution/src/assembly/server.xml               |    4 +
 docs/UserGuide/Maintenance-Tools/Metric-Tool.md    |   12 +-
 .../Maintenance-Tools/SchemaFileSketch-Tool.md     |   38 +
 .../Ecosystem Integration/Grafana Plugin.md        |  143 +-
 docs/zh/UserGuide/Maintenance-Tools/Metric-Tool.md |   12 +-
 .../Maintenance-Tools/SchemaFileSketch-Tool.md     |   35 +
 .../Apache IoTDB Dashboard v0.13.1.json            | 1527 ++++++++++++++++++++
 .../Apache IoTDB Dashboard v0.14.0.json            | 1527 ++++++++++++++++++++
 .../iotdb/influxdb/protocol/dto/SessionPoint.java  |    8 +-
 .../iotdb/influxdb/session/InfluxDBSession.java    |    6 +-
 .../influxdb/integration/IoTDBInfluxDBIT.java      |    4 +-
 .../iotdb/commons/partition/DataPartition.java     |   17 +-
 .../{PartitionInfo.java => Partition.java}         |   27 +-
 .../iotdb/commons/partition/RegionReplicaSet.java  |    6 +-
 .../iotdb/commons/partition/SchemaPartition.java   |   17 +-
 .../SchemaFileSketcher.bat}                        |    4 +-
 .../mLogParser.sh => schema/SchemaFileSketcher.sh} |    4 +-
 .../tools/{mlog => schema}/mLogParser.bat          |    2 +-
 .../resources/tools/{mlog => schema}/mLogParser.sh |    0
 .../org/apache/iotdb/db/conf/IoTDBConfigCheck.java |    8 +
 .../statemachine/DataRegionStateMachine.java       |   17 +
 .../statemachine/SchemaRegionStateMachine.java     |   22 +-
 .../engine/compaction/CompactionTaskManager.java   |    6 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |  214 ++-
 .../apache/iotdb/db/metadata/LocalConfigNode.java  |    2 +-
 .../db/metadata/LocalSchemaPartitionTable.java     |    6 +
 .../iotdb/db/metadata/LocalSchemaProcessor.java    |   39 -
 .../iotdb/db/metadata/mtree/IMTreeBelowSG.java     |  307 ++++
 ...reeBelowSG.java => MTreeBelowSGCachedImpl.java} |  134 +-
 ...reeBelowSG.java => MTreeBelowSGMemoryImpl.java} |  909 +++++-------
 .../mtree/store/disk/MTreeFlushTaskManager.java    |    2 +-
 .../mtree/store/disk/MTreeReleaseTaskManager.java  |    2 +-
 .../mtree/store/disk/schemafile/ISegment.java      |    2 +
 .../mtree/store/disk/schemafile/RecordUtils.java   |   24 +-
 .../mtree/store/disk/schemafile/SchemaFile.java    |   37 +-
 .../mtree/store/disk/schemafile/SchemaPage.java    |    9 +-
 .../mtree/store/disk/schemafile/Segment.java       |   51 +
 .../db/metadata/schemaregion/ISchemaRegion.java    |  247 +++-
 .../db/metadata/schemaregion/SchemaEngine.java     |    5 +-
 ...hemaRegion.java => SchemaRegionMemoryImpl.java} |  600 +++-----
 ...Region.java => SchemaRegionSchemaFileImpl.java} |   93 +-
 .../schemaregion/rocksdb/RSchemaRegion.java        |   81 +-
 .../SchemaExecutionVisitor.java}                   |   59 +-
 .../iotdb/db/mpp/buffer/DataBlockManager.java      |   26 +-
 .../apache/iotdb/db/mpp/buffer/ISinkHandle.java    |   14 +-
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java |   37 +-
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   |   27 +-
 .../apache/iotdb/db/mpp/buffer/StubSinkHandle.java |   13 +-
 .../apache/iotdb/db/mpp/execution/Coordinator.java |   16 +-
 .../apache/iotdb/db/mpp/execution/DataDriver.java  |   19 +-
 .../iotdb/db/mpp/execution/DataDriverContext.java  |    1 +
 .../org/apache/iotdb/db/mpp/execution/Driver.java  |  112 +-
 .../iotdb/db/mpp/execution/DriverContext.java      |    2 -
 .../db/mpp/execution/FragmentInstanceContext.java  |   12 +-
 .../mpp/execution/FragmentInstanceExecution.java   |   79 +-
 .../db/mpp/execution/FragmentInstanceManager.java  |   34 +-
 .../execution/FragmentInstanceStateMachine.java    |   49 +-
 .../iotdb/db/mpp/execution/IQueryExecution.java    |    2 +
 .../iotdb/db/mpp/execution/QueryExecution.java     |   43 +-
 .../iotdb/db/mpp/execution/QueryStateMachine.java  |   14 +-
 .../iotdb/db/mpp/execution/SchemaDriver.java       |    5 +-
 .../db/mpp/execution/SchemaDriverContext.java      |    1 +
 .../db/mpp/execution/config/ConfigExecution.java   |   59 +-
 .../config/ConfigTaskVisitor.java}                 |   24 +-
 .../iotdb/db/mpp/execution/config/IConfigTask.java |    2 +-
 .../mpp/execution/scheduler/ClusterScheduler.java  |   32 +-
 .../mpp/execution/scheduler/IQueryTerminator.java  |    4 +-
 .../scheduler/SimpleFragInstanceDispatcher.java    |   51 +-
 .../execution/scheduler/SimpleQueryTerminator.java |   50 +-
 .../org/apache/iotdb/db/mpp/operator/Operator.java |    2 -
 .../mpp/operator/schema/SchemaFetchOperator.java   |    1 -
 .../source/SeriesAggregateScanOperator.java        |  420 +++++-
 .../db/mpp/operator/source/SeriesScanOperator.java |    2 +-
 .../db/mpp/operator/source/SeriesScanUtil.java     |   10 +-
 .../FragmentInstanceAbortedException.java}         |   28 +-
 .../db/mpp/schedule/FragmentInstanceScheduler.java |   15 +-
 .../mpp/schedule/FragmentInstanceTaskExecutor.java |    1 +
 .../schedule/FragmentInstanceTimeoutSentinel.java  |    1 +
 .../db/mpp/schedule/queue/L1PriorityQueue.java     |   37 +-
 .../db/mpp/schedule/queue/L2PriorityQueue.java     |   66 +-
 .../db/mpp/schedule/task/FragmentInstanceTask.java |   18 +-
 .../mpp/schedule/task/FragmentInstanceTaskID.java  |   10 +-
 .../db/mpp/sql/analyze/ClusterSchemaFetcher.java   |    4 +-
 .../db/mpp/sql/planner/DistributionPlanner.java    |   20 +
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |   28 +-
 .../iotdb/db/mpp/sql/planner/QueryPlanBuilder.java |    5 +
 .../db/mpp/sql/planner/plan/FragmentInstance.java  |    1 +
 .../db/mpp/sql/planner/plan/node/PlanVisitor.java  |   13 +-
 .../node/metedata/write/AlterTimeSeriesNode.java   |   41 +-
 .../node/metedata/write/CreateTimeSeriesNode.java  |    1 +
 .../plan/node/source/SeriesAggregateScanNode.java  |   45 +-
 .../statement/ConfigStatement.java}                |   10 +-
 .../db/mpp/sql/statement/StatementVisitor.java     |    5 +
 .../statement/component/GroupByTimeComponent.java  |   73 +
 .../metadata/SetStorageGroupStatement.java         |   10 +-
 .../db/protocol/influxdb/handler/QueryHandler.java |    4 +-
 .../influxdb/util/JacksonUtils.java}               |   36 +-
 .../db/protocol/influxdb/util/StringUtils.java     |    3 +-
 .../iotdb/db/query/control/SessionManager.java     |    7 +-
 .../db/query/dataset/AlignByDeviceDataSet.java     |    4 +-
 .../query/dataset/groupby/GroupByFillDataSet.java  |    8 +-
 .../query/dataset/groupby/GroupByTimeDataSet.java  |   30 +-
 .../dataset/groupby/GroupByTimeEngineDataSet.java  |   24 +-
 .../groupby/GroupByWithValueFilterDataSet.java     |    6 +-
 .../groupby/GroupByWithoutValueFilterDataSet.java  |    9 +-
 .../iotdb/db/query/reader/series/SeriesReader.java |    3 +
 .../java/org/apache/iotdb/db/service/DataNode.java |    4 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |    2 +-
 .../thrift/impl/DataNodeTSIServiceImpl.java        |   11 +-
 .../service/thrift/impl/InternalServiceImpl.java   |    6 +-
 .../db/service/thrift/impl/TSServiceImpl.java      |   47 +-
 .../db/sync/sender/manager/SchemaSyncManager.java  |    6 +-
 .../db/tools/{mlog => schema}/MLogParser.java      |    2 +-
 .../db/tools/schema/SchemaFileSketchTool.java      |  165 +++
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |    2 +-
 .../timerangeiterator/AggrWindowIterator.java      |   38 +-
 .../timerangeiterator/ITimeRangeIterator.java      |    8 +-
 .../timerangeiterator/PreAggrWindowIterator.java   |   38 +-
 .../PreAggrWindowWithNaturalMonthIterator.java     |   50 +-
 .../SingleTimeWindowIterator.java                  |   65 +
 .../org/apache/iotdb/db/wal/buffer/WALBuffer.java  |  143 +-
 .../java/org/apache/iotdb/db/wal/node/WALNode.java |    6 +-
 .../db/engine/storagegroup/DataRegionTest.java     |  910 ++++++++++++
 .../engine/storagegroup/TsFileProcessorTest.java   |   17 +
 ...ocessorTest.java => TsFileProcessorV2Test.java} |  104 +-
 .../iotdb/db/metadata/mtree/MTreeBelowSGTest.java  |   18 +-
 .../iotdb/db/mpp/buffer/SourceHandleTest.java      |    6 +-
 .../db/mpp/execution/ConfigExecutionTest.java      |  130 ++
 .../iotdb/db/mpp/execution/DataDriverTest.java     |   10 +-
 .../iotdb/db/mpp/operator/LimitOperatorTest.java   |   11 +-
 .../operator/SeriesAggregateScanOperatorTest.java  |  373 +++++
 .../db/mpp/operator/SeriesScanOperatorTest.java    |   17 +-
 .../db/mpp/operator/TimeJoinOperatorTest.java      |   11 +-
 .../operator/schema/SchemaScanOperatorTest.java    |   20 +-
 .../db/mpp/schedule/DefaultTaskSchedulerTest.java  |   18 +
 .../schedule/FragmentInstanceSchedulerTest.java    |   20 +
 .../FragmentInstanceTimeoutSentinelTest.java       |   55 +-
 .../db/mpp/schedule/queue/L1PriorityQueueTest.java |   22 +
 .../db/mpp/schedule/queue/L2PriorityQueueTest.java |   27 +
 .../db/mpp/sql/plan/DistributionPlannerTest.java   |   39 +
 .../db/mpp/sql/plan/QueryLogicalPlanUtil.java      |    8 +
 .../source/SeriesAggregateScanNodeSerdeTest.java   |    7 +-
 .../dataset/groupby/GroupByTimeDataSetTest.java    |   74 +-
 .../query/reader/series/SeriesReaderTestUtil.java  |    8 +
 .../iotdb/db/service/InternalServiceImplTest.java  |  117 +-
 .../org/apache/iotdb/db/tools/MLogParserTest.java  |    2 +-
 .../iotdb/db/tools/SchemaFileSketchTest.java       |  158 ++
 .../iotdb/db/utils/TimeRangeIteratorTest.java      |  230 ++-
 .../db/wal/recover/WALRecoverManagerTest.java      |    6 +-
 .../org/apache/iotdb/rpc/RedirectException.java    |   14 +-
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |    4 +-
 .../java/org/apache/iotdb/session/Session.java     |   52 +-
 .../apache/iotdb/session/SessionConnection.java    |   16 +-
 .../apache/iotdb/session/util/SessionUtils.java    |   12 +-
 .../apache/iotdb/session/SessionCacheLeaderUT.java |   28 +-
 spark-iotdb-connector/pom.xml                      |    2 +-
 .../src/main/thrift/confignode.thrift              |    4 +-
 thrift/src/main/thrift/common.thrift               |    6 +-
 .../iotdb/tsfile/read/common/block/TsBlock.java    |   22 +-
 .../tsfile/read/common/block/TsBlockBuilder.java   |    5 +
 .../common/block/column/BinaryColumnBuilder.java   |   11 +
 .../common/block/column/BooleanColumnBuilder.java  |   11 +
 .../read/common/block/column/ColumnBuilder.java    |    5 +
 .../common/block/column/DoubleColumnBuilder.java   |   11 +
 .../common/block/column/FloatColumnBuilder.java    |   11 +
 .../read/common/block/column/IntColumnBuilder.java |   11 +
 .../common/block/column/LongColumnBuilder.java     |   11 +
 .../read/common/block/column/TimeColumn.java       |    4 +
 .../common/block/column/TimeColumnBuilder.java     |   11 +
 zeppelin-interpreter/pom.xml                       |    1 -
 195 files changed, 9032 insertions(+), 2543 deletions(-)

diff --cc server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index ab7daa5289,573771fe81..7c3f24509d
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@@ -61,6 -61,6 +61,8 @@@ public class DataBlockManager implement
      void onClosed(SinkHandle sinkHandle);
  
      void onAborted(SinkHandle sinkHandle);
++
++    void onFailed(Throwable t);
    }
  
    /** Handle thrift communications. */
@@@ -166,6 -166,6 +168,7 @@@
  
    /** Listen to the state changes of a source handle. */
    class SourceHandleListenerImpl implements SourceHandleListener {
++
      @Override
      public void onFinished(SourceHandle sourceHandle) {
        logger.info("Release resources of finished source handle {}", sourceHandle);
@@@ -206,12 -208,12 +211,12 @@@
          logger.info("Resources of finished sink handle {} has already been released", sinkHandle);
        }
        sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
--      context.finish();
++      context.finished();
      }
  
      @Override
      public void onClosed(SinkHandle sinkHandle) {
--      context.flushing();
++      context.transitionToFlushing();
      }
  
      @Override
@@@ -222,6 -224,6 +227,11 @@@
        }
        sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
      }
++
++    @Override
++    public void onFailed(Throwable t) {
++      context.failed(t);
++    }
    }
  
    private final LocalMemoryManager localMemoryManager;
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
index 1d037678ed,6300c5beef..05d12e4dfa
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
@@@ -24,9 -24,8 +24,8 @@@ import com.google.common.util.concurren
  
  import java.io.IOException;
  import java.util.List;
- import java.util.Optional;
  
 -public interface ISinkHandle extends AutoCloseable {
 +public interface ISinkHandle {
  
    /** Get the total amount of memory used by buffered tsblocks. */
    long getBufferRetainedSizeInBytes();
@@@ -71,7 -70,8 +70,7 @@@
     * downstream instances. A {@link RuntimeException} will be thrown if any exception happened
     * during the data transmission.
     */
 -  @Override
--  void close() throws IOException;
++  void close();
  
    /** Abort the sink handle, discarding all tsblocks which may still be in memory buffer. */
    void abort();
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index b2fd70459d,a72dcb9cb6..c6e3e64739
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@@ -74,7 -73,7 +73,6 @@@ public class SinkHandle implements ISin
    private long bufferRetainedSizeInBytes;
    private boolean closed;
    private boolean noMoreTsBlocks;
--  private Throwable throwable;
  
    public SinkHandle(
        String remoteHostname,
@@@ -191,11 -193,11 +189,8 @@@
    }
  
    @Override
--  public void close() throws IOException {
++  public void close() {
      logger.info("Sink handle {} is being closed.", this);
--    if (throwable != null) {
--      throw new IOException(throwable);
--    }
      if (closed) {
        return;
      }
@@@ -207,7 -209,7 +202,7 @@@
      try {
        sendEndOfDataBlockEvent();
      } catch (TException e) {
--      throw new IOException(e);
++      throw new RuntimeException("Send EndOfDataBlockEvent failed", e);
      }
      logger.info("Sink handle {} is closed.", this);
    }
@@@ -249,7 -243,7 +236,7 @@@
  
    @Override
    public boolean isFinished() {
--    return throwable == null && noMoreTsBlocks && sequenceIdToTsBlock.isEmpty();
++    return noMoreTsBlocks && sequenceIdToTsBlock.isEmpty();
    }
  
    @Override
@@@ -364,7 -358,7 +351,7 @@@
          try {
            client.onNewDataBlockEvent(newDataBlockEvent);
            break;
--        } catch (TException e) {
++        } catch (Throwable e) {
            logger.error(
                "Failed to send new data block event to plan node {} of {} due to {}, attempt times: {}",
                remotePlanNodeId,
@@@ -373,9 -367,9 +360,7 @@@
                attempt,
                e);
            if (attempt == MAX_ATTEMPT_TIMES) {
--            synchronized (this) {
--              throwable = e;
--            }
++            sinkHandleListener.onFailed(e);
            }
          }
        }
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
index c4bc0e377a,9f3b9240c2..49a13eddd7
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
@@@ -88,7 -87,7 +87,7 @@@ public class StubSinkHandle implements 
        return;
      }
      closed = true;
--    instanceContext.flushing();
++    instanceContext.transitionToFlushing();
    }
  
    @Override
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index 2921c9d889,5ae66b14d7..e6e8e8083a
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@@ -18,7 -18,7 +18,6 @@@
   */
  package org.apache.iotdb.db.mpp.execution;
  
- import com.google.common.util.concurrent.SettableFuture;
 -import org.apache.iotdb.commons.exception.IoTDBException;
  import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
  import org.apache.iotdb.db.engine.storagegroup.DataRegion;
  import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@@ -29,30 -29,81 +28,35 @@@ import org.apache.iotdb.db.mpp.buffer.I
  import org.apache.iotdb.db.mpp.operator.Operator;
  import org.apache.iotdb.db.mpp.operator.source.DataSourceOperator;
  import org.apache.iotdb.db.query.control.FileReaderManager;
 -import org.apache.iotdb.tsfile.read.common.block.TsBlock;
  
 -import com.google.common.util.concurrent.ListenableFuture;
+ import com.google.common.util.concurrent.SettableFuture;
 -import io.airlift.units.Duration;
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
+ 
  import javax.annotation.concurrent.NotThreadSafe;
+ 
 -import java.io.IOException;
 -import java.util.Collections;
  import java.util.HashSet;
  import java.util.Iterator;
  import java.util.List;
  import java.util.Set;
 -import java.util.concurrent.TimeUnit;
 -import java.util.concurrent.atomic.AtomicReference;
  import java.util.stream.Collectors;
  
 -import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 -import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
--
+ /**
+  * One dataDriver is responsible for one FragmentInstance which is for data query, which may
+  * contains several series.
+  */
  @NotThreadSafe
 -public class DataDriver implements Driver {
 -
 -  private static final Logger logger = LoggerFactory.getLogger(DataDriver.class);
 -
 -  private final Operator root;
 -  private final ISinkHandle sinkHandle;
 -  private final DataDriverContext driverContext;
 +public class DataDriver extends Driver {
  
    private boolean init;
 -  private boolean closed;
  
    /** closed tsfile used in this fragment instance */
    private Set<TsFileResource> closedFilePaths;
    /** unClosed tsfile used in this fragment instance */
    private Set<TsFileResource> unClosedFilePaths;
  
 -  private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
--
    public DataDriver(Operator root, ISinkHandle sinkHandle, DataDriverContext driverContext) {
 -    this.root = root;
 -    this.sinkHandle = sinkHandle;
 -    this.driverContext = driverContext;
 +    super(root, sinkHandle, driverContext);
      this.closedFilePaths = new HashSet<>();
      this.unClosedFilePaths = new HashSet<>();
 -    // initially the driverBlockedFuture is not blocked (it is completed)
 -    SettableFuture<Void> future = SettableFuture.create();
 -    future.set(null);
 -    driverBlockedFuture.set(future);
 -  }
 -
 -  @Override
 -  public boolean isFinished() {
 -    try {
 -      boolean isFinished =
 -          closed || (driverBlockedFuture.get().isDone() && root != null && root.isFinished());
 -      if (isFinished) {
 -        close();
 -      }
 -      return isFinished;
 -    } catch (Throwable t) {
 -      logger.error(
 -          "Failed to query whether the data driver {} is finished", driverContext.getId(), t);
 -      driverContext.failed(t);
 -      close();
 -      return true;
 -    }
    }
  
    @Override
@@@ -61,39 -117,77 +65,39 @@@
        try {
          initialize();
        } catch (Throwable t) {
 -        logger.error(
 +        LOGGER.error(
              "Failed to do the initialization for fragment instance {} ", driverContext.getId(), t);
          driverContext.failed(t);
 -        close();
          blockedFuture.setException(t);
 -        return blockedFuture;
 +        return false;
        }
      }
 -
 -    // if the driver is blocked we don't need to continue
 -    if (!blockedFuture.isDone()) {
 -      return blockedFuture;
 -    }
 -
 -    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
 -
 -    long start = System.nanoTime();
 -    try {
 -      do {
 -        ListenableFuture<Void> future = processInternal();
 -        if (!future.isDone()) {
 -          return updateDriverBlockedFuture(future);
 -        }
 -      } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
 -    } catch (Throwable t) {
 -      logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
 -      driverContext.failed(t);
 -      close();
 -      blockedFuture.setException(t);
 -      return blockedFuture;
 -    }
 -    return NOT_BLOCKED;
 -  }
 -
 -  @Override
 -  public FragmentInstanceId getInfo() {
 -    return driverContext.getId();
 +    return true;
    }
  
 +  /**
 +   * All file paths used by this fragment instance must be cleared and thus the usage reference must
 +   * be decreased.
 +   */
    @Override
 -  public void close() {
 -    if (closed) {
 -      return;
 +  protected void releaseResource() {
 +    for (TsFileResource tsFile : closedFilePaths) {
 +      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
      }
 -    closed = true;
 -    try {
 -      if (root != null) {
 -        root.close();
 -      }
 -      if (sinkHandle != null) {
 -        sinkHandle.close();
 -      }
 -    } catch (Throwable t) {
 -      logger.error("Failed to closed driver {}", driverContext.getId(), t);
 -      driverContext.failed(t);
 -    } finally {
 -      removeUsedFilesForQuery();
 +    closedFilePaths = null;
 +    for (TsFileResource tsFile : unClosedFilePaths) {
 +      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
      }
 -  }
 -
 -  @Override
 -  public void failed(Throwable t) {
 -    driverContext.failed(t);
 +    unClosedFilePaths = null;
    }
  
- 
    /**
     * init seq file list and unseq file list in QueryDataSource and set it into each SourceNode TODO
     * we should change all the blocked lock operation into tryLock
     */
    private void initialize() throws QueryProcessException {
-     List<DataSourceOperator> sourceOperators = ((DataDriverContext)driverContext).getSourceOperators();
 -    List<DataSourceOperator> sourceOperators = driverContext.getSourceOperators();
++    List<DataSourceOperator> sourceOperators =
++        ((DataDriverContext) driverContext).getSourceOperators();
      if (sourceOperators != null && !sourceOperators.isEmpty()) {
        QueryDataSource dataSource = initQueryDataSourceCache();
        sourceOperators.forEach(
@@@ -121,9 -214,9 +125,7 @@@
      dataRegion.readLock();
      try {
        List<PartialPath> pathList =
-           context.getPaths().stream()
 -          driverContext.getPaths().stream()
--              .map(IDTable::translateQueryPath)
--              .collect(Collectors.toList());
++          context.getPaths().stream().map(IDTable::translateQueryPath).collect(Collectors.toList());
        // when all the selected series are under the same device, the QueryDataSource will be
        // filtered according to timeIndex
        Set<String> selectedDeviceIdSet =
@@@ -188,5 -296,39 +190,4 @@@
        FileReaderManager.getInstance().increaseFileReaderReference(tsFile, isClosed);
      }
    }
 -
 -  private ListenableFuture<Void> processInternal() throws IOException, IoTDBException {
 -    ListenableFuture<Void> blocked = root.isBlocked();
 -    if (!blocked.isDone()) {
 -      return blocked;
 -    }
 -    blocked = sinkHandle.isFull();
 -    if (!blocked.isDone()) {
 -      return blocked;
 -    }
 -    if (root.hasNext()) {
 -      TsBlock tsBlock = root.next();
 -      if (tsBlock != null && !tsBlock.isEmpty()) {
 -        sinkHandle.send(Collections.singletonList(tsBlock));
 -      }
 -    }
 -    return NOT_BLOCKED;
 -  }
 -
 -  private ListenableFuture<Void> updateDriverBlockedFuture(
 -      ListenableFuture<Void> sourceBlockedFuture) {
 -    // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
 -    // or any of the operators gets a memory revocation request
 -    SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
 -    driverBlockedFuture.set(newDriverBlockedFuture);
 -    sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
 -
 -    // TODO Although we don't have memory management for operator now, we should consider it for
 -    // future
 -    // it's possible that memory revoking is requested for some operator
 -    // before we update driverBlockedFuture above and we don't want to miss that
 -    // notification, so we check to see whether that's the case before returning.
--
 -    return newDriverBlockedFuture;
 -  }
  }
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
index fd2fef8b3a,f211ce593c..8687627055
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
@@@ -18,88 -18,25 +18,90 @@@
   */
  package org.apache.iotdb.db.mpp.execution;
  
- import com.google.common.collect.ImmutableList;
- import com.google.common.util.concurrent.ListenableFuture;
- import com.google.common.util.concurrent.SettableFuture;
- import io.airlift.units.Duration;
 +import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
  import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 +import org.apache.iotdb.db.mpp.operator.Operator;
 +import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+ 
++import com.google.common.collect.ImmutableList;
+ import com.google.common.util.concurrent.ListenableFuture;
++import com.google.common.util.concurrent.SettableFuture;
+ import io.airlift.units.Duration;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
  
 -import java.io.Closeable;
 +import javax.annotation.concurrent.GuardedBy;
++
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Optional;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicReference;
 +import java.util.concurrent.locks.ReentrantLock;
 +import java.util.function.Supplier;
 +
 +import static com.google.common.base.Preconditions.checkState;
 +import static com.google.common.base.Throwables.throwIfUnchecked;
 +import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 +import static java.lang.Boolean.TRUE;
 +import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
  
  /**
   * Driver encapsulates some methods which are necessary for execution scheduler to run a fragment
   * instance
   */
 -public interface Driver extends Closeable {
 +public abstract class Driver {
 +
 +  protected static final Logger LOGGER = LoggerFactory.getLogger(Driver.class);
 +
- 
 +  protected final Operator root;
 +  protected final ISinkHandle sinkHandle;
 +  protected final DriverContext driverContext;
-   protected final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
++  protected final AtomicReference<SettableFuture<Void>> driverBlockedFuture =
++      new AtomicReference<>();
 +  protected final AtomicReference<State> state = new AtomicReference<>(State.ALIVE);
 +
 +  protected final DriverLock exclusiveLock = new DriverLock();
 +
 +  protected enum State {
-     ALIVE, NEED_DESTRUCTION, DESTROYED
++    ALIVE,
++    NEED_DESTRUCTION,
++    DESTROYED
 +  }
 +
 +  public Driver(Operator root, ISinkHandle sinkHandle, DriverContext driverContext) {
 +    this.root = root;
 +    this.sinkHandle = sinkHandle;
 +    this.driverContext = driverContext;
 +
 +    // initially the driverBlockedFuture is not blocked (it is completed)
 +    SettableFuture<Void> future = SettableFuture.create();
 +    future.set(null);
 +    driverBlockedFuture.set(future);
 +  }
  
    /**
     * Used to judge whether this fragment instance should be scheduled for execution anymore
     *
     * @return true if the FragmentInstance is done or terminated due to failure, otherwise false.
     */
 -  boolean isFinished();
 +  public boolean isFinished() {
 +    checkLockNotHeld("Cannot check finished status while holding the driver lock");
 +
 +    // if we can get the lock, attempt a clean shutdown; otherwise someone else will shutdown
 +    Optional<Boolean> result = tryWithLockUnInterruptibly(this::isFinishedInternal);
 +    return result.orElseGet(() -> state.get() != State.ALIVE || driverContext.isDone());
 +  }
 +
 +  /**
 +   * do initialization
 +   *
 +   * @return true if init succeed, false otherwise
 +   */
 +  protected abstract boolean init(SettableFuture<Void> blockedFuture);
 +
-   /**
-    * release resource this driver used
-    */
++  /** release resource this driver used */
 +  protected abstract void releaseResource();
  
    /**
     * run the fragment instance for {@param duration} time slice, the time of this run is likely not
@@@ -107,351 -44,27 +109,355 @@@
     *
     * @param duration how long should this fragment instance run
     * @return the returned ListenableFuture<Void> is used to represent status of this processing if
-    * isDone() return true, meaning that this fragment instance is not blocked and is ready for
-    * next processing otherwise, meaning that this fragment instance is blocked and not ready for
-    * next processing.
+    *     isDone() return true, meaning that this fragment instance is not blocked and is ready for
+    *     next processing otherwise, meaning that this fragment instance is blocked and not ready for
+    *     next processing.
     */
 -  ListenableFuture<Void> processFor(Duration duration);
 +  public ListenableFuture<Void> processFor(Duration duration) {
 +
 +    SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
 +    // initialization may be time-consuming, so we keep it in the processFor method
 +    // in normal case, it won't cause deadlock and should finish soon, otherwise it will be a
 +    // critical bug
 +    if (!init(blockedFuture)) {
 +      return blockedFuture;
 +    }
 +
 +    // if the driver is blocked we don't need to continue
 +    if (!blockedFuture.isDone()) {
 +      return blockedFuture;
 +    }
 +
 +    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
 +
-     Optional<ListenableFuture<Void>> result = tryWithLock(100, TimeUnit.MILLISECONDS, true, () -> {
-       long start = System.nanoTime();
-       do {
-         ListenableFuture<Void> future = processInternal();
-         if (!future.isDone()) {
-           return updateDriverBlockedFuture(future);
-         }
-       }
-       while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
-       return NOT_BLOCKED;
-     });
++    Optional<ListenableFuture<Void>> result =
++        tryWithLock(
++            100,
++            TimeUnit.MILLISECONDS,
++            true,
++            () -> {
++              long start = System.nanoTime();
++              do {
++                ListenableFuture<Void> future = processInternal();
++                if (!future.isDone()) {
++                  return updateDriverBlockedFuture(future);
++                }
++              } while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
++              return NOT_BLOCKED;
++            });
 +
 +    return result.orElse(NOT_BLOCKED);
 +  }
  
    /**
     * the id information about this Fragment Instance.
     *
     * @return a {@link FragmentInstanceId} instance.
     */
 -  FragmentInstanceId getInfo();
 +  public FragmentInstanceId getInfo() {
 +    return driverContext.getId();
 +  }
  
-   /**
-    * clear resource used by this fragment instance
-    */
+   /** clear resource used by this fragment instance */
 -  @Override
 -  void close();
 +  public void close() {
 +    // mark the service for destruction
 +    if (!state.compareAndSet(State.ALIVE, State.NEED_DESTRUCTION)) {
 +      return;
 +    }
 +
 +    exclusiveLock.interruptCurrentOwner();
 +
 +    // if we can get the lock, attempt a clean shutdown; otherwise someone else will shutdown
 +    tryWithLockUnInterruptibly(() -> TRUE);
 +  }
  
    /**
     * fail current driver
     *
     * @param t reason cause this failure
     */
 -  void failed(Throwable t);
 +  public void failed(Throwable t) {
 +    driverContext.failed(t);
 +  }
 +
 +  public ISinkHandle getSinkHandle() {
 +    return sinkHandle;
 +  }
 +
 +  @GuardedBy("exclusiveLock")
 +  private boolean isFinishedInternal() {
 +    checkLockHeld("Lock must be held to call isFinishedInternal");
 +
-     boolean finished = state.get() != State.ALIVE || driverContext.isDone() || root == null || root.isFinished();
++    boolean finished =
++        state.get() != State.ALIVE || driverContext.isDone() || root == null || root.isFinished();
 +    if (finished) {
 +      state.compareAndSet(State.ALIVE, State.NEED_DESTRUCTION);
 +    }
 +    return finished;
 +  }
 +
- 
 +  private ListenableFuture<Void> processInternal() {
 +    try {
 +      ListenableFuture<Void> blocked = root.isBlocked();
 +      if (!blocked.isDone()) {
 +        return blocked;
 +      }
 +      blocked = sinkHandle.isFull();
 +      if (!blocked.isDone()) {
 +        return blocked;
 +      }
 +      if (root.hasNext()) {
 +        TsBlock tsBlock = root.next();
 +        if (tsBlock != null && !tsBlock.isEmpty()) {
 +          sinkHandle.send(Collections.singletonList(tsBlock));
 +        }
 +      }
 +      return NOT_BLOCKED;
 +    } catch (Throwable t) {
 +      LOGGER.error("Failed to execute fragment instance {}", driverContext.getId(), t);
 +      List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
 +      if (interrupterStack == null) {
 +        driverContext.failed(t);
 +        throw t;
 +      }
 +
 +      // Driver thread was interrupted which should only happen if the task is already finished.
-       // If this becomes the actual cause of a failed query there is a bug in the task state machine.
++      // If this becomes the actual cause of a failed query there is a bug in the task state
++      // machine.
 +      Exception exception = new Exception("Interrupted By");
 +      exception.setStackTrace(interrupterStack.toArray(new StackTraceElement[0]));
 +      RuntimeException newException = new RuntimeException("Driver was interrupted", exception);
 +      newException.addSuppressed(t);
 +      driverContext.failed(newException);
 +      throw newException;
 +    }
 +  }
 +
 +  private ListenableFuture<Void> updateDriverBlockedFuture(
 +      ListenableFuture<Void> sourceBlockedFuture) {
 +    // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
 +    // or any of the operators gets a memory revocation request
 +    SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
 +    driverBlockedFuture.set(newDriverBlockedFuture);
 +    sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
 +
 +    // TODO Although we don't have memory management for operator now, we should consider it for
 +    // future
 +    // it's possible that memory revoking is requested for some operator
 +    // before we update driverBlockedFuture above and we don't want to miss that
 +    // notification, so we check to see whether that's the case before returning.
 +
 +    return newDriverBlockedFuture;
 +  }
 +
- 
 +  private synchronized void checkLockNotHeld(String message) {
 +    checkState(!exclusiveLock.isHeldByCurrentThread(), message);
 +  }
 +
 +  @GuardedBy("exclusiveLock")
 +  private synchronized void checkLockHeld(String message) {
 +    checkState(exclusiveLock.isHeldByCurrentThread(), message);
 +  }
 +
 +  /**
-    * Try to acquire the {@code exclusiveLock} immediately and run a {@code task}
-    * The task will not be interrupted if the {@code Driver} is closed.
-    * <p>
-    * Note: task cannot return null
++   * Try to acquire the {@code exclusiveLock} immediately and run a {@code task} The task will not
++   * be interrupted if the {@code Driver} is closed.
++   *
++   * <p>Note: task cannot return null
 +   */
 +  private <T> Optional<T> tryWithLockUnInterruptibly(Supplier<T> task) {
 +    return tryWithLock(0, TimeUnit.MILLISECONDS, false, task);
 +  }
 +
 +  /**
-    * Try to acquire the {@code exclusiveLock} with {@code timeout} and run a {@code task}.
-    * If the {@code interruptOnClose} flag is set to {@code true} the {@code task} will be
-    * interrupted if the {@code Driver} is closed.
-    * <p>
-    * Note: task cannot return null
++   * Try to acquire the {@code exclusiveLock} with {@code timeout} and run a {@code task}. If the
++   * {@code interruptOnClose} flag is set to {@code true} the {@code task} will be interrupted if
++   * the {@code Driver} is closed.
++   *
++   * <p>Note: task cannot return null
 +   */
-   private <T> Optional<T> tryWithLock(long timeout, TimeUnit unit, boolean interruptOnClose, Supplier<T> task) {
++  private <T> Optional<T> tryWithLock(
++      long timeout, TimeUnit unit, boolean interruptOnClose, Supplier<T> task) {
 +    checkLockNotHeld("Lock cannot be reacquired");
 +
 +    boolean acquired = false;
 +    try {
 +      acquired = exclusiveLock.tryLock(timeout, unit, interruptOnClose);
 +    } catch (InterruptedException e) {
 +      Thread.currentThread().interrupt();
 +    }
 +
 +    if (!acquired) {
 +      return Optional.empty();
 +    }
 +
 +    Optional<T> result;
 +    try {
 +      result = Optional.of(task.get());
 +    } finally {
 +      try {
 +        destroyIfNecessary();
 +      } finally {
 +        exclusiveLock.unlock();
 +      }
 +    }
 +
 +    return result;
 +  }
 +
 +  @GuardedBy("exclusiveLock")
 +  private void destroyIfNecessary() {
 +    checkLockHeld("Lock must be held to call destroyIfNecessary");
 +
 +    if (!state.compareAndSet(State.NEED_DESTRUCTION, State.DESTROYED)) {
 +      return;
 +    }
 +
 +    // if we get an error while closing a driver, record it and we will throw it at the end
 +    Throwable inFlightException = null;
 +    try {
 +      inFlightException = closeAndDestroyOperators();
 +      driverContext.finished();
 +    } catch (Throwable t) {
 +      // this shouldn't happen but be safe
-       inFlightException = addSuppressedException(
-           inFlightException,
-           t,
-           "Error destroying driver for task %s",
-           driverContext.getId());
++      inFlightException =
++          addSuppressedException(
++              inFlightException, t, "Error destroying driver for task %s", driverContext.getId());
 +    } finally {
 +      releaseResource();
 +    }
 +
 +    if (inFlightException != null) {
 +      // this will always be an Error or Runtime
 +      throwIfUnchecked(inFlightException);
 +      throw new RuntimeException(inFlightException);
 +    }
 +  }
 +
 +  private Throwable closeAndDestroyOperators() {
 +    // record the current interrupted status (and clear the flag); we'll reset it later
 +    boolean wasInterrupted = Thread.interrupted();
 +
 +    Throwable inFlightException = null;
 +
 +    try {
 +      if (root != null) {
 +        root.close();
 +      }
 +      if (sinkHandle != null) {
 +        sinkHandle.close();
 +      }
 +    } catch (InterruptedException t) {
 +      // don't record the stack
 +      wasInterrupted = true;
 +    } catch (Throwable t) {
 +      // TODO currently, we won't know exact operator which is failed in closing
-       inFlightException = addSuppressedException(
-           inFlightException,
-           t,
-           "Error closing operator {} for fragment instance {}",
-           root.getOperatorContext().getOperatorId(),
-           driverContext.getId());
++      inFlightException =
++          addSuppressedException(
++              inFlightException,
++              t,
++              "Error closing operator {} for fragment instance {}",
++              root.getOperatorContext().getOperatorId(),
++              driverContext.getId());
 +    } finally {
 +      // reset the interrupted flag
 +      if (wasInterrupted) {
 +        Thread.currentThread().interrupt();
 +      }
 +    }
 +    return inFlightException;
 +  }
 +
-   private static Throwable addSuppressedException(Throwable inFlightException, Throwable newException, String message, Object... args) {
++  private static Throwable addSuppressedException(
++      Throwable inFlightException, Throwable newException, String message, Object... args) {
 +    if (newException instanceof Error) {
 +      if (inFlightException == null) {
 +        inFlightException = newException;
 +      } else {
 +        // Self-suppression not permitted
 +        if (inFlightException != newException) {
 +          inFlightException.addSuppressed(newException);
 +        }
 +      }
 +    } else {
 +      // log normal exceptions instead of rethrowing them
 +      LOGGER.error(message, args, newException);
 +    }
 +    return inFlightException;
 +  }
 +
 +  private static class DriverLock {
 +    private final ReentrantLock lock = new ReentrantLock();
 +
 +    @GuardedBy("this")
 +    private Thread currentOwner;
++
 +    @GuardedBy("this")
 +    private boolean currentOwnerInterruptionAllowed;
 +
 +    @GuardedBy("this")
 +    private List<StackTraceElement> interrupterStack;
 +
 +    public boolean isHeldByCurrentThread() {
 +      return lock.isHeldByCurrentThread();
 +    }
 +
 +    public boolean tryLock(boolean currentThreadInterruptionAllowed) {
 +      checkState(!lock.isHeldByCurrentThread(), "Lock is not reentrant");
 +      boolean acquired = lock.tryLock();
 +      if (acquired) {
 +        setOwner(currentThreadInterruptionAllowed);
 +      }
 +      return acquired;
 +    }
 +
 +    public boolean tryLock(long timeout, TimeUnit unit, boolean currentThreadInterruptionAllowed)
 +        throws InterruptedException {
 +      checkState(!lock.isHeldByCurrentThread(), "Lock is not reentrant");
 +      boolean acquired = lock.tryLock(timeout, unit);
 +      if (acquired) {
 +        setOwner(currentThreadInterruptionAllowed);
 +      }
 +      return acquired;
 +    }
 +
 +    private synchronized void setOwner(boolean interruptionAllowed) {
 +      checkState(lock.isHeldByCurrentThread(), "Current thread does not hold lock");
 +      currentOwner = Thread.currentThread();
 +      currentOwnerInterruptionAllowed = interruptionAllowed;
 +      // NOTE: We do not use interrupted stack information to know that another
 +      // thread has attempted to interrupt the driver, and interrupt this new lock
 +      // owner.  The interrupted stack information is for debugging purposes only.
 +      // In the case of interruption, the caller should (and does) have a separate
 +      // state to prevent further processing in the Driver.
 +    }
 +
 +    public synchronized void unlock() {
 +      checkState(lock.isHeldByCurrentThread(), "Current thread does not hold lock");
 +      currentOwner = null;
 +      currentOwnerInterruptionAllowed = false;
 +      lock.unlock();
 +    }
 +
 +    public synchronized List<StackTraceElement> getInterrupterStack() {
 +      return interrupterStack;
 +    }
 +
 +    public synchronized void interruptCurrentOwner() {
 +      if (!currentOwnerInterruptionAllowed) {
 +        return;
 +      }
 +      // there is a benign race condition here were the lock holder
 +      // can be change between attempting to get lock and grabbing
 +      // the synchronized lock here, but in either case we want to
 +      // interrupt the lock holder thread
 +      if (interrupterStack == null) {
 +        interrupterStack = ImmutableList.copyOf(Thread.currentThread().getStackTrace());
 +      }
 +
 +      if (currentOwner != null) {
 +        currentOwner.interrupt();
 +      }
 +    }
 +  }
  }
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
index d0bc3bb232,8c20a2c334..8985508b0c
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
@@@ -26,9 -24,6 +26,8 @@@ public class DriverContext 
  
    private final FragmentInstanceContext fragmentInstanceContext;
  
 +  private final AtomicBoolean finished = new AtomicBoolean();
 +
- 
    public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
      this.fragmentInstanceContext = fragmentInstanceContext;
    }
@@@ -43,15 -38,13 +42,14 @@@
  
    public void failed(Throwable cause) {
      fragmentInstanceContext.failed(cause);
 +    finished.set(true);
    }
  
 -  public void finish() {
 -    fragmentInstanceContext.finish();
 +  public void finished() {
 +    finished.compareAndSet(false, true);
    }
  
- 
 -  public void flushing() {
 -    fragmentInstanceContext.flushing();
 +  public boolean isDone() {
 +    return finished.get();
    }
  }
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
index c3d33dbd74,b2e51be2bc..d1b81d7be9
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
@@@ -65,44 -58,10 +64,43 @@@ public class FragmentInstanceContext ex
    //    private final AtomicLong endFullGcCount = new AtomicLong(-1);
    //    private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
  
--  public FragmentInstanceContext(
-       FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
 -      FragmentInstanceId id, AtomicReference<FragmentInstanceState> state) {
++  public FragmentInstanceContext(FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
      this.id = id;
 -    this.state = state;
 +    this.stateMachine = stateMachine;
 +  }
 +
 +  public void start() {
 +    long now = System.currentTimeMillis();
 +    executionStartTime.compareAndSet(null, now);
 +    startNanos.compareAndSet(0, System.nanoTime());
 +
 +    // always update last execution start time
 +    lastExecutionStartTime.set(now);
 +  }
 +
 +  // the state change listener is added here in a separate initialize() method
 +  // instead of the constructor to prevent leaking the "this" reference to
 +  // another thread, which will cause unsafe publication of this instance.
 +  private void initialize() {
 +    stateMachine.addStateChangeListener(this::updateStatsIfDone);
 +  }
 +
 +  private void updateStatsIfDone(FragmentInstanceState newState) {
 +    if (newState.isDone()) {
 +      long now = System.currentTimeMillis();
 +
 +      // before setting the end times, make sure a start has been recorded
 +      executionStartTime.compareAndSet(null, now);
 +      startNanos.compareAndSet(0, System.nanoTime());
 +
 +      // Only update last start time, if the nothing was started
 +      lastExecutionStartTime.compareAndSet(null, now);
 +
 +      // use compare and set from initial value to avoid overwriting if there
 +      // were a duplicate notification, which shouldn't happen
 +      executionEndTime.compareAndSet(null, now);
 +      endNanos.compareAndSet(0, System.nanoTime());
 +    }
    }
  
    public OperatorContext addOperatorContext(
@@@ -139,10 -98,40 +137,18 @@@
    }
  
    public void failed(Throwable cause) {
 -    LOGGER.warn("Fragment Instance {} failed.", id, cause);
 -    state.set(FragmentInstanceState.FAILED);
 +    stateMachine.failed(cause);
    }
  
 -  public void cancel() {
 -    state.set(FragmentInstanceState.CANCELED);
 -    this.endTime = System.currentTimeMillis();
++  public void finished() {
++    stateMachine.finished();
+   }
+ 
 -  public void abort() {
 -    state.set(FragmentInstanceState.ABORTED);
 -    this.endTime = System.currentTimeMillis();
 -  }
 -
 -  public void finish() {
 -    if (state.get().isDone()) {
 -      return;
 -    }
 -    state.set(FragmentInstanceState.FINISHED);
 -    this.endTime = System.currentTimeMillis();
 -  }
 -
 -  public void flushing() {
 -    if (state.get().isDone()) {
 -      return;
 -    }
 -    state.set(FragmentInstanceState.FLUSHING);
++  public void transitionToFlushing() {
++    stateMachine.transitionToFlushing();
+   }
+ 
    public long getEndTime() {
 -    return endTime;
 -  }
 -
 -  public void setEndTime(long endTime) {
 -    this.endTime = endTime;
 +    return executionEndTime.get();
    }
  }
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
index b00ddcadb3,21e0cc3d50..149bce1964
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
@@@ -18,17 -18,14 +18,15 @@@
   */
  package org.apache.iotdb.db.mpp.execution;
  
- import io.airlift.stats.CounterStat;
 +import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
  import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
  import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
  
  import com.google.common.collect.ImmutableList;
 -
 -import java.util.concurrent.atomic.AtomicReference;
++import io.airlift.stats.CounterStat;
  
  import static java.util.Objects.requireNonNull;
- import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.ABORTED;
- import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.CANCELLED;
 +import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FAILED;
  
  public class FragmentInstanceExecution {
  
@@@ -45,18 -41,7 +43,20 @@@
  
    private long lastHeartbeat;
  
-   public static FragmentInstanceExecution createFragmentInstanceExecution(IFragmentInstanceScheduler scheduler,
-                                                                           FragmentInstanceId instanceId,
-                                                                           FragmentInstanceContext context,
-                                                                           Driver driver,
-                                                                           FragmentInstanceStateMachine stateMachine,
-                                                                           CounterStat failedInstances) {
-     FragmentInstanceExecution execution = new FragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine);
 -  public FragmentInstanceExecution(
++  public static FragmentInstanceExecution createFragmentInstanceExecution(
++      IFragmentInstanceScheduler scheduler,
++      FragmentInstanceId instanceId,
++      FragmentInstanceContext context,
++      Driver driver,
++      FragmentInstanceStateMachine stateMachine,
++      CounterStat failedInstances) {
++    FragmentInstanceExecution execution =
++        new FragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine);
 +    execution.initialize(failedInstances, scheduler, instanceId, driver);
 +    return execution;
 +  }
 +
 +  private FragmentInstanceExecution(
        IFragmentInstanceScheduler scheduler,
        FragmentInstanceId instanceId,
        FragmentInstanceContext context,
@@@ -97,49 -82,7 +97,30 @@@
    }
  
    public void abort() {
 -    scheduler.abortFragmentInstance(instanceId);
 -    context.abort();
 +    stateMachine.abort();
 +  }
 +
 +  // this is a separate method to ensure that the `this` reference is not leaked during construction
-   private void initialize(CounterStat failedInstances, IFragmentInstanceScheduler scheduler, FragmentInstanceId instanceId, Driver driver) {
++  private void initialize(
++      CounterStat failedInstances,
++      IFragmentInstanceScheduler scheduler,
++      FragmentInstanceId instanceId,
++      Driver driver) {
 +    requireNonNull(failedInstances, "failedInstances is null");
-     stateMachine.addStateChangeListener(newState -> {
- 
-       if (!newState.isDone()) {
-         return;
-       }
- 
-       // Update failed tasks counter
-       if (newState == FAILED) {
-         failedInstances.update(1);
-       }
- 
-       driver.close();
-       sinkHandle.abort();
-       scheduler.abortFragmentInstance(instanceId);
-     });
-   }
- 
-   private synchronized void instanceCompletion() {
-     if (stateMachine.getState().isDone()) {
-       return;
-     }
- 
-     if (!sinkHandle.isFinished()) {
-       stateMachine.transitionToFlushing();
-       return;
-     }
- 
-     if (sinkHandle.isFinished()) {
-       // Cool! All done!
-       stateMachine.finished();
-       return;
-     }
- 
-     if (sinkHandle.isFailed()) {
-       Throwable failureCause = sinkHandle.getFailureCause()
-           .orElseGet(() -> new RuntimeException("Fragment Instance " + instanceId + " 's SinkHandle is failed but the failure cause is missing"));
-       stateMachine.failed(failureCause);
-     }
++    stateMachine.addStateChangeListener(
++        newState -> {
++          if (!newState.isDone()) {
++            return;
++          }
++
++          // Update failed tasks counter
++          if (newState == FAILED) {
++            failedInstances.update(1);
++          }
++
++          driver.close();
++          sinkHandle.abort();
++          scheduler.abortFragmentInstance(instanceId);
++        });
    }
  }
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
index 5f49fa47ef,537f330f04..2e23cf03b4
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@@ -28,6 -27,6 +27,7 @@@ import org.apache.iotdb.db.mpp.schedule
  import org.apache.iotdb.db.mpp.sql.planner.LocalExecutionPlanner;
  import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
  
++import io.airlift.stats.CounterStat;
  import io.airlift.units.Duration;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -51,14 -49,9 +51,13 @@@ public class FragmentInstanceManager 
    private final IFragmentInstanceScheduler scheduler = FragmentInstanceScheduler.getInstance();
  
    private final ScheduledExecutorService instanceManagementExecutor;
 +  private final ExecutorService instanceNotificationExecutor;
  
- 
    private final Duration infoCacheTime;
  
 +  // record failed instances count
 +  private final CounterStat failedInstances = new CounterStat();
 +
    public static FragmentInstanceManager getInstance() {
      return FragmentInstanceManager.InstanceHolder.INSTANCE;
    }
@@@ -68,7 -61,6 +67,8 @@@
      this.instanceExecution = new ConcurrentHashMap<>();
      this.instanceManagementExecutor =
          IoTDBThreadPoolFactory.newScheduledThreadPool(1, "instance-management");
-     this.instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification");
++    this.instanceNotificationExecutor =
++        IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification");
  
      this.infoCacheTime = new Duration(15, TimeUnit.MINUTES);
  
@@@ -93,13 -85,13 +93,14 @@@
          instanceExecution.computeIfAbsent(
              instanceId,
              id -> {
- 
-               FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 -              AtomicReference<FragmentInstanceState> state = new AtomicReference<>();
 -              state.set(FragmentInstanceState.PLANNED);
++              FragmentInstanceStateMachine stateMachine =
++                  new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
  
                FragmentInstanceContext context =
                    instanceContext.computeIfAbsent(
                        instanceId,
-                       fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, stateMachine));
 -                      fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
++                      fragmentInstanceId ->
++                          new FragmentInstanceContext(fragmentInstanceId, stateMachine));
  
                try {
                  DataDriver driver =
@@@ -108,9 -100,9 +109,10 @@@
                          context,
                          instance.getTimeFilter(),
                          dataRegion);
-                 return createFragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine, failedInstances);
 -                return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
++                return createFragmentInstanceExecution(
++                    scheduler, instanceId, context, driver, stateMachine, failedInstances);
                } catch (Throwable t) {
 -                context.failed(t);
 +                stateMachine.failed(t);
                  return null;
                }
              });
@@@ -126,29 -118,26 +128,29 @@@
          instanceExecution.computeIfAbsent(
              instanceId,
              id -> {
-               FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
- 
 -              AtomicReference<FragmentInstanceState> state = new AtomicReference<>();
 -              state.set(FragmentInstanceState.PLANNED);
++              FragmentInstanceStateMachine stateMachine =
++                  new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
  
                FragmentInstanceContext context =
                    instanceContext.computeIfAbsent(
                        instanceId,
-                       fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, stateMachine));
 -                      fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
++                      fragmentInstanceId ->
++                          new FragmentInstanceContext(fragmentInstanceId, stateMachine));
  
                try {
                  SchemaDriver driver =
                      planner.plan(instance.getFragment().getRoot(), context, schemaRegion);
-                 return createFragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine, failedInstances);
 -                return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
++                return createFragmentInstanceExecution(
++                    scheduler, instanceId, context, driver, stateMachine, failedInstances);
                } catch (Throwable t) {
 -                context.failed(t);
 +                stateMachine.failed(t);
                  return null;
                }
              });
      return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
    }
  
-   /**
-    * Aborts a FragmentInstance.
-    */
++  /** Aborts a FragmentInstance. */
    public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId fragmentInstanceId) {
      FragmentInstanceExecution execution = instanceExecution.remove(fragmentInstanceId);
      if (execution != null) {
@@@ -159,21 -148,6 +161,19 @@@
      return null;
    }
  
-   /**
-    * Cancels a FragmentInstance.
-    */
++  /** Cancels a FragmentInstance. */
 +  public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) {
 +    requireNonNull(instanceId, "taskId is null");
 +
 +    FragmentInstanceExecution execution = instanceExecution.remove(instanceId);
 +    if (execution != null) {
 +      instanceContext.remove(instanceId);
 +      execution.cancel();
 +      return execution.getInstanceInfo();
 +    }
 +    return null;
 +  }
 +
    /**
     * Gets the info for the specified fragment instance.
     *
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java
index 7f66d1a321,0000000000..febc9e9e09
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java
@@@ -1,171 -1,0 +1,182 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.iotdb.db.mpp.execution;
 +
++import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
++import org.apache.iotdb.db.mpp.execution.StateMachine.StateChangeListener;
++
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.ImmutableMap;
 +import com.google.common.util.concurrent.ListenableFuture;
- import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
- import org.apache.iotdb.db.mpp.execution.StateMachine.StateChangeListener;
 +
 +import javax.annotation.concurrent.GuardedBy;
 +import javax.annotation.concurrent.ThreadSafe;
++
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.concurrent.Executor;
 +import java.util.concurrent.LinkedBlockingQueue;
 +
 +import static com.google.common.base.MoreObjects.toStringHelper;
 +import static com.google.common.base.Preconditions.checkArgument;
 +import static com.google.common.util.concurrent.Futures.immediateFuture;
 +import static java.util.Objects.requireNonNull;
 +import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.ABORTED;
 +import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.CANCELLED;
 +import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FAILED;
 +import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FINISHED;
 +import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FLUSHING;
 +import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.RUNNING;
 +import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.TERMINAL_INSTANCE_STATES;
 +
- 
 +@ThreadSafe
 +public class FragmentInstanceStateMachine {
 +  private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceStateMachine.class);
 +
 +  private final long createdTime = System.currentTimeMillis();
 +
 +  private final FragmentInstanceId instanceId;
 +  private final Executor executor;
 +  private final StateMachine<FragmentInstanceState> instanceState;
 +  private final LinkedBlockingQueue<Throwable> failureCauses = new LinkedBlockingQueue<>();
 +
 +  @GuardedBy("this")
 +  private final Map<FragmentInstanceId, Throwable> sourceInstanceFailures = new HashMap<>();
++
 +  @GuardedBy("this")
-   private final List<FragmentInstanceFailureListener> sourceInstanceFailureListeners = new ArrayList<>();
++  private final List<FragmentInstanceFailureListener> sourceInstanceFailureListeners =
++      new ArrayList<>();
 +
 +  public FragmentInstanceStateMachine(FragmentInstanceId fragmentInstanceId, Executor executor) {
 +    this.instanceId = requireNonNull(fragmentInstanceId, "fragmentInstanceId is null");
 +    this.executor = requireNonNull(executor, "executor is null");
-     instanceState = new StateMachine<>("FragmentInstance " + fragmentInstanceId, executor, RUNNING, TERMINAL_INSTANCE_STATES);
-     instanceState.addStateChangeListener(newState -> LOGGER.debug("Fragment Instance {} is {}", fragmentInstanceId, newState));
++    instanceState =
++        new StateMachine<>(
++            "FragmentInstance " + fragmentInstanceId, executor, RUNNING, TERMINAL_INSTANCE_STATES);
++    instanceState.addStateChangeListener(
++        newState -> LOGGER.debug("Fragment Instance {} is {}", fragmentInstanceId, newState));
 +  }
 +
 +  public long getCreatedTime() {
 +    return createdTime;
 +  }
 +
 +  public FragmentInstanceId getFragmentInstanceId() {
 +    return instanceId;
 +  }
 +
 +  public FragmentInstanceState getState() {
 +    return instanceState.get();
 +  }
 +
-   public ListenableFuture<FragmentInstanceState> getStateChange(FragmentInstanceState currentState) {
++  public ListenableFuture<FragmentInstanceState> getStateChange(
++      FragmentInstanceState currentState) {
 +    requireNonNull(currentState, "currentState is null");
 +    checkArgument(!currentState.isDone(), "Current state is already done");
 +
 +    ListenableFuture<FragmentInstanceState> future = instanceState.getStateChange(currentState);
 +    FragmentInstanceState state = instanceState.get();
 +    if (state.isDone()) {
 +      return immediateFuture(state);
 +    }
 +    return future;
 +  }
 +
 +  public LinkedBlockingQueue<Throwable> getFailureCauses() {
 +    return failureCauses;
 +  }
 +
 +  public void transitionToFlushing() {
 +    instanceState.setIf(FLUSHING, currentState -> currentState == RUNNING);
 +  }
 +
 +  public void finished() {
 +    transitionToDoneState(FINISHED);
 +  }
 +
 +  public void cancel() {
 +    transitionToDoneState(CANCELLED);
 +  }
 +
 +  public void abort() {
 +    transitionToDoneState(ABORTED);
 +  }
 +
 +  public void failed(Throwable cause) {
 +    failureCauses.add(cause);
 +    transitionToDoneState(FAILED);
 +  }
 +
 +  private void transitionToDoneState(FragmentInstanceState doneState) {
 +    requireNonNull(doneState, "doneState is null");
 +    checkArgument(doneState.isDone(), "doneState %s is not a done state", doneState);
 +
 +    instanceState.setIf(doneState, currentState -> !currentState.isDone());
 +  }
 +
 +  /**
-    * Listener is always notified asynchronously using a dedicated notification thread pool so, care should
-    * be taken to avoid leaking {@code this} when adding a listener in a constructor. Additionally, it is
-    * possible notifications are observed out of order due to the asynchronous execution.
++   * Listener is always notified asynchronously using a dedicated notification thread pool so, care
++   * should be taken to avoid leaking {@code this} when adding a listener in a constructor.
++   * Additionally, it is possible notifications are observed out of order due to the asynchronous
++   * execution.
 +   */
-   public void addStateChangeListener(StateChangeListener<FragmentInstanceState> stateChangeListener) {
++  public void addStateChangeListener(
++      StateChangeListener<FragmentInstanceState> stateChangeListener) {
 +    instanceState.addStateChangeListener(stateChangeListener);
 +  }
 +
 +  public void addSourceTaskFailureListener(FragmentInstanceFailureListener listener) {
 +    Map<FragmentInstanceId, Throwable> failures;
 +    synchronized (this) {
 +      sourceInstanceFailureListeners.add(listener);
 +      failures = ImmutableMap.copyOf(sourceInstanceFailures);
 +    }
-     executor.execute(() -> {
-       failures.forEach(listener::onTaskFailed);
-     });
++    executor.execute(
++        () -> {
++          failures.forEach(listener::onTaskFailed);
++        });
 +  }
 +
 +  public void sourceTaskFailed(FragmentInstanceId instanceId, Throwable failure) {
 +    List<FragmentInstanceFailureListener> listeners;
 +    synchronized (this) {
 +      sourceInstanceFailures.putIfAbsent(instanceId, failure);
 +      listeners = ImmutableList.copyOf(sourceInstanceFailureListeners);
 +    }
-     executor.execute(() -> {
-       for (FragmentInstanceFailureListener listener : listeners) {
-         listener.onTaskFailed(instanceId, failure);
-       }
-     });
++    executor.execute(
++        () -> {
++          for (FragmentInstanceFailureListener listener : listeners) {
++            listener.onTaskFailed(instanceId, failure);
++          }
++        });
 +  }
 +
 +  @Override
 +  public String toString() {
 +    return toStringHelper(this)
 +        .add("FragmentInstanceId", instanceId)
 +        .add("FragmentInstanceState", instanceState)
 +        .add("failureCauses", failureCauses)
 +        .toString();
 +  }
 +}
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
index 6c45eb529a,844fe417ba..258065cd80
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
@@@ -18,23 -18,152 +18,24 @@@
   */
  package org.apache.iotdb.db.mpp.execution;
  
- import com.google.common.util.concurrent.SettableFuture;
  import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
 -import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
  import org.apache.iotdb.db.mpp.operator.Operator;
 -import org.apache.iotdb.tsfile.read.common.block.TsBlock;
  
 -import com.google.common.util.concurrent.ListenableFuture;
+ import com.google.common.util.concurrent.SettableFuture;
 -import io.airlift.units.Duration;
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
+ 
  import javax.annotation.concurrent.NotThreadSafe;
  
 -import java.io.IOException;
 -import java.util.Collections;
 -import java.util.concurrent.TimeUnit;
 -import java.util.concurrent.atomic.AtomicReference;
 -
 -import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 -import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
 -
+ /** One SchemaDriver is used to execute one FragmentInstance which is for metadata query. */
  @NotThreadSafe
 -public class SchemaDriver implements Driver {
 -
 -  private static final Logger logger = LoggerFactory.getLogger(SchemaDriver.class);
 -
 -  private final Operator root;
 -  private final ISinkHandle sinkHandle;
 -  private final SchemaDriverContext driverContext;
 -
 -  private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
 -
 -  private boolean closed = false;
 +public class SchemaDriver extends Driver {
  
    public SchemaDriver(Operator root, ISinkHandle sinkHandle, SchemaDriverContext driverContext) {
 -    this.root = root;
 -    this.sinkHandle = sinkHandle;
 -    this.driverContext = driverContext;
 -    // initially the driverBlockedFuture is not blocked (it is completed)
 -    SettableFuture<Void> future = SettableFuture.create();
 -    future.set(null);
 -    driverBlockedFuture.set(future);
 -  }
 -
 -  @Override
 -  public boolean isFinished() {
 -    try {
 -      boolean isFinished = driverBlockedFuture.get().isDone() && root != null && root.isFinished();
 -      if (isFinished) {
 -        close();
 -      }
 -      return isFinished;
 -    } catch (Throwable t) {
 -      logger.error(
 -          "Failed to query whether the schema driver {} is finished", driverContext.getId(), t);
 -      driverContext.failed(t);
 -      return true;
 -    }
 -  }
 -
 -  @Override
 -  public ListenableFuture<Void> processFor(Duration duration) {
 -    // if the driver is blocked we don't need to continue
 -    SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
 -    if (!blockedFuture.isDone()) {
 -      return blockedFuture;
 -    }
 -
 -    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
 -
 -    long start = System.nanoTime();
 -    try {
 -      do {
 -        ListenableFuture<Void> future = processInternal();
 -        if (!future.isDone()) {
 -          return updateDriverBlockedFuture(future);
 -        }
 -      } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
 -    } catch (Throwable t) {
 -      logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
 -      driverContext.failed(t);
 -      close();
 -      blockedFuture.setException(t);
 -      return blockedFuture;
 -    }
 -    return NOT_BLOCKED;
 -  }
 -
 -  private ListenableFuture<Void> processInternal() throws IOException {
 -    ListenableFuture<Void> blocked = root.isBlocked();
 -    if (!blocked.isDone()) {
 -      return blocked;
 -    }
 -    blocked = sinkHandle.isFull();
 -    if (!blocked.isDone()) {
 -      return blocked;
 -    }
 -    if (root.hasNext()) {
 -      TsBlock tsBlock = root.next();
 -      if (tsBlock != null && !tsBlock.isEmpty()) {
 -        sinkHandle.send(Collections.singletonList(tsBlock));
 -      }
 -    }
 -    return NOT_BLOCKED;
 -  }
 -
 -  private ListenableFuture<Void> updateDriverBlockedFuture(
 -      ListenableFuture<Void> sourceBlockedFuture) {
 -    // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
 -    // or any of the operators gets a memory revocation request
 -    SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
 -    driverBlockedFuture.set(newDriverBlockedFuture);
 -    sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
 -
 -    // TODO Although we don't have memory management for operator now, we should consider it for
 -    // future
 -    // it's possible that memory revoking is requested for some operator
 -    // before we update driverBlockedFuture above and we don't want to miss that
 -    // notification, so we check to see whether that's the case before returning.
 -
 -    return newDriverBlockedFuture;
 -  }
 -
 -  @Override
 -  public FragmentInstanceId getInfo() {
 -    return driverContext.getId();
 +    super(root, sinkHandle, driverContext);
    }
  
- 
    @Override
 -  public void close() {
 -    if (closed) {
 -      return;
 -    }
 -    closed = true;
 -    try {
 -      if (root != null) {
 -        root.close();
 -      }
 -      if (sinkHandle != null) {
 -        sinkHandle.close();
 -      }
 -    } catch (Throwable t) {
 -      logger.error("Failed to closed driver {}", driverContext.getId(), t);
 -      driverContext.failed(t);
 -    }
 +  protected boolean init(SettableFuture<Void> blockedFuture) {
 +    return true;
    }
  
    @Override
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
index 7305feee83,c8cccc0520..398b2f03c4
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
@@@ -22,8 -22,8 +22,6 @@@ import org.apache.iotdb.tsfile.read.com
  
  import com.google.common.util.concurrent.ListenableFuture;
  
--import java.io.IOException;
--
  import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
  
  public interface Operator extends AutoCloseable {
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
index 97da0a4935,003da76f2d..34fb44373e
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
@@@ -35,7 -35,7 +35,6 @@@ import org.apache.iotdb.tsfile.utils.Bi
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
--import java.io.IOException;
  import java.nio.BufferOverflowException;
  import java.nio.ByteBuffer;
  import java.util.List;
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
index 888f4ce5e7,abebdaf30d..b66d4f2ff1
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
@@@ -18,8 -18,6 +18,7 @@@
   */
  package org.apache.iotdb.db.mpp.schedule.task;
  
- import com.google.common.util.concurrent.SettableFuture;
 +import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
  import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
  import org.apache.iotdb.db.mpp.common.PlanFragmentId;
  import org.apache.iotdb.db.mpp.common.QueryId;
@@@ -32,6 -28,6 +29,7 @@@ import org.apache.iotdb.db.mpp.schedule
  import org.apache.iotdb.db.mpp.schedule.queue.IDIndexedAccessible;
  
  import com.google.common.util.concurrent.ListenableFuture;
++import com.google.common.util.concurrent.SettableFuture;
  import io.airlift.units.Duration;
  
  import java.util.Comparator;
@@@ -194,16 -196,6 +202,14 @@@ public class FragmentInstanceTask imple
        return false;
      }
  
 +    @Override
 +    protected boolean init(SettableFuture<Void> blockedFuture) {
 +      return true;
 +    }
 +
 +    @Override
-     protected void releaseResource() {
- 
-     }
++    protected void releaseResource() {}
 +
      @Override
      public ListenableFuture<Void> processFor(Duration duration) {
        return null;
diff --cc server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index db060ac405,2703d0aa37..d41c7f5391
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@@ -56,8 -55,7 +56,7 @@@ import java.util.Arrays
  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;
 -import java.util.concurrent.atomic.AtomicReference;
 +import java.util.concurrent.ExecutorService;
- import java.util.concurrent.atomic.AtomicReference;
  
  import static org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor.EXECUTION_TIME_SLICE;
  import static org.junit.Assert.assertEquals;
@@@ -87,7 -85,6 +86,8 @@@ public class DataDriverTest 
  
    @Test
    public void batchTest() {
-     ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
++    ExecutorService instanceNotificationExecutor =
++        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
      try {
        MeasurementPath measurementPath1 =
            new MeasurementPath(DATA_DRIVER_TEST_SG + ".device0.sensor0", TSDataType.INT32);
@@@ -95,10 -92,11 +95,12 @@@
        allSensors.add("sensor0");
        allSensors.add("sensor1");
        QueryId queryId = new QueryId("stub_query");
-       FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
-       FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 -      AtomicReference<FragmentInstanceState> state =
 -          new AtomicReference<>(FragmentInstanceState.RUNNING);
++      FragmentInstanceId instanceId =
++          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
++      FragmentInstanceStateMachine stateMachine =
++          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
        FragmentInstanceContext fragmentInstanceContext =
 -          new FragmentInstanceContext(
 -              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
 +          new FragmentInstanceContext(instanceId, stateMachine);
        PlanNodeId planNodeId1 = new PlanNodeId("1");
        fragmentInstanceContext.addOperatorContext(
            1, planNodeId1, SeriesScanOperator.class.getSimpleName());
diff --cc server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
index ab0849ddf2,6ad74bd110..17a45db8da
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
@@@ -28,8 -27,7 +28,7 @@@ import org.apache.iotdb.db.mpp.common.F
  import org.apache.iotdb.db.mpp.common.PlanFragmentId;
  import org.apache.iotdb.db.mpp.common.QueryId;
  import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
--import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 +import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
  import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
  import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
  import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
@@@ -52,8 -50,7 +51,7 @@@ import java.util.Arrays
  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;
 -import java.util.concurrent.atomic.AtomicReference;
 +import java.util.concurrent.ExecutorService;
- import java.util.concurrent.atomic.AtomicReference;
  
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertTrue;
@@@ -81,7 -78,6 +79,8 @@@ public class LimitOperatorTest 
  
    @Test
    public void batchTest() {
-     ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
++    ExecutorService instanceNotificationExecutor =
++        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
      try {
        MeasurementPath measurementPath1 =
            new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
@@@ -89,10 -85,11 +88,12 @@@
        allSensors.add("sensor0");
        allSensors.add("sensor1");
        QueryId queryId = new QueryId("stub_query");
-       FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
-       FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 -      AtomicReference<FragmentInstanceState> state =
 -          new AtomicReference<>(FragmentInstanceState.RUNNING);
++      FragmentInstanceId instanceId =
++          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
++      FragmentInstanceStateMachine stateMachine =
++          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
        FragmentInstanceContext fragmentInstanceContext =
 -          new FragmentInstanceContext(
 -              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
 +          new FragmentInstanceContext(instanceId, stateMachine);
        PlanNodeId planNodeId1 = new PlanNodeId("1");
        fragmentInstanceContext.addOperatorContext(
            1, planNodeId1, SeriesScanOperator.class.getSimpleName());
diff --cc server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
index dd28a90263,78e6821bcd..52ebe68508
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
@@@ -28,8 -27,7 +28,7 @@@ import org.apache.iotdb.db.mpp.common.F
  import org.apache.iotdb.db.mpp.common.PlanFragmentId;
  import org.apache.iotdb.db.mpp.common.QueryId;
  import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
--import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 +import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
  import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
  import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
@@@ -45,11 -44,9 +45,9 @@@ import org.junit.Test
  
  import java.io.IOException;
  import java.util.ArrayList;
- import java.util.HashSet;
  import java.util.List;
  import java.util.Set;
 -import java.util.concurrent.atomic.AtomicReference;
 +import java.util.concurrent.ExecutorService;
- import java.util.concurrent.atomic.AtomicReference;
  
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertTrue;
@@@ -76,17 -73,16 +74,19 @@@ public class SeriesScanOperatorTest 
  
    @Test
    public void batchTest() {
-     ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
++    ExecutorService instanceNotificationExecutor =
++        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
      try {
        MeasurementPath measurementPath =
            new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
-       Set<String> allSensors = new HashSet<>();
-       allSensors.add("sensor0");
+       Set<String> allSensors = Sets.newHashSet("sensor0");
        QueryId queryId = new QueryId("stub_query");
-       FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
-       FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 -      AtomicReference<FragmentInstanceState> state =
 -          new AtomicReference<>(FragmentInstanceState.RUNNING);
++      FragmentInstanceId instanceId =
++          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
++      FragmentInstanceStateMachine stateMachine =
++          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
        FragmentInstanceContext fragmentInstanceContext =
 -          new FragmentInstanceContext(
 -              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
 +          new FragmentInstanceContext(instanceId, stateMachine);
        PlanNodeId planNodeId = new PlanNodeId("1");
        fragmentInstanceContext.addOperatorContext(
            1, planNodeId, SeriesScanOperator.class.getSimpleName());
diff --cc server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
index 70fe4def02,5534418b84..5fbfda4199
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@@ -28,8 -27,7 +28,7 @@@ import org.apache.iotdb.db.mpp.common.F
  import org.apache.iotdb.db.mpp.common.PlanFragmentId;
  import org.apache.iotdb.db.mpp.common.QueryId;
  import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
--import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 +import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
  import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
  import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@@ -51,8 -49,7 +50,7 @@@ import java.util.Arrays
  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;
 -import java.util.concurrent.atomic.AtomicReference;
 +import java.util.concurrent.ExecutorService;
- import java.util.concurrent.atomic.AtomicReference;
  
  import static org.junit.Assert.*;
  
@@@ -77,7 -74,6 +75,8 @@@ public class TimeJoinOperatorTest 
  
    @Test
    public void batchTest() {
-     ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
++    ExecutorService instanceNotificationExecutor =
++        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
      try {
        MeasurementPath measurementPath1 =
            new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
@@@ -85,10 -81,11 +84,12 @@@
        allSensors.add("sensor0");
        allSensors.add("sensor1");
        QueryId queryId = new QueryId("stub_query");
-       FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
-       FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 -      AtomicReference<FragmentInstanceState> state =
 -          new AtomicReference<>(FragmentInstanceState.RUNNING);
++      FragmentInstanceId instanceId =
++          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
++      FragmentInstanceStateMachine stateMachine =
++          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
        FragmentInstanceContext fragmentInstanceContext =
 -          new FragmentInstanceContext(
 -              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
 +          new FragmentInstanceContext(instanceId, stateMachine);
        PlanNodeId planNodeId1 = new PlanNodeId("1");
        fragmentInstanceContext.addOperatorContext(
            1, planNodeId1, SeriesScanOperator.class.getSimpleName());
diff --cc server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
index 320be3be21,609dc9befa..958b0a5b72
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
@@@ -29,8 -28,7 +29,7 @@@ import org.apache.iotdb.db.mpp.common.F
  import org.apache.iotdb.db.mpp.common.PlanFragmentId;
  import org.apache.iotdb.db.mpp.common.QueryId;
  import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
--import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 +import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
  import org.apache.iotdb.db.mpp.execution.SchemaDriverContext;
  import org.apache.iotdb.db.mpp.operator.OperatorContext;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@@ -54,8 -52,7 +53,7 @@@ import java.io.IOException
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.List;
 -import java.util.concurrent.atomic.AtomicReference;
 +import java.util.concurrent.ExecutorService;
- import java.util.concurrent.atomic.AtomicReference;
  
  import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_ATTRIBUTES;
  import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_DEVICES;
@@@ -93,13 -90,13 +91,16 @@@ public class SchemaScanOperatorTest 
  
    @Test
    public void testDeviceMetaScanOperator() {
-     ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
++    ExecutorService instanceNotificationExecutor =
++        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
      try {
        QueryId queryId = new QueryId("stub_query");
-       FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
-       FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 -      AtomicReference<FragmentInstanceState> state =
 -          new AtomicReference<>(FragmentInstanceState.RUNNING);
++      FragmentInstanceId instanceId =
++          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
++      FragmentInstanceStateMachine stateMachine =
++          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
        FragmentInstanceContext fragmentInstanceContext =
 -          new FragmentInstanceContext(
 -              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
 +          new FragmentInstanceContext(instanceId, stateMachine);
        PlanNodeId planNodeId = queryId.genPlanNodeId();
        OperatorContext operatorContext =
            fragmentInstanceContext.addOperatorContext(
@@@ -159,13 -154,13 +160,16 @@@
  
    @Test
    public void testTimeSeriesMetaScanOperator() {
-     ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
++    ExecutorService instanceNotificationExecutor =
++        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
      try {
        QueryId queryId = new QueryId("stub_query");
-       FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
-       FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 -      AtomicReference<FragmentInstanceState> state =
 -          new AtomicReference<>(FragmentInstanceState.RUNNING);
++      FragmentInstanceId instanceId =
++          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
++      FragmentInstanceStateMachine stateMachine =
++          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
        FragmentInstanceContext fragmentInstanceContext =
 -          new FragmentInstanceContext(
 -              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
 +          new FragmentInstanceContext(instanceId, stateMachine);
        PlanNodeId planNodeId = queryId.genPlanNodeId();
        OperatorContext operatorContext =
            fragmentInstanceContext.addOperatorContext(


[iotdb] 01/02: Add FragmentInstanceStateMachine

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fb52130d2bc1d0ea95b36055049f1c98c728dbb2
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Apr 20 17:24:28 2022 +0800

    Add FragmentInstanceStateMachine
---
 pom.xml                                            |   5 +
 server/pom.xml                                     |   4 +
 .../apache/iotdb/db/mpp/buffer/ISinkHandle.java    |  21 +-
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java |  16 +-
 .../apache/iotdb/db/mpp/buffer/StubSinkHandle.java |  13 +-
 .../apache/iotdb/db/mpp/execution/DataDriver.java  | 186 ++-------
 .../org/apache/iotdb/db/mpp/execution/Driver.java  | 415 ++++++++++++++++++++-
 .../iotdb/db/mpp/execution/DriverContext.java      |  15 +-
 .../db/mpp/execution/FragmentInstanceContext.java  |  87 +++--
 .../mpp/execution/FragmentInstanceExecution.java   |  85 ++++-
 ...t.java => FragmentInstanceFailureListener.java} |  29 +-
 .../db/mpp/execution/FragmentInstanceManager.java  |  56 ++-
 .../db/mpp/execution/FragmentInstanceState.java    |   2 +-
 .../execution/FragmentInstanceStateMachine.java    | 171 +++++++++
 .../iotdb/db/mpp/execution/SchemaDriver.java       | 142 +------
 .../org/apache/iotdb/db/mpp/operator/Operator.java |   2 +-
 .../db/mpp/operator/process/TransformOperator.java |   2 +-
 .../mpp/operator/schema/SchemaFetchOperator.java   |   2 +-
 .../db/mpp/operator/source/ExchangeOperator.java   |   2 +-
 .../db/mpp/schedule/task/FragmentInstanceTask.java |  25 +-
 .../iotdb/db/mpp/execution/DataDriverTest.java     |  25 +-
 .../iotdb/db/mpp/operator/LimitOperatorTest.java   |  13 +-
 .../db/mpp/operator/SeriesScanOperatorTest.java    |  13 +-
 .../db/mpp/operator/TimeJoinOperatorTest.java      |  13 +-
 .../operator/schema/SchemaScanOperatorTest.java    |  23 +-
 25 files changed, 916 insertions(+), 451 deletions(-)

diff --git a/pom.xml b/pom.xml
index 90a8dbccb4..a54d70cdb4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -495,6 +495,11 @@
                 <artifactId>slice</artifactId>
                 <version>0.41</version>
             </dependency>
+            <dependency>
+                <groupId>io.airlift</groupId>
+                <artifactId>stats</artifactId>
+                <version>214</version>
+            </dependency>
             <dependency>
                 <groupId>org.openjdk.jol</groupId>
                 <artifactId>jol-core</artifactId>
diff --git a/server/pom.xml b/server/pom.xml
index e6c9c29adf..fdc67576bc 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -99,6 +99,10 @@
             <groupId>io.airlift</groupId>
             <artifactId>units</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>stats</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.airlift</groupId>
             <artifactId>airline</artifactId>
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
index 6300c5beef..1d037678ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
@@ -24,8 +24,9 @@ import com.google.common.util.concurrent.ListenableFuture;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
 
-public interface ISinkHandle extends AutoCloseable {
+public interface ISinkHandle {
 
   /** Get the total amount of memory used by buffered tsblocks. */
   long getBufferRetainedSizeInBytes();
@@ -41,7 +42,7 @@ public interface ISinkHandle extends AutoCloseable {
    * the send tsblock call is ignored. This can happen with limit queries. A {@link
    * RuntimeException} will be thrown if any exception happened * during the data transmission.
    */
-  void send(List<TsBlock> tsBlocks) throws IOException;
+  void send(List<TsBlock> tsBlocks);
 
   /**
    * Send a {@link TsBlock} to a specific partition. If no-more-tsblocks has been set, the send
@@ -57,22 +58,32 @@ public interface ISinkHandle extends AutoCloseable {
   void setNoMoreTsBlocks();
 
   /** If the handle is closed. */
-  public boolean isClosed();
+  boolean isClosed();
 
   /**
    * If no more tsblocks will be sent and all the tsblocks have been fetched by downstream fragment
    * instances.
    */
-  public boolean isFinished();
+  boolean isFinished();
 
   /**
    * Close the handle. The output buffer will not be cleared until all tsblocks are fetched by
    * downstream instances. A {@link RuntimeException} will be thrown if any exception happened
    * during the data transmission.
    */
-  @Override
   void close() throws IOException;
 
   /** Abort the sink handle, discarding all tsblocks which may still be in memory buffer. */
   void abort();
+
+  /**
+   * @return whether this SinkHandle is failed
+   */
+  boolean isFailed();
+
+
+  /**
+   * Returns non empty failure cause if the sink handle is failed
+   */
+  Optional<Throwable> getFailureCause();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index af4ecf089d..b2fd70459d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -41,6 +41,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.StringJoiner;
 import java.util.concurrent.ExecutorService;
 
@@ -113,11 +114,8 @@ public class SinkHandle implements ISinkHandle {
   }
 
   @Override
-  public void send(List<TsBlock> tsBlocks) throws IOException {
+  public void send(List<TsBlock> tsBlocks) {
     Validate.notNull(tsBlocks, "tsBlocks is null");
-    if (throwable != null) {
-      throw new IOException(throwable);
-    }
     if (closed) {
       throw new IllegalStateException("Sink handle is closed.");
     }
@@ -229,6 +227,16 @@ public class SinkHandle implements ISinkHandle {
     logger.info("Sink handle {} is aborted", this);
   }
 
+  @Override
+  public boolean isFailed() {
+    return throwable != null;
+  }
+
+  @Override
+  public Optional<Throwable> getFailureCause() {
+    return Optional.ofNullable(throwable);
+  }
+
   @Override
   public synchronized void setNoMoreTsBlocks() {
     noMoreTsBlocks = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
index 9f3b9240c2..c4bc0e377a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
@@ -26,6 +26,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
 
@@ -59,7 +60,7 @@ public class StubSinkHandle implements ISinkHandle {
   }
 
   @Override
-  public void send(List<TsBlock> tsBlocks) throws IOException {
+  public void send(List<TsBlock> tsBlocks) {
     this.tsBlocks.addAll(tsBlocks);
   }
 
@@ -95,6 +96,16 @@ public class StubSinkHandle implements ISinkHandle {
     tsBlocks.clear();
   }
 
+  @Override
+  public boolean isFailed() {
+    return false;
+  }
+
+  @Override
+  public Optional<Throwable> getFailureCause() {
+    return Optional.empty();
+  }
+
   public List<TsBlock> getTsBlocks() {
     return tsBlocks;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index 21aebe0214..2921c9d889 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
-import org.apache.iotdb.commons.exception.IoTDBException;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -26,164 +26,74 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.operator.source.DataSourceOperator;
 import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import io.airlift.units.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.NotThreadSafe;
-
-import java.io.IOException;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
-import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
-import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
 
 @NotThreadSafe
-public class DataDriver implements Driver {
-
-  private static final Logger logger = LoggerFactory.getLogger(DataDriver.class);
-
-  private final Operator root;
-  private final ISinkHandle sinkHandle;
-  private final DataDriverContext driverContext;
+public class DataDriver extends Driver {
 
   private boolean init;
-  private boolean closed;
 
   /** closed tsfile used in this fragment instance */
   private Set<TsFileResource> closedFilePaths;
   /** unClosed tsfile used in this fragment instance */
   private Set<TsFileResource> unClosedFilePaths;
 
-  private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
 
   public DataDriver(Operator root, ISinkHandle sinkHandle, DataDriverContext driverContext) {
-    this.root = root;
-    this.sinkHandle = sinkHandle;
-    this.driverContext = driverContext;
+    super(root, sinkHandle, driverContext);
     this.closedFilePaths = new HashSet<>();
     this.unClosedFilePaths = new HashSet<>();
-    // initially the driverBlockedFuture is not blocked (it is completed)
-    SettableFuture<Void> future = SettableFuture.create();
-    future.set(null);
-    driverBlockedFuture.set(future);
-  }
-
-  @Override
-  public boolean isFinished() {
-    try {
-      boolean isFinished =
-          closed || (driverBlockedFuture.get().isDone() && root != null && root.isFinished());
-      if (isFinished) {
-        close();
-      }
-      return isFinished;
-    } catch (Throwable t) {
-      logger.error(
-          "Failed to query whether the data driver {} is finished", driverContext.getId(), t);
-      driverContext.failed(t);
-      close();
-      return true;
-    }
   }
 
   @Override
-  public ListenableFuture<Void> processFor(Duration duration) {
-
-    SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
-    // initialization may be time-consuming, so we keep it in the processFor method
-    // in normal case, it won't cause deadlock and should finish soon, otherwise it will be a
-    // critical bug
+  protected boolean init(SettableFuture<Void> blockedFuture) {
     if (!init) {
       try {
         initialize();
       } catch (Throwable t) {
-        logger.error(
+        LOGGER.error(
             "Failed to do the initialization for fragment instance {} ", driverContext.getId(), t);
         driverContext.failed(t);
-        close();
         blockedFuture.setException(t);
-        return blockedFuture;
+        return false;
       }
     }
-
-    // if the driver is blocked we don't need to continue
-    if (!blockedFuture.isDone()) {
-      return blockedFuture;
-    }
-
-    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
-
-    long start = System.nanoTime();
-    try {
-      do {
-        ListenableFuture<Void> future = processInternal();
-        if (!future.isDone()) {
-          return updateDriverBlockedFuture(future);
-        }
-      } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
-    } catch (Throwable t) {
-      logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
-      driverContext.failed(t);
-      close();
-      blockedFuture.setException(t);
-      return blockedFuture;
-    }
-    return NOT_BLOCKED;
-  }
-
-  @Override
-  public FragmentInstanceId getInfo() {
-    return driverContext.getId();
+    return true;
   }
 
+  /**
+   * All file paths used by this fragment instance must be cleared and thus the usage reference must
+   * be decreased.
+   */
   @Override
-  public void close() {
-    if (closed) {
-      return;
+  protected void releaseResource() {
+    for (TsFileResource tsFile : closedFilePaths) {
+      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
     }
-    closed = true;
-    try {
-      if (root != null) {
-        root.close();
-      }
-      if (sinkHandle != null) {
-        sinkHandle.close();
-      }
-    } catch (Throwable t) {
-      logger.error("Failed to closed driver {}", driverContext.getId(), t);
-      driverContext.failed(t);
-    } finally {
-      removeUsedFilesForQuery();
+    closedFilePaths = null;
+    for (TsFileResource tsFile : unClosedFilePaths) {
+      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
     }
+    unClosedFilePaths = null;
   }
 
-  @Override
-  public void failed(Throwable t) {
-    driverContext.failed(t);
-  }
 
   /**
    * init seq file list and unseq file list in QueryDataSource and set it into each SourceNode TODO
    * we should change all the blocked lock operation into tryLock
    */
   private void initialize() throws QueryProcessException {
-    List<DataSourceOperator> sourceOperators = driverContext.getSourceOperators();
+    List<DataSourceOperator> sourceOperators = ((DataDriverContext)driverContext).getSourceOperators();
     if (sourceOperators != null && !sourceOperators.isEmpty()) {
       QueryDataSource dataSource = initQueryDataSourceCache();
       sourceOperators.forEach(
@@ -206,11 +116,12 @@ public class DataDriver implements Driver {
    * QueryDataSource needed for this query
    */
   public QueryDataSource initQueryDataSourceCache() throws QueryProcessException {
-    DataRegion dataRegion = driverContext.getDataRegion();
+    DataDriverContext context = (DataDriverContext) driverContext;
+    DataRegion dataRegion = context.getDataRegion();
     dataRegion.readLock();
     try {
       List<PartialPath> pathList =
-          driverContext.getPaths().stream()
+          context.getPaths().stream()
               .map(IDTable::translateQueryPath)
               .collect(Collectors.toList());
       // when all the selected series are under the same device, the QueryDataSource will be
@@ -223,7 +134,7 @@ public class DataDriver implements Driver {
               pathList,
               selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null,
               driverContext.getFragmentInstanceContext(),
-              driverContext.getTimeFilter());
+              context.getTimeFilter());
 
       // used files should be added before mergeLock is unlocked, or they may be deleted by
       // running merge
@@ -264,28 +175,13 @@ public class DataDriver implements Driver {
     }
   }
 
-  /**
-   * All file paths used by this fragment instance must be cleared and thus the usage reference must
-   * be decreased.
-   */
-  private void removeUsedFilesForQuery() {
-    for (TsFileResource tsFile : closedFilePaths) {
-      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
-    }
-    closedFilePaths = null;
-    for (TsFileResource tsFile : unClosedFilePaths) {
-      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
-    }
-    unClosedFilePaths = null;
-  }
-
   /**
    * Increase the usage reference of filePath of job id. Before the invoking of this method, <code>
    * this.setqueryIdForCurrentRequestThread</code> has been invoked, so <code>
    * sealedFilePathsMap.get(queryId)</code> or <code>unsealedFilePathsMap.get(queryId)</code> must
    * not return null.
    */
-  void addFilePathToMap(TsFileResource tsFile, boolean isClosed) {
+  private void addFilePathToMap(TsFileResource tsFile, boolean isClosed) {
     Set<TsFileResource> pathSet = isClosed ? closedFilePaths : unClosedFilePaths;
     if (!pathSet.contains(tsFile)) {
       pathSet.add(tsFile);
@@ -293,38 +189,4 @@ public class DataDriver implements Driver {
     }
   }
 
-  private ListenableFuture<Void> processInternal() throws IOException, IoTDBException {
-    ListenableFuture<Void> blocked = root.isBlocked();
-    if (!blocked.isDone()) {
-      return blocked;
-    }
-    blocked = sinkHandle.isFull();
-    if (!blocked.isDone()) {
-      return blocked;
-    }
-    if (root.hasNext()) {
-      TsBlock tsBlock = root.next();
-      if (tsBlock != null && !tsBlock.isEmpty()) {
-        sinkHandle.send(Collections.singletonList(tsBlock));
-      }
-    }
-    return NOT_BLOCKED;
-  }
-
-  private ListenableFuture<Void> updateDriverBlockedFuture(
-      ListenableFuture<Void> sourceBlockedFuture) {
-    // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
-    // or any of the operators gets a memory revocation request
-    SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
-    driverBlockedFuture.set(newDriverBlockedFuture);
-    sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
-
-    // TODO Although we don't have memory management for operator now, we should consider it for
-    // future
-    // it's possible that memory revoking is requested for some operator
-    // before we update driverBlockedFuture above and we don't want to miss that
-    // notification, so we check to see whether that's the case before returning.
-
-    return newDriverBlockedFuture;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
index f211ce593c..fd2fef8b3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
@@ -18,25 +18,88 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-
+import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import io.airlift.units.Duration;
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Throwables.throwIfUnchecked;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static java.lang.Boolean.TRUE;
+import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
 
 /**
  * Driver encapsulates some methods which are necessary for execution scheduler to run a fragment
  * instance
  */
-public interface Driver extends Closeable {
+public abstract class Driver {
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(Driver.class);
+
+
+  protected final Operator root;
+  protected final ISinkHandle sinkHandle;
+  protected final DriverContext driverContext;
+  protected final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
+  protected final AtomicReference<State> state = new AtomicReference<>(State.ALIVE);
+
+  protected final DriverLock exclusiveLock = new DriverLock();
+
+  protected enum State {
+    ALIVE, NEED_DESTRUCTION, DESTROYED
+  }
+
+  public Driver(Operator root, ISinkHandle sinkHandle, DriverContext driverContext) {
+    this.root = root;
+    this.sinkHandle = sinkHandle;
+    this.driverContext = driverContext;
+
+    // initially the driverBlockedFuture is not blocked (it is completed)
+    SettableFuture<Void> future = SettableFuture.create();
+    future.set(null);
+    driverBlockedFuture.set(future);
+  }
 
   /**
    * Used to judge whether this fragment instance should be scheduled for execution anymore
    *
    * @return true if the FragmentInstance is done or terminated due to failure, otherwise false.
    */
-  boolean isFinished();
+  public boolean isFinished() {
+    checkLockNotHeld("Cannot check finished status while holding the driver lock");
+
+    // if we can get the lock, attempt a clean shutdown; otherwise someone else will shutdown
+    Optional<Boolean> result = tryWithLockUnInterruptibly(this::isFinishedInternal);
+    return result.orElseGet(() -> state.get() != State.ALIVE || driverContext.isDone());
+  }
+
+  /**
+   * do initialization
+   *
+   * @return true if init succeed, false otherwise
+   */
+  protected abstract boolean init(SettableFuture<Void> blockedFuture);
+
+  /**
+   * release resource this driver used
+   */
+  protected abstract void releaseResource();
 
   /**
    * run the fragment instance for {@param duration} time slice, the time of this run is likely not
@@ -44,27 +107,351 @@ public interface Driver extends Closeable {
    *
    * @param duration how long should this fragment instance run
    * @return the returned ListenableFuture<Void> is used to represent status of this processing if
-   *     isDone() return true, meaning that this fragment instance is not blocked and is ready for
-   *     next processing otherwise, meaning that this fragment instance is blocked and not ready for
-   *     next processing.
+   * isDone() return true, meaning that this fragment instance is not blocked and is ready for
+   * next processing otherwise, meaning that this fragment instance is blocked and not ready for
+   * next processing.
    */
-  ListenableFuture<Void> processFor(Duration duration);
+  public ListenableFuture<Void> processFor(Duration duration) {
+
+    SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
+    // initialization may be time-consuming, so we keep it in the processFor method
+    // in normal case, it won't cause deadlock and should finish soon, otherwise it will be a
+    // critical bug
+    if (!init(blockedFuture)) {
+      return blockedFuture;
+    }
+
+    // if the driver is blocked we don't need to continue
+    if (!blockedFuture.isDone()) {
+      return blockedFuture;
+    }
+
+    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
+
+    Optional<ListenableFuture<Void>> result = tryWithLock(100, TimeUnit.MILLISECONDS, true, () -> {
+      long start = System.nanoTime();
+      do {
+        ListenableFuture<Void> future = processInternal();
+        if (!future.isDone()) {
+          return updateDriverBlockedFuture(future);
+        }
+      }
+      while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
+      return NOT_BLOCKED;
+    });
+
+    return result.orElse(NOT_BLOCKED);
+  }
 
   /**
    * the id information about this Fragment Instance.
    *
    * @return a {@link FragmentInstanceId} instance.
    */
-  FragmentInstanceId getInfo();
+  public FragmentInstanceId getInfo() {
+    return driverContext.getId();
+  }
+
+  /**
+   * clear resource used by this fragment instance
+   */
+  public void close() {
+    // mark the service for destruction
+    if (!state.compareAndSet(State.ALIVE, State.NEED_DESTRUCTION)) {
+      return;
+    }
 
-  /** clear resource used by this fragment instance */
-  @Override
-  void close();
+    exclusiveLock.interruptCurrentOwner();
+
+    // if we can get the lock, attempt a clean shutdown; otherwise someone else will shutdown
+    tryWithLockUnInterruptibly(() -> TRUE);
+  }
 
   /**
    * fail current driver
    *
    * @param t reason cause this failure
    */
-  void failed(Throwable t);
+  public void failed(Throwable t) {
+    driverContext.failed(t);
+  }
+
+  public ISinkHandle getSinkHandle() {
+    return sinkHandle;
+  }
+
+  @GuardedBy("exclusiveLock")
+  private boolean isFinishedInternal() {
+    checkLockHeld("Lock must be held to call isFinishedInternal");
+
+    boolean finished = state.get() != State.ALIVE || driverContext.isDone() || root == null || root.isFinished();
+    if (finished) {
+      state.compareAndSet(State.ALIVE, State.NEED_DESTRUCTION);
+    }
+    return finished;
+  }
+
+
+  private ListenableFuture<Void> processInternal() {
+    try {
+      ListenableFuture<Void> blocked = root.isBlocked();
+      if (!blocked.isDone()) {
+        return blocked;
+      }
+      blocked = sinkHandle.isFull();
+      if (!blocked.isDone()) {
+        return blocked;
+      }
+      if (root.hasNext()) {
+        TsBlock tsBlock = root.next();
+        if (tsBlock != null && !tsBlock.isEmpty()) {
+          sinkHandle.send(Collections.singletonList(tsBlock));
+        }
+      }
+      return NOT_BLOCKED;
+    } catch (Throwable t) {
+      LOGGER.error("Failed to execute fragment instance {}", driverContext.getId(), t);
+      List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
+      if (interrupterStack == null) {
+        driverContext.failed(t);
+        throw t;
+      }
+
+      // Driver thread was interrupted which should only happen if the task is already finished.
+      // If this becomes the actual cause of a failed query there is a bug in the task state machine.
+      Exception exception = new Exception("Interrupted By");
+      exception.setStackTrace(interrupterStack.toArray(new StackTraceElement[0]));
+      RuntimeException newException = new RuntimeException("Driver was interrupted", exception);
+      newException.addSuppressed(t);
+      driverContext.failed(newException);
+      throw newException;
+    }
+  }
+
+  private ListenableFuture<Void> updateDriverBlockedFuture(
+      ListenableFuture<Void> sourceBlockedFuture) {
+    // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
+    // or any of the operators gets a memory revocation request
+    SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
+    driverBlockedFuture.set(newDriverBlockedFuture);
+    sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
+
+    // TODO Although we don't have memory management for operator now, we should consider it for
+    // future
+    // it's possible that memory revoking is requested for some operator
+    // before we update driverBlockedFuture above and we don't want to miss that
+    // notification, so we check to see whether that's the case before returning.
+
+    return newDriverBlockedFuture;
+  }
+
+
+  private synchronized void checkLockNotHeld(String message) {
+    checkState(!exclusiveLock.isHeldByCurrentThread(), message);
+  }
+
+  @GuardedBy("exclusiveLock")
+  private synchronized void checkLockHeld(String message) {
+    checkState(exclusiveLock.isHeldByCurrentThread(), message);
+  }
+
+  /**
+   * Try to acquire the {@code exclusiveLock} immediately and run a {@code task}
+   * The task will not be interrupted if the {@code Driver} is closed.
+   * <p>
+   * Note: task cannot return null
+   */
+  private <T> Optional<T> tryWithLockUnInterruptibly(Supplier<T> task) {
+    return tryWithLock(0, TimeUnit.MILLISECONDS, false, task);
+  }
+
+  /**
+   * Try to acquire the {@code exclusiveLock} with {@code timeout} and run a {@code task}.
+   * If the {@code interruptOnClose} flag is set to {@code true} the {@code task} will be
+   * interrupted if the {@code Driver} is closed.
+   * <p>
+   * Note: task cannot return null
+   */
+  private <T> Optional<T> tryWithLock(long timeout, TimeUnit unit, boolean interruptOnClose, Supplier<T> task) {
+    checkLockNotHeld("Lock cannot be reacquired");
+
+    boolean acquired = false;
+    try {
+      acquired = exclusiveLock.tryLock(timeout, unit, interruptOnClose);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
+    if (!acquired) {
+      return Optional.empty();
+    }
+
+    Optional<T> result;
+    try {
+      result = Optional.of(task.get());
+    } finally {
+      try {
+        destroyIfNecessary();
+      } finally {
+        exclusiveLock.unlock();
+      }
+    }
+
+    return result;
+  }
+
+  @GuardedBy("exclusiveLock")
+  private void destroyIfNecessary() {
+    checkLockHeld("Lock must be held to call destroyIfNecessary");
+
+    if (!state.compareAndSet(State.NEED_DESTRUCTION, State.DESTROYED)) {
+      return;
+    }
+
+    // if we get an error while closing a driver, record it and we will throw it at the end
+    Throwable inFlightException = null;
+    try {
+      inFlightException = closeAndDestroyOperators();
+      driverContext.finished();
+    } catch (Throwable t) {
+      // this shouldn't happen but be safe
+      inFlightException = addSuppressedException(
+          inFlightException,
+          t,
+          "Error destroying driver for task %s",
+          driverContext.getId());
+    } finally {
+      releaseResource();
+    }
+
+    if (inFlightException != null) {
+      // this will always be an Error or Runtime
+      throwIfUnchecked(inFlightException);
+      throw new RuntimeException(inFlightException);
+    }
+  }
+
+  private Throwable closeAndDestroyOperators() {
+    // record the current interrupted status (and clear the flag); we'll reset it later
+    boolean wasInterrupted = Thread.interrupted();
+
+    Throwable inFlightException = null;
+
+    try {
+      if (root != null) {
+        root.close();
+      }
+      if (sinkHandle != null) {
+        sinkHandle.close();
+      }
+    } catch (InterruptedException t) {
+      // don't record the stack
+      wasInterrupted = true;
+    } catch (Throwable t) {
+      // TODO currently, we won't know exact operator which is failed in closing
+      inFlightException = addSuppressedException(
+          inFlightException,
+          t,
+          "Error closing operator {} for fragment instance {}",
+          root.getOperatorContext().getOperatorId(),
+          driverContext.getId());
+    } finally {
+      // reset the interrupted flag
+      if (wasInterrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    return inFlightException;
+  }
+
+  private static Throwable addSuppressedException(Throwable inFlightException, Throwable newException, String message, Object... args) {
+    if (newException instanceof Error) {
+      if (inFlightException == null) {
+        inFlightException = newException;
+      } else {
+        // Self-suppression not permitted
+        if (inFlightException != newException) {
+          inFlightException.addSuppressed(newException);
+        }
+      }
+    } else {
+      // log normal exceptions instead of rethrowing them
+      LOGGER.error(message, args, newException);
+    }
+    return inFlightException;
+  }
+
+  private static class DriverLock {
+    private final ReentrantLock lock = new ReentrantLock();
+
+    @GuardedBy("this")
+    private Thread currentOwner;
+    @GuardedBy("this")
+    private boolean currentOwnerInterruptionAllowed;
+
+    @GuardedBy("this")
+    private List<StackTraceElement> interrupterStack;
+
+    public boolean isHeldByCurrentThread() {
+      return lock.isHeldByCurrentThread();
+    }
+
+    public boolean tryLock(boolean currentThreadInterruptionAllowed) {
+      checkState(!lock.isHeldByCurrentThread(), "Lock is not reentrant");
+      boolean acquired = lock.tryLock();
+      if (acquired) {
+        setOwner(currentThreadInterruptionAllowed);
+      }
+      return acquired;
+    }
+
+    public boolean tryLock(long timeout, TimeUnit unit, boolean currentThreadInterruptionAllowed)
+        throws InterruptedException {
+      checkState(!lock.isHeldByCurrentThread(), "Lock is not reentrant");
+      boolean acquired = lock.tryLock(timeout, unit);
+      if (acquired) {
+        setOwner(currentThreadInterruptionAllowed);
+      }
+      return acquired;
+    }
+
+    private synchronized void setOwner(boolean interruptionAllowed) {
+      checkState(lock.isHeldByCurrentThread(), "Current thread does not hold lock");
+      currentOwner = Thread.currentThread();
+      currentOwnerInterruptionAllowed = interruptionAllowed;
+      // NOTE: We do not use interrupted stack information to know that another
+      // thread has attempted to interrupt the driver, and interrupt this new lock
+      // owner.  The interrupted stack information is for debugging purposes only.
+      // In the case of interruption, the caller should (and does) have a separate
+      // state to prevent further processing in the Driver.
+    }
+
+    public synchronized void unlock() {
+      checkState(lock.isHeldByCurrentThread(), "Current thread does not hold lock");
+      currentOwner = null;
+      currentOwnerInterruptionAllowed = false;
+      lock.unlock();
+    }
+
+    public synchronized List<StackTraceElement> getInterrupterStack() {
+      return interrupterStack;
+    }
+
+    public synchronized void interruptCurrentOwner() {
+      if (!currentOwnerInterruptionAllowed) {
+        return;
+      }
+      // there is a benign race condition here were the lock holder
+      // can be change between attempting to get lock and grabbing
+      // the synchronized lock here, but in either case we want to
+      // interrupt the lock holder thread
+      if (interrupterStack == null) {
+        interrupterStack = ImmutableList.copyOf(Thread.currentThread().getStackTrace());
+      }
+
+      if (currentOwner != null) {
+        currentOwner.interrupt();
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
index 8c20a2c334..d0bc3bb232 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
@@ -20,10 +20,15 @@ package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 public class DriverContext {
 
   private final FragmentInstanceContext fragmentInstanceContext;
 
+  private final AtomicBoolean finished = new AtomicBoolean();
+
+
   public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
     this.fragmentInstanceContext = fragmentInstanceContext;
   }
@@ -38,13 +43,15 @@ public class DriverContext {
 
   public void failed(Throwable cause) {
     fragmentInstanceContext.failed(cause);
+    finished.set(true);
   }
 
-  public void finish() {
-    fragmentInstanceContext.finish();
+  public void finished() {
+    finished.compareAndSet(false, true);
   }
 
-  public void flushing() {
-    fragmentInstanceContext.flushing();
+
+  public boolean isDone() {
+    return finished.get();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
index b2e51be2bc..c3d33dbd74 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
@@ -28,9 +28,11 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 public class FragmentInstanceContext extends QueryContext {
 
@@ -41,14 +43,19 @@ public class FragmentInstanceContext extends QueryContext {
   // TODO if we split one fragment instance into multiple pipelines to run, we need to replace it
   // with CopyOnWriteArrayList or some other thread safe data structure
   private final List<OperatorContext> operatorContexts = new ArrayList<>();
-  private final long createNanos = System.nanoTime();
 
   private DriverContext driverContext;
 
-  // TODO we may use StateMachine<FragmentInstanceState> to replace it
-  private final AtomicReference<FragmentInstanceState> state;
+  private final FragmentInstanceStateMachine stateMachine;
+
+  private final long createNanos = System.nanoTime();
+
+  private final AtomicLong startNanos = new AtomicLong();
+  private final AtomicLong endNanos = new AtomicLong();
 
-  private long endTime = -1;
+  private final AtomicReference<Long> executionStartTime = new AtomicReference<>();
+  private final AtomicReference<Long> lastExecutionStartTime = new AtomicReference<>();
+  private final AtomicReference<Long> executionEndTime = new AtomicReference<>();
 
   //    private final GcMonitor gcMonitor;
   //    private final AtomicLong startNanos = new AtomicLong();
@@ -59,9 +66,43 @@ public class FragmentInstanceContext extends QueryContext {
   //    private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
 
   public FragmentInstanceContext(
-      FragmentInstanceId id, AtomicReference<FragmentInstanceState> state) {
+      FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
     this.id = id;
-    this.state = state;
+    this.stateMachine = stateMachine;
+  }
+
+  public void start() {
+    long now = System.currentTimeMillis();
+    executionStartTime.compareAndSet(null, now);
+    startNanos.compareAndSet(0, System.nanoTime());
+
+    // always update last execution start time
+    lastExecutionStartTime.set(now);
+  }
+
+  // the state change listener is added here in a separate initialize() method
+  // instead of the constructor to prevent leaking the "this" reference to
+  // another thread, which will cause unsafe publication of this instance.
+  private void initialize() {
+    stateMachine.addStateChangeListener(this::updateStatsIfDone);
+  }
+
+  private void updateStatsIfDone(FragmentInstanceState newState) {
+    if (newState.isDone()) {
+      long now = System.currentTimeMillis();
+
+      // before setting the end times, make sure a start has been recorded
+      executionStartTime.compareAndSet(null, now);
+      startNanos.compareAndSet(0, System.nanoTime());
+
+      // Only update last start time, if the nothing was started
+      lastExecutionStartTime.compareAndSet(null, now);
+
+      // use compare and set from initial value to avoid overwriting if there
+      // were a duplicate notification, which shouldn't happen
+      executionEndTime.compareAndSet(null, now);
+      endNanos.compareAndSet(0, System.nanoTime());
+    }
   }
 
   public OperatorContext addOperatorContext(
@@ -98,40 +139,10 @@ public class FragmentInstanceContext extends QueryContext {
   }
 
   public void failed(Throwable cause) {
-    LOGGER.warn("Fragment Instance {} failed.", id, cause);
-    state.set(FragmentInstanceState.FAILED);
-  }
-
-  public void cancel() {
-    state.set(FragmentInstanceState.CANCELED);
-    this.endTime = System.currentTimeMillis();
-  }
-
-  public void abort() {
-    state.set(FragmentInstanceState.ABORTED);
-    this.endTime = System.currentTimeMillis();
-  }
-
-  public void finish() {
-    if (state.get().isDone()) {
-      return;
-    }
-    state.set(FragmentInstanceState.FINISHED);
-    this.endTime = System.currentTimeMillis();
-  }
-
-  public void flushing() {
-    if (state.get().isDone()) {
-      return;
-    }
-    state.set(FragmentInstanceState.FLUSHING);
+    stateMachine.failed(cause);
   }
 
   public long getEndTime() {
-    return endTime;
-  }
-
-  public void setEndTime(long endTime) {
-    this.endTime = endTime;
+    return executionEndTime.get();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
index 21e0cc3d50..b00ddcadb3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
@@ -18,14 +18,17 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import io.airlift.stats.CounterStat;
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
 
 import com.google.common.collect.ImmutableList;
 
-import java.util.concurrent.atomic.AtomicReference;
-
 import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.ABORTED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.CANCELLED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FAILED;
 
 public class FragmentInstanceExecution {
 
@@ -36,23 +39,35 @@ public class FragmentInstanceExecution {
 
   private final Driver driver;
 
-  // TODO we may use StateMachine<FragmentInstanceState> to replace it
-  private final AtomicReference<FragmentInstanceState> state;
+  private final ISinkHandle sinkHandle;
+
+  private final FragmentInstanceStateMachine stateMachine;
 
   private long lastHeartbeat;
 
-  public FragmentInstanceExecution(
+  public static FragmentInstanceExecution createFragmentInstanceExecution(IFragmentInstanceScheduler scheduler,
+                                                                          FragmentInstanceId instanceId,
+                                                                          FragmentInstanceContext context,
+                                                                          Driver driver,
+                                                                          FragmentInstanceStateMachine stateMachine,
+                                                                          CounterStat failedInstances) {
+    FragmentInstanceExecution execution = new FragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine);
+    execution.initialize(failedInstances, scheduler, instanceId, driver);
+    return execution;
+  }
+
+  private FragmentInstanceExecution(
       IFragmentInstanceScheduler scheduler,
       FragmentInstanceId instanceId,
       FragmentInstanceContext context,
       Driver driver,
-      AtomicReference<FragmentInstanceState> state) {
+      FragmentInstanceStateMachine stateMachine) {
     this.scheduler = scheduler;
     this.instanceId = instanceId;
     this.context = context;
     this.driver = driver;
-    this.state = state;
-    state.set(FragmentInstanceState.RUNNING);
+    this.sinkHandle = driver.getSinkHandle();
+    this.stateMachine = stateMachine;
     scheduler.submitFragmentInstances(instanceId.getQueryId(), ImmutableList.of(driver));
   }
 
@@ -65,24 +80,66 @@ public class FragmentInstanceExecution {
   }
 
   public FragmentInstanceState getInstanceState() {
-    return state.get();
+    return stateMachine.getState();
   }
 
   public FragmentInstanceInfo getInstanceInfo() {
-    return new FragmentInstanceInfo(state.get(), context.getEndTime());
+    return new FragmentInstanceInfo(stateMachine.getState(), context.getEndTime());
   }
 
   public void failed(Throwable cause) {
     requireNonNull(cause, "cause is null");
-    context.failed(cause);
+    stateMachine.failed(cause);
   }
 
   public void cancel() {
-    context.cancel();
+    stateMachine.cancel();
   }
 
   public void abort() {
-    scheduler.abortFragmentInstance(instanceId);
-    context.abort();
+    stateMachine.abort();
+  }
+
+  // this is a separate method to ensure that the `this` reference is not leaked during construction
+  private void initialize(CounterStat failedInstances, IFragmentInstanceScheduler scheduler, FragmentInstanceId instanceId, Driver driver) {
+    requireNonNull(failedInstances, "failedInstances is null");
+    stateMachine.addStateChangeListener(newState -> {
+
+      if (!newState.isDone()) {
+        return;
+      }
+
+      // Update failed tasks counter
+      if (newState == FAILED) {
+        failedInstances.update(1);
+      }
+
+      driver.close();
+      sinkHandle.abort();
+      scheduler.abortFragmentInstance(instanceId);
+    });
+  }
+
+  private synchronized void instanceCompletion() {
+    if (stateMachine.getState().isDone()) {
+      return;
+    }
+
+    if (!sinkHandle.isFinished()) {
+      stateMachine.transitionToFlushing();
+      return;
+    }
+
+    if (sinkHandle.isFinished()) {
+      // Cool! All done!
+      stateMachine.finished();
+      return;
+    }
+
+    if (sinkHandle.isFailed()) {
+      Throwable failureCause = sinkHandle.getFailureCause()
+          .orElseGet(() -> new RuntimeException("Fragment Instance " + instanceId + " 's SinkHandle is failed but the failure cause is missing"));
+      stateMachine.failed(failureCause);
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceFailureListener.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceFailureListener.java
index 8c20a2c334..3f99848c29 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceFailureListener.java
@@ -20,31 +20,6 @@ package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 
-public class DriverContext {
-
-  private final FragmentInstanceContext fragmentInstanceContext;
-
-  public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
-    this.fragmentInstanceContext = fragmentInstanceContext;
-  }
-
-  public FragmentInstanceId getId() {
-    return fragmentInstanceContext.getId();
-  }
-
-  public FragmentInstanceContext getFragmentInstanceContext() {
-    return fragmentInstanceContext;
-  }
-
-  public void failed(Throwable cause) {
-    fragmentInstanceContext.failed(cause);
-  }
-
-  public void finish() {
-    fragmentInstanceContext.finish();
-  }
-
-  public void flushing() {
-    fragmentInstanceContext.flushing();
-  }
+public interface FragmentInstanceFailureListener {
+  void onTaskFailed(FragmentInstanceId taskId, Throwable failure);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
index 537f330f04..5f49fa47ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import io.airlift.stats.CounterStat;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
@@ -33,11 +34,12 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceExecution.createFragmentInstanceExecution;
 
 public class FragmentInstanceManager {
 
@@ -49,9 +51,14 @@ public class FragmentInstanceManager {
   private final IFragmentInstanceScheduler scheduler = FragmentInstanceScheduler.getInstance();
 
   private final ScheduledExecutorService instanceManagementExecutor;
+  private final ExecutorService instanceNotificationExecutor;
+
 
   private final Duration infoCacheTime;
 
+  // record failed instances count
+  private final CounterStat failedInstances = new CounterStat();
+
   public static FragmentInstanceManager getInstance() {
     return FragmentInstanceManager.InstanceHolder.INSTANCE;
   }
@@ -61,13 +68,14 @@ public class FragmentInstanceManager {
     this.instanceExecution = new ConcurrentHashMap<>();
     this.instanceManagementExecutor =
         IoTDBThreadPoolFactory.newScheduledThreadPool(1, "instance-management");
+    this.instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification");
 
     this.infoCacheTime = new Duration(15, TimeUnit.MINUTES);
 
     instanceManagementExecutor.scheduleWithFixedDelay(
         () -> {
           try {
-            removeOldTasks();
+            removeOldInstances();
           } catch (Throwable e) {
             logger.warn("Error removing old tasks", e);
           }
@@ -85,13 +93,13 @@ public class FragmentInstanceManager {
         instanceExecution.computeIfAbsent(
             instanceId,
             id -> {
-              AtomicReference<FragmentInstanceState> state = new AtomicReference<>();
-              state.set(FragmentInstanceState.PLANNED);
+
+              FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 
               FragmentInstanceContext context =
                   instanceContext.computeIfAbsent(
                       instanceId,
-                      fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
+                      fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, stateMachine));
 
               try {
                 DataDriver driver =
@@ -100,9 +108,9 @@ public class FragmentInstanceManager {
                         context,
                         instance.getTimeFilter(),
                         dataRegion);
-                return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+                return createFragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine, failedInstances);
               } catch (Throwable t) {
-                context.failed(t);
+                stateMachine.failed(t);
                 return null;
               }
             });
@@ -118,26 +126,29 @@ public class FragmentInstanceManager {
         instanceExecution.computeIfAbsent(
             instanceId,
             id -> {
-              AtomicReference<FragmentInstanceState> state = new AtomicReference<>();
-              state.set(FragmentInstanceState.PLANNED);
+              FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+
 
               FragmentInstanceContext context =
                   instanceContext.computeIfAbsent(
                       instanceId,
-                      fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
+                      fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, stateMachine));
 
               try {
                 SchemaDriver driver =
                     planner.plan(instance.getFragment().getRoot(), context, schemaRegion);
-                return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+                return createFragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine, failedInstances);
               } catch (Throwable t) {
-                context.failed(t);
+                stateMachine.failed(t);
                 return null;
               }
             });
     return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
   }
 
+  /**
+   * Aborts a FragmentInstance.
+   */
   public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId fragmentInstanceId) {
     FragmentInstanceExecution execution = instanceExecution.remove(fragmentInstanceId);
     if (execution != null) {
@@ -148,6 +159,21 @@ public class FragmentInstanceManager {
     return null;
   }
 
+  /**
+   * Cancels a FragmentInstance.
+   */
+  public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) {
+    requireNonNull(instanceId, "taskId is null");
+
+    FragmentInstanceExecution execution = instanceExecution.remove(instanceId);
+    if (execution != null) {
+      instanceContext.remove(instanceId);
+      execution.cancel();
+      return execution.getInstanceInfo();
+    }
+    return null;
+  }
+
   /**
    * Gets the info for the specified fragment instance.
    *
@@ -163,12 +189,16 @@ public class FragmentInstanceManager {
     return execution.getInstanceInfo();
   }
 
+  public CounterStat getFailedInstances() {
+    return failedInstances;
+  }
+
   private FragmentInstanceInfo createFailedInstanceInfo(FragmentInstanceId instanceId) {
     return new FragmentInstanceInfo(
         FragmentInstanceState.FAILED, instanceContext.get(instanceId).getEndTime());
   }
 
-  private void removeOldTasks() {
+  private void removeOldInstances() {
     long oldestAllowedInstance = System.currentTimeMillis() - infoCacheTime.toMillis();
     instanceContext
         .entrySet()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
index d55af83e98..d1a182bcbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
@@ -40,7 +40,7 @@ public enum FragmentInstanceState {
   /** Instance has finished executing and all output has been consumed. */
   FINISHED(true, false),
   /** Instance was canceled by a user. */
-  CANCELED(true, true),
+  CANCELLED(true, true),
   /** Instance was aborted due to a failure in the query. The failure was not in this instance. */
   ABORTED(true, true),
   /** Instance execution failed. */
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java
new file mode 100644
index 0000000000..7f66d1a321
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceStateMachine.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.iotdb.db.mpp.execution.StateMachine.StateChangeListener;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.ABORTED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.CANCELLED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FAILED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FINISHED;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FLUSHING;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.RUNNING;
+import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.TERMINAL_INSTANCE_STATES;
+
+
+@ThreadSafe
+public class FragmentInstanceStateMachine {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceStateMachine.class);
+
+  private final long createdTime = System.currentTimeMillis();
+
+  private final FragmentInstanceId instanceId;
+  private final Executor executor;
+  private final StateMachine<FragmentInstanceState> instanceState;
+  private final LinkedBlockingQueue<Throwable> failureCauses = new LinkedBlockingQueue<>();
+
+  @GuardedBy("this")
+  private final Map<FragmentInstanceId, Throwable> sourceInstanceFailures = new HashMap<>();
+  @GuardedBy("this")
+  private final List<FragmentInstanceFailureListener> sourceInstanceFailureListeners = new ArrayList<>();
+
+  public FragmentInstanceStateMachine(FragmentInstanceId fragmentInstanceId, Executor executor) {
+    this.instanceId = requireNonNull(fragmentInstanceId, "fragmentInstanceId is null");
+    this.executor = requireNonNull(executor, "executor is null");
+    instanceState = new StateMachine<>("FragmentInstance " + fragmentInstanceId, executor, RUNNING, TERMINAL_INSTANCE_STATES);
+    instanceState.addStateChangeListener(newState -> LOGGER.debug("Fragment Instance {} is {}", fragmentInstanceId, newState));
+  }
+
+  public long getCreatedTime() {
+    return createdTime;
+  }
+
+  public FragmentInstanceId getFragmentInstanceId() {
+    return instanceId;
+  }
+
+  public FragmentInstanceState getState() {
+    return instanceState.get();
+  }
+
+  public ListenableFuture<FragmentInstanceState> getStateChange(FragmentInstanceState currentState) {
+    requireNonNull(currentState, "currentState is null");
+    checkArgument(!currentState.isDone(), "Current state is already done");
+
+    ListenableFuture<FragmentInstanceState> future = instanceState.getStateChange(currentState);
+    FragmentInstanceState state = instanceState.get();
+    if (state.isDone()) {
+      return immediateFuture(state);
+    }
+    return future;
+  }
+
+  public LinkedBlockingQueue<Throwable> getFailureCauses() {
+    return failureCauses;
+  }
+
+  public void transitionToFlushing() {
+    instanceState.setIf(FLUSHING, currentState -> currentState == RUNNING);
+  }
+
+  public void finished() {
+    transitionToDoneState(FINISHED);
+  }
+
+  public void cancel() {
+    transitionToDoneState(CANCELLED);
+  }
+
+  public void abort() {
+    transitionToDoneState(ABORTED);
+  }
+
+  public void failed(Throwable cause) {
+    failureCauses.add(cause);
+    transitionToDoneState(FAILED);
+  }
+
+  private void transitionToDoneState(FragmentInstanceState doneState) {
+    requireNonNull(doneState, "doneState is null");
+    checkArgument(doneState.isDone(), "doneState %s is not a done state", doneState);
+
+    instanceState.setIf(doneState, currentState -> !currentState.isDone());
+  }
+
+  /**
+   * Listener is always notified asynchronously using a dedicated notification thread pool so, care should
+   * be taken to avoid leaking {@code this} when adding a listener in a constructor. Additionally, it is
+   * possible notifications are observed out of order due to the asynchronous execution.
+   */
+  public void addStateChangeListener(StateChangeListener<FragmentInstanceState> stateChangeListener) {
+    instanceState.addStateChangeListener(stateChangeListener);
+  }
+
+  public void addSourceTaskFailureListener(FragmentInstanceFailureListener listener) {
+    Map<FragmentInstanceId, Throwable> failures;
+    synchronized (this) {
+      sourceInstanceFailureListeners.add(listener);
+      failures = ImmutableMap.copyOf(sourceInstanceFailures);
+    }
+    executor.execute(() -> {
+      failures.forEach(listener::onTaskFailed);
+    });
+  }
+
+  public void sourceTaskFailed(FragmentInstanceId instanceId, Throwable failure) {
+    List<FragmentInstanceFailureListener> listeners;
+    synchronized (this) {
+      sourceInstanceFailures.putIfAbsent(instanceId, failure);
+      listeners = ImmutableList.copyOf(sourceInstanceFailureListeners);
+    }
+    executor.execute(() -> {
+      for (FragmentInstanceFailureListener listener : listeners) {
+        listener.onTaskFailed(instanceId, failure);
+      }
+    });
+  }
+
+  @Override
+  public String toString() {
+    return toStringHelper(this)
+        .add("FragmentInstanceId", instanceId)
+        .add("FragmentInstanceState", instanceState)
+        .add("failureCauses", failureCauses)
+        .toString();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
index 4020db4593..6c45eb529a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
@@ -18,155 +18,27 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.operator.Operator;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import io.airlift.units.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
-import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
-
 @NotThreadSafe
-public class SchemaDriver implements Driver {
-
-  private static final Logger logger = LoggerFactory.getLogger(SchemaDriver.class);
-
-  private final Operator root;
-  private final ISinkHandle sinkHandle;
-  private final SchemaDriverContext driverContext;
-
-  private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
-
-  private boolean closed = false;
+public class SchemaDriver extends Driver {
 
   public SchemaDriver(Operator root, ISinkHandle sinkHandle, SchemaDriverContext driverContext) {
-    this.root = root;
-    this.sinkHandle = sinkHandle;
-    this.driverContext = driverContext;
-    // initially the driverBlockedFuture is not blocked (it is completed)
-    SettableFuture<Void> future = SettableFuture.create();
-    future.set(null);
-    driverBlockedFuture.set(future);
-  }
-
-  @Override
-  public boolean isFinished() {
-    try {
-      boolean isFinished = driverBlockedFuture.get().isDone() && root != null && root.isFinished();
-      if (isFinished) {
-        close();
-      }
-      return isFinished;
-    } catch (Throwable t) {
-      logger.error(
-          "Failed to query whether the schema driver {} is finished", driverContext.getId(), t);
-      driverContext.failed(t);
-      return true;
-    }
+    super(root, sinkHandle, driverContext);
   }
 
-  @Override
-  public ListenableFuture<Void> processFor(Duration duration) {
-    // if the driver is blocked we don't need to continue
-    SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
-    if (!blockedFuture.isDone()) {
-      return blockedFuture;
-    }
-
-    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
-
-    long start = System.nanoTime();
-    try {
-      do {
-        ListenableFuture<Void> future = processInternal();
-        if (!future.isDone()) {
-          return updateDriverBlockedFuture(future);
-        }
-      } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
-    } catch (Throwable t) {
-      logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
-      driverContext.failed(t);
-      close();
-      blockedFuture.setException(t);
-      return blockedFuture;
-    }
-    return NOT_BLOCKED;
-  }
-
-  private ListenableFuture<Void> processInternal() throws IOException {
-    ListenableFuture<Void> blocked = root.isBlocked();
-    if (!blocked.isDone()) {
-      return blocked;
-    }
-    blocked = sinkHandle.isFull();
-    if (!blocked.isDone()) {
-      return blocked;
-    }
-    if (root.hasNext()) {
-      TsBlock tsBlock = root.next();
-      if (tsBlock != null && !tsBlock.isEmpty()) {
-        sinkHandle.send(Collections.singletonList(tsBlock));
-      }
-    }
-    return NOT_BLOCKED;
-  }
-
-  private ListenableFuture<Void> updateDriverBlockedFuture(
-      ListenableFuture<Void> sourceBlockedFuture) {
-    // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
-    // or any of the operators gets a memory revocation request
-    SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
-    driverBlockedFuture.set(newDriverBlockedFuture);
-    sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
-
-    // TODO Although we don't have memory management for operator now, we should consider it for
-    // future
-    // it's possible that memory revoking is requested for some operator
-    // before we update driverBlockedFuture above and we don't want to miss that
-    // notification, so we check to see whether that's the case before returning.
-
-    return newDriverBlockedFuture;
-  }
-
-  @Override
-  public FragmentInstanceId getInfo() {
-    return driverContext.getId();
-  }
 
   @Override
-  public void close() {
-    if (closed) {
-      return;
-    }
-    closed = true;
-    try {
-      if (root != null) {
-        root.close();
-      }
-      if (sinkHandle != null) {
-        sinkHandle.close();
-      }
-    } catch (Throwable t) {
-      logger.error("Failed to closed driver {}", driverContext.getId(), t);
-      driverContext.failed(t);
-    }
+  protected boolean init(SettableFuture<Void> blockedFuture) {
+    return true;
   }
 
   @Override
-  public void failed(Throwable t) {
-    driverContext.failed(t);
+  protected void releaseResource() {
+    // do nothing
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
index c8cccc0520..7305feee83 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
@@ -53,5 +53,5 @@ public interface Operator extends AutoCloseable {
   /**
    * Is this operator completely finished processing and no more output TsBlock will be produced.
    */
-  boolean isFinished() throws IOException;
+  boolean isFinished();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
index 48d0c71185..cd3355d395 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
@@ -217,7 +217,7 @@ public class TransformOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() throws IOException {
+  public boolean isFinished() {
     return !hasNext();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
index 003da76f2d..97da0a4935 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaFetchOperator.java
@@ -92,7 +92,7 @@ public class SchemaFetchOperator implements SourceOperator {
   }
 
   @Override
-  public boolean isFinished() throws IOException {
+  public boolean isFinished() {
     return isFinished;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
index 722f63543d..397b7dc461 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
@@ -65,7 +65,7 @@ public class ExchangeOperator implements SourceOperator {
   }
 
   @Override
-  public boolean isFinished() throws IOException {
+  public boolean isFinished() {
     return sourceHandle.isFinished();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
index e30b1f15bc..888f4ce5e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
@@ -18,10 +18,14 @@
  */
 package org.apache.iotdb.db.mpp.schedule.task;
 
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.DriverContext;
+import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.schedule.ExecutionContext;
 import org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor;
 import org.apache.iotdb.db.mpp.schedule.queue.ID;
@@ -175,17 +179,31 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
     }
   }
 
-  private static class StubFragmentInstance implements Driver {
+  private static class StubFragmentInstance extends Driver {
 
     private static final QueryId stubQueryId = new QueryId("stub_query");
     private static final FragmentInstanceId stubInstance =
         new FragmentInstanceId(new PlanFragmentId(stubQueryId, 0), "stub-instance");
 
+    public StubFragmentInstance() {
+      super(null, null, null);
+    }
+
     @Override
     public boolean isFinished() {
       return false;
     }
 
+    @Override
+    protected boolean init(SettableFuture<Void> blockedFuture) {
+      return true;
+    }
+
+    @Override
+    protected void releaseResource() {
+
+    }
+
     @Override
     public ListenableFuture<Void> processFor(Duration duration) {
       return null;
@@ -201,5 +219,10 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
 
     @Override
     public void failed(Throwable t) {}
+
+    @Override
+    public ISinkHandle getSinkHandle() {
+      return null;
+    }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index 2703d0aa37..db060ac405 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -55,6 +56,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor.EXECUTION_TIME_SLICE;
@@ -85,6 +87,7 @@ public class DataDriverTest {
 
   @Test
   public void batchTest() {
+    ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
       MeasurementPath measurementPath1 =
           new MeasurementPath(DATA_DRIVER_TEST_SG + ".device0.sensor0", TSDataType.INT32);
@@ -92,11 +95,10 @@ public class DataDriverTest {
       allSensors.add("sensor0");
       allSensors.add("sensor1");
       QueryId queryId = new QueryId("stub_query");
-      AtomicReference<FragmentInstanceState> state =
-          new AtomicReference<>(FragmentInstanceState.RUNNING);
+      FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
       FragmentInstanceContext fragmentInstanceContext =
-          new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+          new FragmentInstanceContext(instanceId, stateMachine);
       PlanNodeId planNodeId1 = new PlanNodeId("1");
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId1, SeriesScanOperator.class.getSimpleName());
@@ -159,19 +161,20 @@ public class DataDriverTest {
               ImmutableList.of(seriesScanOperator1, seriesScanOperator2));
 
       StubSinkHandle sinkHandle = new StubSinkHandle(fragmentInstanceContext);
-
-      try (Driver dataDriver = new DataDriver(limitOperator, sinkHandle, driverContext)) {
+      Driver dataDriver = null;
+      try {
+        dataDriver = new DataDriver(limitOperator, sinkHandle, driverContext);
         assertEquals(fragmentInstanceContext.getId(), dataDriver.getInfo());
 
         assertFalse(dataDriver.isFinished());
 
         while (!dataDriver.isFinished()) {
-          assertEquals(FragmentInstanceState.RUNNING, state.get());
+          assertEquals(FragmentInstanceState.RUNNING, stateMachine.getState());
           ListenableFuture<Void> blocked = dataDriver.processFor(EXECUTION_TIME_SLICE);
           assertTrue(blocked.isDone());
         }
 
-        assertEquals(FragmentInstanceState.FLUSHING, state.get());
+        assertEquals(FragmentInstanceState.FLUSHING, stateMachine.getState());
 
         List<TsBlock> result = sinkHandle.getTsBlocks();
         assertEquals(13, result.size());
@@ -204,10 +207,16 @@ public class DataDriverTest {
             }
           }
         }
+      } finally {
+        if (dataDriver != null) {
+          dataDriver.close();
+        }
       }
     } catch (IllegalPathException | QueryProcessException e) {
       e.printStackTrace();
       fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
     }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
index 6ad74bd110..ab0849ddf2 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -28,6 +29,7 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
@@ -50,6 +52,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
@@ -78,6 +81,7 @@ public class LimitOperatorTest {
 
   @Test
   public void batchTest() {
+    ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
       MeasurementPath measurementPath1 =
           new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
@@ -85,11 +89,10 @@ public class LimitOperatorTest {
       allSensors.add("sensor0");
       allSensors.add("sensor1");
       QueryId queryId = new QueryId("stub_query");
-      AtomicReference<FragmentInstanceState> state =
-          new AtomicReference<>(FragmentInstanceState.RUNNING);
+      FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
       FragmentInstanceContext fragmentInstanceContext =
-          new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+          new FragmentInstanceContext(instanceId, stateMachine);
       PlanNodeId planNodeId1 = new PlanNodeId("1");
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId1, SeriesScanOperator.class.getSimpleName());
@@ -169,6 +172,8 @@ public class LimitOperatorTest {
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
     }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
index 3793504970..dd28a90263 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -28,6 +29,7 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
@@ -46,6 +48,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
@@ -73,17 +76,17 @@ public class SeriesScanOperatorTest {
 
   @Test
   public void batchTest() {
+    ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
       MeasurementPath measurementPath =
           new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
       Set<String> allSensors = new HashSet<>();
       allSensors.add("sensor0");
       QueryId queryId = new QueryId("stub_query");
-      AtomicReference<FragmentInstanceState> state =
-          new AtomicReference<>(FragmentInstanceState.RUNNING);
+      FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
       FragmentInstanceContext fragmentInstanceContext =
-          new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+          new FragmentInstanceContext(instanceId, stateMachine);
       PlanNodeId planNodeId = new PlanNodeId("1");
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId, SeriesScanOperator.class.getSimpleName());
@@ -123,6 +126,8 @@ public class SeriesScanOperatorTest {
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
     }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
index 5534418b84..70fe4def02 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -28,6 +29,7 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -49,6 +51,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.*;
@@ -74,6 +77,7 @@ public class TimeJoinOperatorTest {
 
   @Test
   public void batchTest() {
+    ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
       MeasurementPath measurementPath1 =
           new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
@@ -81,11 +85,10 @@ public class TimeJoinOperatorTest {
       allSensors.add("sensor0");
       allSensors.add("sensor1");
       QueryId queryId = new QueryId("stub_query");
-      AtomicReference<FragmentInstanceState> state =
-          new AtomicReference<>(FragmentInstanceState.RUNNING);
+      FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
       FragmentInstanceContext fragmentInstanceContext =
-          new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+          new FragmentInstanceContext(instanceId, stateMachine);
       PlanNodeId planNodeId1 = new PlanNodeId("1");
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId1, SeriesScanOperator.class.getSimpleName());
@@ -155,6 +158,8 @@ public class TimeJoinOperatorTest {
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
     }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
index 609dc9befa..320be3be21 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator.schema;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.LocalConfigNode;
@@ -29,6 +30,7 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -52,6 +54,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_ATTRIBUTES;
@@ -90,13 +93,13 @@ public class SchemaScanOperatorTest {
 
   @Test
   public void testDeviceMetaScanOperator() {
+    ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
       QueryId queryId = new QueryId("stub_query");
-      AtomicReference<FragmentInstanceState> state =
-          new AtomicReference<>(FragmentInstanceState.RUNNING);
+      FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
       FragmentInstanceContext fragmentInstanceContext =
-          new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+          new FragmentInstanceContext(instanceId, stateMachine);
       PlanNodeId planNodeId = queryId.genPlanNodeId();
       OperatorContext operatorContext =
           fragmentInstanceContext.addOperatorContext(
@@ -149,18 +152,20 @@ public class SchemaScanOperatorTest {
     } catch (MetadataException e) {
       e.printStackTrace();
       fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
     }
   }
 
   @Test
   public void testTimeSeriesMetaScanOperator() {
+    ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
       QueryId queryId = new QueryId("stub_query");
-      AtomicReference<FragmentInstanceState> state =
-          new AtomicReference<>(FragmentInstanceState.RUNNING);
+      FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
       FragmentInstanceContext fragmentInstanceContext =
-          new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+          new FragmentInstanceContext(instanceId, stateMachine);
       PlanNodeId planNodeId = queryId.genPlanNodeId();
       OperatorContext operatorContext =
           fragmentInstanceContext.addOperatorContext(
@@ -237,6 +242,8 @@ public class SchemaScanOperatorTest {
     } catch (MetadataException e) {
       e.printStackTrace();
       fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
     }
   }
 }