You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/06 03:39:42 UTC

[iotdb] branch xingtanzjr/query_execution updated (08b659f9eb -> 3050807331)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from 08b659f9eb fix the bug
     add ee6e7700cf [IOTDB-2748] Writing statement and writing process of coordinator (#5355)
     add 90e381d7c8 [IOTDB-2811] Fix compaction exception handle failure cause by deletion of storage group (#5363)
     add 2c85f9cda2 [IOTDB-2839] Add Python client CI (#5407)
     add fbfb3ebe2c [IOTDB-2843] remove unused jna dependency in cli (#5410)
     new 61304abf92 Merge branch 'master' into xingtanzjr/query_execution
     new e01ff0c86c spotless
     new a7ceb4934b fix the bug in distributionpalnner
     new 3050807331 add basic scheduler

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/{client.yml => client-cpp.yml}   |   2 +-
 .../workflows/{client-go.yml => client-python.yml} |  19 +-
 cli/pom.xml                                        |   2 +
 client-py/SessionAlignedTimeseriesTest.py          | 280 -----------
 client-py/SessionTest.py                           | 336 -------------
 client-py/requirements_dev.txt                     |   2 +-
 client-py/tests/test_aligned_timeseries.py         | 279 +++++++++++
 client-py/tests/test_dataframe.py                  |   6 +-
 client-py/tests/test_session.py                    | 335 +++++++++++++
 client-py/tests/test_todf.py                       |  12 +-
 .../cluster/log/snapshot/FileSnapshotTest.java     |  10 +-
 .../log/snapshot/PartitionedSnapshotTest.java      |   4 +-
 .../cluster/log/snapshot/PullSnapshotTaskTest.java |   2 +-
 .../cluster/server/member/DataGroupMemberTest.java |   2 +-
 .../iotdb/db/integration/IoTDBFilePathUtilsIT.java |   2 +-
 .../db/integration/IoTDBLoadExternalTsfileIT.java  |  70 +--
 .../integration/IoTDBManageTsFileResourceIT.java   |   8 +-
 .../aligned/IoTDBLoadExternalAlignedTsFileIT.java  |  64 +--
 .../iotdb/commons/partition/DataPartition.java     |  20 +-
 .../commons/partition/DataPartitionQueryParam.java |   4 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  23 +-
 .../engine/compaction/CompactionTaskManager.java   |  95 ++--
 .../cross/AbstractCrossSpaceCompactionTask.java    |   9 +
 .../task/RewriteCrossSpaceCompactionTask.java      |   4 +
 .../inner/AbstractInnerSpaceCompactionTask.java    |   9 +
 .../inner/sizetiered/SizeTieredCompactionTask.java |   6 +-
 .../compaction/task/AbstractCompactionTask.java    |  20 +-
 .../db/engine/storagegroup/TsFileResourceList.java |   5 +-
 .../storagegroup/VirtualStorageGroupProcessor.java |  40 +-
 .../virtualSg/StorageGroupManager.java             |  14 +-
 .../db/mpp/common/schematree/PathPatternTree.java  |  13 +
 .../iotdb/db/mpp/common/schematree/SchemaTree.java |   6 +
 .../apache/iotdb/db/mpp/execution/Coordinator.java |  27 +-
 .../{ExecutionStatus.java => ExecutionResult.java} |   4 +-
 .../iotdb/db/mpp/execution/FutureStateChange.java  |  76 ++-
 .../iotdb/db/mpp/execution/QueryExecution.java     |  76 +--
 .../apache/iotdb/db/mpp/execution/QueryState.java  |  44 +-
 .../iotdb/db/mpp/execution/QueryStateMachine.java  | 146 +++---
 .../iotdb/db/mpp/execution/StateMachine.java       | 527 ++++++++++-----------
 .../scheduler/AbstractFragInsStateFetcher.java     |  60 ---
 .../scheduler/AbstractFragInsStateTracker.java     |  70 +++
 .../mpp/execution/scheduler/ClusterScheduler.java  |  50 +-
 .../scheduler/FixedRateFragInsStateFetcher.java    |  61 ---
 .../scheduler/FixedRateFragInsStateTracker.java    |  65 +++
 .../scheduler/FragInstanceDispatchResult.java      |  14 +-
 .../scheduler/IFragInstanceDispatcher.java         |  15 +-
 ...Fetcher.java => IFragInstanceStateTracker.java} |   6 +-
 .../scheduler/InternalServiceClientFactory.java    |  16 +-
 .../scheduler/SimpleFragInstanceDispatcher.java    |  38 +-
 .../apache/iotdb/db/mpp/sql/analyze/Analysis.java  |  16 +-
 .../apache/iotdb/db/mpp/sql/analyze/Analyzer.java  |  72 ++-
 .../db/mpp/sql/analyze/ClusterSchemaFetcher.java   |   8 +
 .../db/mpp/sql/analyze/FakeSchemaFetcherImpl.java  |   8 +
 .../iotdb/db/mpp/sql/analyze/ISchemaFetcher.java   |   5 +
 .../mpp/sql/analyze/StandaloneSchemaFetcher.java   |   8 +
 .../db/mpp/sql/parser/StatementGenerator.java      |  52 +-
 .../db/mpp/sql/planner/DistributionPlanner.java    |   1 +
 .../iotdb/db/mpp/sql/planner/LogicalPlanner.java   |  43 +-
 .../db/mpp/sql/planner/plan/node/PlanNode.java     |   6 +-
 .../planner/plan/node/sink/FragmentSinkNode.java   |   6 +-
 .../sql/planner/plan/node/write/InsertNode.java    |  69 ++-
 .../sql/planner/plan/node/write/InsertRowNode.java |  48 +-
 .../planner/plan/node/write/InsertTabletNode.java  | 198 +++++++-
 .../db/mpp/sql/statement/StatementVisitor.java     |  10 +-
 .../sql/statement/crud/InsertBaseStatement.java    |  59 +++
 .../mpp/sql/statement/crud/InsertRowStatement.java | 203 ++++++++
 .../sql/statement/crud/InsertTabletStatement.java  |  96 ++++
 .../iotdb/db/query/control/SessionManager.java     |  15 +
 .../db/service/thrift/impl/TSServiceImpl.java      |  61 ++-
 .../storagegroup/StorageGroupProcessorTest.java    |  58 ++-
 .../iotdb/db/engine/storagegroup/TTLTest.java      |   4 +-
 .../db/metadata/idtable/IDTableFlushTimeTest.java  |   6 +-
 .../iotdb/db/mpp/sql/plan/QueryPlannerTest.java    |   6 +-
 .../db/sync/receiver/load/FileLoaderTest.java      |  14 +-
 .../recover/SyncReceiverLogAnalyzerTest.java       |   2 +-
 75 files changed, 2599 insertions(+), 1473 deletions(-)
 rename .github/workflows/{client.yml => client-cpp.yml} (99%)
 copy .github/workflows/{client-go.yml => client-python.yml} (71%)
 delete mode 100644 client-py/SessionAlignedTimeseriesTest.py
 delete mode 100644 client-py/SessionTest.py
 create mode 100644 client-py/tests/test_aligned_timeseries.py
 create mode 100644 client-py/tests/test_session.py
 rename server/src/main/java/org/apache/iotdb/db/mpp/execution/{ExecutionStatus.java => ExecutionResult.java} (92%)
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
 rename server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/{IFragInstanceStateFetcher.java => IFragInstanceStateTracker.java} (91%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowStatement.java


[iotdb] 01/04: Merge branch 'master' into xingtanzjr/query_execution

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 61304abf9240b181813f778aaa8506e7dc9b13f9
Merge: 08b659f9eb fbfb3ebe2c
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 6 10:17:33 2022 +0800

    Merge branch 'master' into xingtanzjr/query_execution

 .github/workflows/{client.yml => client-cpp.yml}   |   2 +-
 .github/workflows/client-python.yml                |  63 ++++
 cli/pom.xml                                        |   2 +
 client-py/SessionAlignedTimeseriesTest.py          | 280 -----------------
 client-py/SessionTest.py                           | 336 ---------------------
 client-py/requirements_dev.txt                     |   2 +-
 client-py/tests/test_aligned_timeseries.py         | 279 +++++++++++++++++
 client-py/tests/test_dataframe.py                  |   6 +-
 client-py/tests/test_session.py                    | 335 ++++++++++++++++++++
 client-py/tests/test_todf.py                       |  12 +-
 .../cluster/log/snapshot/FileSnapshotTest.java     |  10 +-
 .../log/snapshot/PartitionedSnapshotTest.java      |   4 +-
 .../cluster/log/snapshot/PullSnapshotTaskTest.java |   2 +-
 .../cluster/server/member/DataGroupMemberTest.java |   2 +-
 .../iotdb/db/integration/IoTDBFilePathUtilsIT.java |   2 +-
 .../db/integration/IoTDBLoadExternalTsfileIT.java  |  70 +++--
 .../integration/IoTDBManageTsFileResourceIT.java   |   8 +-
 .../aligned/IoTDBLoadExternalAlignedTsFileIT.java  |  64 ++--
 .../iotdb/commons/partition/DataPartition.java     |  20 +-
 .../commons/partition/DataPartitionQueryParam.java |   4 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  23 +-
 .../engine/compaction/CompactionTaskManager.java   |  95 +++---
 .../cross/AbstractCrossSpaceCompactionTask.java    |   9 +
 .../task/RewriteCrossSpaceCompactionTask.java      |   4 +
 .../inner/AbstractInnerSpaceCompactionTask.java    |   9 +
 .../inner/sizetiered/SizeTieredCompactionTask.java |   6 +-
 .../compaction/task/AbstractCompactionTask.java    |  20 +-
 .../db/engine/storagegroup/TsFileResourceList.java |   5 +-
 .../storagegroup/VirtualStorageGroupProcessor.java |  40 ++-
 .../virtualSg/StorageGroupManager.java             |  14 +-
 .../db/mpp/common/schematree/PathPatternTree.java  |  13 +
 .../iotdb/db/mpp/common/schematree/SchemaTree.java |   6 +
 .../apache/iotdb/db/mpp/sql/analyze/Analysis.java  |  16 +-
 .../apache/iotdb/db/mpp/sql/analyze/Analyzer.java  |  72 +++--
 .../db/mpp/sql/analyze/ClusterSchemaFetcher.java   |   8 +
 .../db/mpp/sql/analyze/FakeSchemaFetcherImpl.java  |   8 +
 .../iotdb/db/mpp/sql/analyze/ISchemaFetcher.java   |   5 +
 .../mpp/sql/analyze/StandaloneSchemaFetcher.java   |   8 +
 .../db/mpp/sql/parser/StatementGenerator.java      |  52 +++-
 .../iotdb/db/mpp/sql/planner/LogicalPlanner.java   |  43 ++-
 .../sql/planner/plan/node/write/InsertNode.java    |  69 ++++-
 .../sql/planner/plan/node/write/InsertRowNode.java |  48 ++-
 .../planner/plan/node/write/InsertTabletNode.java  | 198 +++++++++++-
 .../db/mpp/sql/statement/StatementVisitor.java     |  10 +-
 .../sql/statement/crud/InsertBaseStatement.java    |  59 ++++
 .../mpp/sql/statement/crud/InsertRowStatement.java | 203 +++++++++++++
 .../sql/statement/crud/InsertTabletStatement.java  |  96 ++++++
 .../iotdb/db/query/control/SessionManager.java     |  15 +
 .../db/service/thrift/impl/TSServiceImpl.java      |  57 +++-
 .../storagegroup/StorageGroupProcessorTest.java    |  58 +++-
 .../iotdb/db/engine/storagegroup/TTLTest.java      |   4 +-
 .../db/metadata/idtable/IDTableFlushTimeTest.java  |   6 +-
 .../db/sync/receiver/load/FileLoaderTest.java      |  14 +-
 .../recover/SyncReceiverLogAnalyzerTest.java       |   2 +-
 54 files changed, 1947 insertions(+), 851 deletions(-)

diff --cc server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 5033b667c4,315620643e..e348836021
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@@ -37,8 -37,10 +37,10 @@@ import org.apache.iotdb.db.metadata.pat
  import org.apache.iotdb.db.metadata.template.TemplateQueryType;
  import org.apache.iotdb.db.mpp.common.QueryId;
  import org.apache.iotdb.db.mpp.execution.Coordinator;
 -import org.apache.iotdb.db.mpp.execution.ExecutionResult;
 +import org.apache.iotdb.db.mpp.execution.ExecutionStatus;
  import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+ import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
+ import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
  import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
  import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
  import org.apache.iotdb.db.qp.physical.PhysicalPlan;


[iotdb] 03/04: fix the bug in distributionpalnner

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a7ceb4934b7e1f7693261e34ffa18bc64e80faf7
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 6 10:34:09 2022 +0800

    fix the bug in distributionpalnner
---
 .../org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java    | 1 +
 .../org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java     | 6 +++++-
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 06f96f632f..d4d713f1fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -111,6 +111,7 @@ public class DistributionPlanner {
           // SeriesScanNode.
           for (RegionReplicaSet dataRegion : dataDistribution) {
             SeriesScanNode split = (SeriesScanNode) handle.clone();
+            split.setId(PlanNodeIdAllocator.generateId());
             split.setDataRegionReplicaSet(dataRegion);
             sources.add(split);
           }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
index 91f95c0b68..518ba9a748 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
@@ -32,7 +32,7 @@ public abstract class PlanNode {
   protected static final int ONE_CHILD = 1;
   protected static final int CHILD_COUNT_NO_LIMIT = -1;
 
-  private final PlanNodeId id;
+  private PlanNodeId id;
 
   protected PlanNode(PlanNodeId id) {
     requireNonNull(id, "id is null");
@@ -43,6 +43,10 @@ public abstract class PlanNode {
     return id;
   }
 
+  public void setId(PlanNodeId id) {
+    this.id = id;
+  }
+
   public abstract List<PlanNode> getChildren();
 
   public abstract void addChild(PlanNode child);


[iotdb] 02/04: spotless

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e01ff0c86c0ca9e0582c20cb2f75e93d7c9db295
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 6 10:27:14 2022 +0800

    spotless
---
 .../iotdb/db/mpp/execution/FutureStateChange.java  |  76 ++-
 .../iotdb/db/mpp/execution/QueryExecution.java     |  52 +-
 .../apache/iotdb/db/mpp/execution/QueryState.java  |  44 +-
 .../iotdb/db/mpp/execution/QueryStateMachine.java  | 145 +++---
 .../iotdb/db/mpp/execution/StateMachine.java       | 527 ++++++++++-----------
 .../scheduler/AbstractFragInsStateFetcher.java     |  45 +-
 .../mpp/execution/scheduler/ClusterScheduler.java  |  16 +-
 .../scheduler/FixedRateFragInsStateFetcher.java    |  54 ++-
 .../scheduler/FragInstanceDispatchResult.java      |  14 +-
 .../scheduler/IFragInstanceDispatcher.java         |  15 +-
 .../scheduler/IFragInstanceStateFetcher.java       |   2 +-
 .../scheduler/InternalServiceClientFactory.java    |  16 +-
 .../scheduler/SimpleFragInstanceDispatcher.java    |  38 +-
 .../planner/plan/node/sink/FragmentSinkNode.java   |   6 +-
 14 files changed, 534 insertions(+), 516 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java
index 55fc2da276..0d33be9b16 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java
@@ -19,6 +19,7 @@ import com.google.common.util.concurrent.SettableFuture;
 
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
+
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.Executor;
@@ -27,52 +28,47 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static java.util.Objects.requireNonNull;
 
 @ThreadSafe
-public class FutureStateChange<T>
-{
-    // Use a separate future for each listener so canceled listeners can be removed
-    @GuardedBy("listeners")
-    private final Set<SettableFuture<T>> listeners = new HashSet<>();
+public class FutureStateChange<T> {
+  // Use a separate future for each listener so canceled listeners can be removed
+  @GuardedBy("listeners")
+  private final Set<SettableFuture<T>> listeners = new HashSet<>();
 
-    public ListenableFuture<T> createNewListener()
-    {
-        SettableFuture<T> listener = SettableFuture.create();
-        synchronized (listeners) {
-            listeners.add(listener);
-        }
+  public ListenableFuture<T> createNewListener() {
+    SettableFuture<T> listener = SettableFuture.create();
+    synchronized (listeners) {
+      listeners.add(listener);
+    }
 
-        // remove the listener when the future completes
-        listener.addListener(
-                () -> {
-                    synchronized (listeners) {
-                        listeners.remove(listener);
-                    }
-                },
-                directExecutor());
+    // remove the listener when the future completes
+    listener.addListener(
+        () -> {
+          synchronized (listeners) {
+            listeners.remove(listener);
+          }
+        },
+        directExecutor());
 
-        return listener;
-    }
+    return listener;
+  }
 
-    public void complete(T newState)
-    {
-        fireStateChange(newState, directExecutor());
-    }
+  public void complete(T newState) {
+    fireStateChange(newState, directExecutor());
+  }
 
-    public void complete(T newState, Executor executor)
-    {
-        fireStateChange(newState, executor);
-    }
+  public void complete(T newState, Executor executor) {
+    fireStateChange(newState, executor);
+  }
 
-    private void fireStateChange(T newState, Executor executor)
-    {
-        requireNonNull(executor, "executor is null");
-        Set<SettableFuture<T>> futures;
-        synchronized (listeners) {
-            futures = ImmutableSet.copyOf(listeners);
-            listeners.clear();
-        }
+  private void fireStateChange(T newState, Executor executor) {
+    requireNonNull(executor, "executor is null");
+    Set<SettableFuture<T>> futures;
+    synchronized (listeners) {
+      futures = ImmutableSet.copyOf(listeners);
+      listeners.clear();
+    }
 
-        for (SettableFuture<T> future : futures) {
-            executor.execute(() -> future.set(newState));
-        }
+    for (SettableFuture<T> future : futures) {
+      executor.execute(() -> future.set(newState));
     }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 0023b488e6..dd48dff9ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
-import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
@@ -34,6 +33,8 @@ import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -68,12 +69,13 @@ public class QueryExecution {
 
     // We add the abort logic inside the QueryExecution.
     // So that the other components can only focus on the state change.
-    stateMachine.addStateChangeListener(state -> {
-      if (!state.isDone()) {
-        return;
-      }
-      this.cleanup();
-    });
+    stateMachine.addStateChangeListener(
+        state -> {
+          if (!state.isDone()) {
+            return;
+          }
+          this.cleanup();
+        });
   }
 
   public void start() {
@@ -90,7 +92,8 @@ public class QueryExecution {
 
   private void schedule() {
     // TODO: (xingtanzjr) initialize the query scheduler according to configuration
-    this.scheduler = new ClusterScheduler(stateMachine, distributedPlan.getInstances(), context.getQueryType());
+    this.scheduler =
+        new ClusterScheduler(stateMachine, distributedPlan.getInstances(), context.getQueryType());
     // TODO: (xingtanzjr) how to make the schedule running asynchronously
     this.scheduler.start();
   }
@@ -108,7 +111,8 @@ public class QueryExecution {
   }
 
   /**
-   * Do cleanup work for current QueryExecution including QuerySchedule aborting and resource releasing
+   * Do cleanup work for current QueryExecution including QuerySchedule aborting and resource
+   * releasing
    */
   private void cleanup() {
     if (this.scheduler != null) {
@@ -117,18 +121,13 @@ public class QueryExecution {
     releaseResource();
   }
 
-  /**
-   * Release the resources that current QueryExecution hold.
-   */
-  private void releaseResource() {
-
-  }
+  /** Release the resources that current QueryExecution hold. */
+  private void releaseResource() {}
 
   /**
    * This method will be called by the request thread from client connection. This method will block
    * until one of these conditions occurs: 1. There is a batch of result 2. There is no more result
-   * 3. The query has been cancelled 4. The query is timeout This method wil
-   * l fetch the result from
+   * 3. The query has been cancelled 4. The query is timeout This method wil l fetch the result from
    * DataStreamManager use the virtual ResultOperator's ID (This part will be designed and
    * implemented with DataStreamManager)
    */
@@ -137,22 +136,27 @@ public class QueryExecution {
   }
 
   /**
-   * This method is a synchronized method.
-   * For READ, it will block until all the FragmentInstances have been submitted.
-   * For WRITE, it will block until all the FragmentInstances have finished.
+   * This method is a synchronized method. For READ, it will block until all the FragmentInstances
+   * have been submitted. For WRITE, it will block until all the FragmentInstances have finished.
+   *
    * @return ExecutionStatus. Contains the QueryId and the TSStatus.
    */
   public ExecutionStatus getStatus() {
-    // Although we monitor the state to transition to RUNNING, the future will return if any Terminated state is triggered
-    ListenableFuture<QueryState> future =  stateMachine.getStateChange(QueryState.RUNNING);
+    // Although we monitor the state to transition to RUNNING, the future will return if any
+    // Terminated state is triggered
+    ListenableFuture<QueryState> future = stateMachine.getStateChange(QueryState.RUNNING);
     try {
       QueryState state = future.get();
       // TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED
-      TSStatusCode statusCode = state == QueryState.FINISHED ? TSStatusCode.SUCCESS_STATUS : TSStatusCode.QUERY_PROCESS_ERROR;
+      TSStatusCode statusCode =
+          state == QueryState.FINISHED
+              ? TSStatusCode.SUCCESS_STATUS
+              : TSStatusCode.QUERY_PROCESS_ERROR;
       return new ExecutionStatus(context.getQueryId(), RpcUtils.getStatus(statusCode));
     } catch (InterruptedException | ExecutionException e) {
       // TODO: (xingtanzjr) use more accurate error handling
-      return new ExecutionStatus(context.getQueryId(), RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+      return new ExecutionStatus(
+          context.getQueryId(), RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
index b585ab28c8..383875a022 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
@@ -25,27 +25,25 @@ import java.util.stream.Stream;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 
 public enum QueryState {
-    QUEUED(false),
-    PLANNED(false),
-    DISPATCHING(false),
-    RUNNING(false),
-    FINISHED(true),
-    CANCELED(true),
-    ABORTED(true),
-    FAILED(true);
-
-    private final boolean doneState;
-
-    public static final Set<QueryState> TERMINAL_INSTANCE_STATES =
-            Stream.of(QueryState.values())
-                    .filter(QueryState::isDone)
-                    .collect(toImmutableSet());
-
-    QueryState(boolean doneState) {
-        this.doneState = doneState;
-    }
-
-    public boolean isDone() {
-        return doneState;
-    }
+  QUEUED(false),
+  PLANNED(false),
+  DISPATCHING(false),
+  RUNNING(false),
+  FINISHED(true),
+  CANCELED(true),
+  ABORTED(true),
+  FAILED(true);
+
+  private final boolean doneState;
+
+  public static final Set<QueryState> TERMINAL_INSTANCE_STATES =
+      Stream.of(QueryState.values()).filter(QueryState::isDone).collect(toImmutableSet());
+
+  QueryState(boolean doneState) {
+    this.doneState = doneState;
+  }
+
+  public boolean isDone() {
+    return doneState;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 33b9049b89..0ed499c785 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -18,13 +18,12 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
-import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 
-import java.util.List;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
@@ -34,72 +33,76 @@ import java.util.concurrent.Executor;
  * register listeners when the state changes of the QueryExecution.
  */
 public class QueryStateMachine {
-    private String name;
-    private StateMachine<QueryState> queryState;
-    private Map<FragmentInstanceId, FragmentInstanceState> fragInstanceStateMap;
-
-    // The executor will be used in all the state machines belonged to this query.
-    private Executor stateMachineExecutor;
-
-    public QueryStateMachine(QueryId queryId) {
-        this.name = String.format("QueryStateMachine[%s]", queryId);
-        this.stateMachineExecutor = getStateMachineExecutor();
-        this.fragInstanceStateMap = new ConcurrentHashMap<>();
-        this.queryState = new StateMachine<>(queryId.toString(), this.stateMachineExecutor ,QueryState.QUEUED, QueryState.TERMINAL_INSTANCE_STATES);
-    }
-
-    public void initialFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
-        this.fragInstanceStateMap.put(id, state);
-    }
-
-    public void updateFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
-        this.fragInstanceStateMap.put(id, state);
-    }
-
-    // TODO: (xingtanzjr) consider more suitable method for executor initialization
-    private Executor getStateMachineExecutor() {
-        return IoTDBThreadPoolFactory.newSingleThreadExecutor(name);
-    }
-
-    public void addStateChangeListener(StateMachine.StateChangeListener<QueryState> stateChangeListener)
-    {
-        queryState.addStateChangeListener(stateChangeListener);
-    }
-
-    public ListenableFuture<QueryState> getStateChange(QueryState currentState)
-    {
-        return queryState.getStateChange(currentState);
-    }
-
-    private String getName() {
-        return name;
-    }
-
-    public void transitionToPlanned() {
-        queryState.set(QueryState.PLANNED);
-    }
-
-    public void transitionToDispatching() {
-        queryState.set(QueryState.DISPATCHING);
-    }
-
-    public void transitionToRunning() {
-        queryState.set(QueryState.RUNNING);
-    }
-
-    public void transitionToFinished() {
-        queryState.set(QueryState.FINISHED);
-    }
-
-    public void transitionToCanceled() {
-        queryState.set(QueryState.CANCELED);
-    }
-
-    public void transitionToAborted() {
-        queryState.set(QueryState.ABORTED);
-    }
-
-    public void transitionToFailed() {
-        queryState.set(QueryState.FAILED);
-    }
+  private String name;
+  private StateMachine<QueryState> queryState;
+  private Map<FragmentInstanceId, FragmentInstanceState> fragInstanceStateMap;
+
+  // The executor will be used in all the state machines belonged to this query.
+  private Executor stateMachineExecutor;
+
+  public QueryStateMachine(QueryId queryId) {
+    this.name = String.format("QueryStateMachine[%s]", queryId);
+    this.stateMachineExecutor = getStateMachineExecutor();
+    this.fragInstanceStateMap = new ConcurrentHashMap<>();
+    this.queryState =
+        new StateMachine<>(
+            queryId.toString(),
+            this.stateMachineExecutor,
+            QueryState.QUEUED,
+            QueryState.TERMINAL_INSTANCE_STATES);
+  }
+
+  public void initialFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
+    this.fragInstanceStateMap.put(id, state);
+  }
+
+  public void updateFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
+    this.fragInstanceStateMap.put(id, state);
+  }
+
+  // TODO: (xingtanzjr) consider more suitable method for executor initialization
+  private Executor getStateMachineExecutor() {
+    return IoTDBThreadPoolFactory.newSingleThreadExecutor(name);
+  }
+
+  public void addStateChangeListener(
+      StateMachine.StateChangeListener<QueryState> stateChangeListener) {
+    queryState.addStateChangeListener(stateChangeListener);
+  }
+
+  public ListenableFuture<QueryState> getStateChange(QueryState currentState) {
+    return queryState.getStateChange(currentState);
+  }
+
+  private String getName() {
+    return name;
+  }
+
+  public void transitionToPlanned() {
+    queryState.set(QueryState.PLANNED);
+  }
+
+  public void transitionToDispatching() {
+    queryState.set(QueryState.DISPATCHING);
+  }
+
+  public void transitionToRunning() {
+    queryState.set(QueryState.RUNNING);
+  }
+
+  public void transitionToFinished() {
+    queryState.set(QueryState.FINISHED);
+  }
+
+  public void transitionToCanceled() {
+    queryState.set(QueryState.CANCELED);
+  }
+
+  public void transitionToAborted() {
+    queryState.set(QueryState.ABORTED);
+  }
+
+  public void transitionToFailed() {
+    queryState.set(QueryState.FAILED);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
index 78d7badfea..39e663fda5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
@@ -20,6 +20,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
@@ -37,288 +38,284 @@ import static java.util.Objects.requireNonNull;
  * Simple state machine which holds a single state. Callers can register for state change events.
  */
 @ThreadSafe
-public class StateMachine<T>
-{
-    private final String name;
-    private final Executor executor;
-    private final Object lock = new Object();
-    private final Set<T> terminalStates;
-
-    @GuardedBy("lock")
-    private volatile T state;
-
-    @GuardedBy("lock")
-    private final List<StateChangeListener<T>> stateChangeListeners = new ArrayList<>();
-
-    private final AtomicReference<FutureStateChange<T>> futureStateChange = new AtomicReference<>(new FutureStateChange<>());
-
-    /**
-     * Creates a state machine with the specified initial state and no terminal states.
-     *
-     * @param name name of this state machine to use in debug statements
-     * @param executor executor for firing state change events; must not be a same thread executor
-     * @param initialState the initial state
-     */
-    public StateMachine(String name, Executor executor, T initialState)
-    {
-        this(name, executor, initialState, ImmutableSet.of());
-    }
-
-    /**
-     * Creates a state machine with the specified initial state and terminal states.
-     *
-     * @param name name of this state machine to use in debug statements
-     * @param executor executor for firing state change events; must not be a same thread executor
-     * @param initialState the initial state
-     * @param terminalStates the terminal states
-     */
-    public StateMachine(String name, Executor executor, T initialState, Iterable<T> terminalStates)
-    {
-        this.name = requireNonNull(name, "name is null");
-        this.executor = requireNonNull(executor, "executor is null");
-        this.state = requireNonNull(initialState, "initialState is null");
-        this.terminalStates = ImmutableSet.copyOf(requireNonNull(terminalStates, "terminalStates is null"));
-    }
-
-    // state changes are atomic and state is volatile, so a direct read is safe here
-    @SuppressWarnings("FieldAccessNotGuarded")
-    public T get()
-    {
+public class StateMachine<T> {
+  private final String name;
+  private final Executor executor;
+  private final Object lock = new Object();
+  private final Set<T> terminalStates;
+
+  @GuardedBy("lock")
+  private volatile T state;
+
+  @GuardedBy("lock")
+  private final List<StateChangeListener<T>> stateChangeListeners = new ArrayList<>();
+
+  private final AtomicReference<FutureStateChange<T>> futureStateChange =
+      new AtomicReference<>(new FutureStateChange<>());
+
+  /**
+   * Creates a state machine with the specified initial state and no terminal states.
+   *
+   * @param name name of this state machine to use in debug statements
+   * @param executor executor for firing state change events; must not be a same thread executor
+   * @param initialState the initial state
+   */
+  public StateMachine(String name, Executor executor, T initialState) {
+    this(name, executor, initialState, ImmutableSet.of());
+  }
+
+  /**
+   * Creates a state machine with the specified initial state and terminal states.
+   *
+   * @param name name of this state machine to use in debug statements
+   * @param executor executor for firing state change events; must not be a same thread executor
+   * @param initialState the initial state
+   * @param terminalStates the terminal states
+   */
+  public StateMachine(String name, Executor executor, T initialState, Iterable<T> terminalStates) {
+    this.name = requireNonNull(name, "name is null");
+    this.executor = requireNonNull(executor, "executor is null");
+    this.state = requireNonNull(initialState, "initialState is null");
+    this.terminalStates =
+        ImmutableSet.copyOf(requireNonNull(terminalStates, "terminalStates is null"));
+  }
+
+  // state changes are atomic and state is volatile, so a direct read is safe here
+  @SuppressWarnings("FieldAccessNotGuarded")
+  public T get() {
+    return state;
+  }
+
+  /**
+   * Sets the state. If the new state does not {@code .equals()} the current state, listeners and
+   * waiters will be notified.
+   *
+   * @return the old state
+   * @throws IllegalStateException if state change would cause a transition from a terminal state
+   */
+  public T set(T newState) {
+    T oldState = trySet(newState);
+    checkState(
+        oldState.equals(newState) || !isTerminalState(oldState),
+        "%s cannot transition from %s to %s",
+        name,
+        state,
+        newState);
+    return oldState;
+  }
+
+  /**
+   * Tries to change the state. State will not change if the new state {@code .equals()} the current
+   * state, or if the current state is a terminal state. If the state changed, listeners and waiters
+   * will be notified.
+   *
+   * @return the state before the possible state change
+   */
+  public T trySet(T newState) {
+    checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
+    requireNonNull(newState, "newState is null");
+
+    T oldState;
+    FutureStateChange<T> futureStateChange;
+    ImmutableList<StateChangeListener<T>> stateChangeListeners;
+    synchronized (lock) {
+      if (state.equals(newState) || isTerminalState(state)) {
         return state;
-    }
+      }
 
-    /**
-     * Sets the state.
-     * If the new state does not {@code .equals()} the current state, listeners and waiters will be notified.
-     *
-     * @return the old state
-     * @throws IllegalStateException if state change would cause a transition from a terminal state
-     */
-    public T set(T newState)
-    {
-        T oldState = trySet(newState);
-        checkState(oldState.equals(newState) || !isTerminalState(oldState), "%s cannot transition from %s to %s", name, state, newState);
-        return oldState;
-    }
+      oldState = state;
+      state = newState;
 
-    /**
-     * Tries to change the state.  State will not change if the new state {@code .equals()} the current state,
-     * of if the current state is a terminal state. If the state changed, listeners and waiters will be notified.
-     *
-     * @return the state before the possible state change
-     */
-    public T trySet(T newState)
-    {
-        checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
-        requireNonNull(newState, "newState is null");
-
-        T oldState;
-        FutureStateChange<T> futureStateChange;
-        ImmutableList<StateChangeListener<T>> stateChangeListeners;
-        synchronized (lock) {
-            if (state.equals(newState) || isTerminalState(state)) {
-                return state;
-            }
-
-            oldState = state;
-            state = newState;
-
-            futureStateChange = this.futureStateChange.getAndSet(new FutureStateChange<>());
-            stateChangeListeners = ImmutableList.copyOf(this.stateChangeListeners);
-
-            // if we are now in a terminal state, free the listeners since this will be the last notification
-            if (isTerminalState(state)) {
-                this.stateChangeListeners.clear();
-            }
-        }
-
-        fireStateChanged(newState, futureStateChange, stateChangeListeners);
-        return oldState;
-    }
+      futureStateChange = this.futureStateChange.getAndSet(new FutureStateChange<>());
+      stateChangeListeners = ImmutableList.copyOf(this.stateChangeListeners);
 
-    /**
-     * Sets the state if the current state satisfies the specified predicate.
-     * If the new state does not {@code .equals()} the current state, listeners and waiters will be notified.
-     *
-     * @return true if the state is set
-     */
-    public boolean setIf(T newState, Predicate<T> predicate)
-    {
-        checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
-        requireNonNull(newState, "newState is null");
-
-        while (true) {
-            // check if the current state passes the predicate
-            T currentState = get();
-
-            // change to same state is not a change, and does not notify the notify listeners
-            if (currentState.equals(newState)) {
-                return false;
-            }
-
-            // do not call predicate while holding the lock
-            if (!predicate.test(currentState)) {
-                return false;
-            }
-
-            // if state did not change while, checking the predicate, apply the new state
-            if (compareAndSet(currentState, newState)) {
-                return true;
-            }
-        }
+      // if we are now in a terminal state, free the listeners since this will be the last
+      // notification
+      if (isTerminalState(state)) {
+        this.stateChangeListeners.clear();
+      }
     }
 
-    /**
-     * Sets the state if the current state {@code .equals()} the specified expected state.
-     * If the new state does not {@code .equals()} the current state, listeners and waiters will be notified.
-     *
-     * @return true if the state is set
-     */
-    public boolean compareAndSet(T expectedState, T newState)
-    {
-        checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
-        requireNonNull(expectedState, "expectedState is null");
-        requireNonNull(newState, "newState is null");
-
-        FutureStateChange<T> futureStateChange;
-        ImmutableList<StateChangeListener<T>> stateChangeListeners;
-        synchronized (lock) {
-            if (!state.equals(expectedState)) {
-                return false;
-            }
-
-            // change to same state is not a change, and does not notify the notify listeners
-            if (state.equals(newState)) {
-                return false;
-            }
-
-            checkState(!isTerminalState(state), "%s cannot transition from %s to %s", name, state, newState);
-
-            state = newState;
-
-            futureStateChange = this.futureStateChange.getAndSet(new FutureStateChange<>());
-            stateChangeListeners = ImmutableList.copyOf(this.stateChangeListeners);
-
-            // if we are now in a terminal state, free the listeners since this will be the last notification
-            if (isTerminalState(state)) {
-                this.stateChangeListeners.clear();
-            }
-        }
-
-        fireStateChanged(newState, futureStateChange, stateChangeListeners);
+    fireStateChanged(newState, futureStateChange, stateChangeListeners);
+    return oldState;
+  }
+
+  /**
+   * Sets the state if the current state satisfies the specified predicate. If the new state does
+   * not {@code .equals()} the current state, listeners and waiters will be notified.
+   *
+   * @return true if the state is set
+   */
+  public boolean setIf(T newState, Predicate<T> predicate) {
+    checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
+    requireNonNull(newState, "newState is null");
+
+    while (true) {
+      // check if the current state passes the predicate
+      T currentState = get();
+
+      // change to same state is not a change, and does not notify the listeners
+      if (currentState.equals(newState)) {
+        return false;
+      }
+
+      // do not call predicate while holding the lock
+      if (!predicate.test(currentState)) {
+        return false;
+      }
+
+      // if state did not change while checking the predicate, apply the new state
+      if (compareAndSet(currentState, newState)) {
         return true;
+      }
     }
-
-    private void fireStateChanged(T newState, FutureStateChange<T> futureStateChange, List<StateChangeListener<T>> stateChangeListeners)
-    {
-        checkState(!Thread.holdsLock(lock), "Cannot fire state change event while holding the lock");
-        requireNonNull(newState, "newState is null");
-
-        // always fire listener callbacks from a different thread
-        safeExecute(() -> {
-            checkState(!Thread.holdsLock(lock), "Cannot notify while holding the lock");
-            try {
-                futureStateChange.complete(newState);
-            }
-            catch (Throwable e) {
-//                log.error(e, "Error setting future state for %s", name);
-            }
-            for (StateChangeListener<T> stateChangeListener : stateChangeListeners) {
-                fireStateChangedListener(newState, stateChangeListener);
-            }
-        });
+  }
+
+  /**
+   * Sets the state if the current state {@code .equals()} the specified expected state. If the new
+   * state does not {@code .equals()} the current state, listeners and waiters will be notified.
+   *
+   * @return true if the state is set
+   */
+  public boolean compareAndSet(T expectedState, T newState) {
+    checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
+    requireNonNull(expectedState, "expectedState is null");
+    requireNonNull(newState, "newState is null");
+
+    FutureStateChange<T> futureStateChange;
+    ImmutableList<StateChangeListener<T>> stateChangeListeners;
+    synchronized (lock) {
+      if (!state.equals(expectedState)) {
+        return false;
+      }
+
+      // change to same state is not a change, and does not notify the listeners
+      if (state.equals(newState)) {
+        return false;
+      }
+
+      checkState(
+          !isTerminalState(state), "%s cannot transition from %s to %s", name, state, newState);
+
+      state = newState;
+
+      futureStateChange = this.futureStateChange.getAndSet(new FutureStateChange<>());
+      stateChangeListeners = ImmutableList.copyOf(this.stateChangeListeners);
+
+      // if we are now in a terminal state, free the listeners since this will be the last
+      // notification
+      if (isTerminalState(state)) {
+        this.stateChangeListeners.clear();
+      }
     }
 
-    private void fireStateChangedListener(T newState, StateChangeListener<T> stateChangeListener)
-    {
-        try {
-            stateChangeListener.stateChanged(newState);
-        }
-        catch (Throwable e) {
-//            log.error(e, "Error notifying state change listener for %s", name);
-        }
-    }
+    fireStateChanged(newState, futureStateChange, stateChangeListeners);
+    return true;
+  }
+
+  private void fireStateChanged(
+      T newState,
+      FutureStateChange<T> futureStateChange,
+      List<StateChangeListener<T>> stateChangeListeners) {
+    checkState(!Thread.holdsLock(lock), "Cannot fire state change event while holding the lock");
+    requireNonNull(newState, "newState is null");
+
+    // always fire listener callbacks from a different thread
+    safeExecute(
+        () -> {
+          checkState(!Thread.holdsLock(lock), "Cannot notify while holding the lock");
+          try {
+            futureStateChange.complete(newState);
+          } catch (Throwable e) {
+            //                log.error(e, "Error setting future state for %s", name);
+          }
+          for (StateChangeListener<T> stateChangeListener : stateChangeListeners) {
+            fireStateChangedListener(newState, stateChangeListener);
+          }
+        });
+  }
 
-    /**
-     * Gets a future that completes when the state is no longer {@code .equals()} to {@code currentState)}.
-     */
-    public ListenableFuture<T> getStateChange(T currentState)
-    {
-        checkState(!Thread.holdsLock(lock), "Cannot wait for state change while holding the lock");
-        requireNonNull(currentState, "currentState is null");
-
-        synchronized (lock) {
-            // return a completed future if the state has already changed, or we are in a terminal state
-            if (!state.equals(currentState) || isTerminalState(state)) {
-                return immediateFuture(state);
-            }
-
-            return futureStateChange.get().createNewListener();
-        }
+  private void fireStateChangedListener(T newState, StateChangeListener<T> stateChangeListener) {
+    try {
+      stateChangeListener.stateChanged(newState);
+    } catch (Throwable e) {
+      //            log.error(e, "Error notifying state change listener for %s", name);
     }
-
-    /**
-     * Adds a listener to be notified when the state instance changes according to {@code .equals()}.
-     * Listener is always notified asynchronously using a dedicated notification thread pool so, care should
-     * be taken to avoid leaking {@code this} when adding a listener in a constructor. Additionally, it is
-     * possible notifications are observed out of order due to the asynchronous execution. The listener is
-     * immediately notified immediately of the current state.
-     */
-    public void addStateChangeListener(StateChangeListener<T> stateChangeListener)
-    {
-        requireNonNull(stateChangeListener, "stateChangeListener is null");
-
-        boolean inTerminalState;
-        T currentState;
-        synchronized (lock) {
-            currentState = state;
-            inTerminalState = isTerminalState(currentState);
-            if (!inTerminalState) {
-                stateChangeListeners.add(stateChangeListener);
-            }
-        }
-
-        // fire state change listener with the current state
-        // always fire listener callbacks from a different thread
-        safeExecute(() -> stateChangeListener.stateChanged(currentState));
+  }
+
+  /**
+   * Gets a future that completes when the state is no longer {@code .equals()} to {@code
+   * currentState}.
+   */
+  public ListenableFuture<T> getStateChange(T currentState) {
+    checkState(!Thread.holdsLock(lock), "Cannot wait for state change while holding the lock");
+    requireNonNull(currentState, "currentState is null");
+
+    synchronized (lock) {
+      // return a completed future if the state has already changed, or we are in a terminal state
+      if (!state.equals(currentState) || isTerminalState(state)) {
+        return immediateFuture(state);
+      }
+
+      return futureStateChange.get().createNewListener();
     }
-
-    @VisibleForTesting
-    boolean isTerminalState(T state)
-    {
-        return terminalStates.contains(state);
+  }
+
+  /**
+   * Adds a listener to be notified when the state instance changes according to {@code .equals()}.
+   * Listener is always notified asynchronously using a dedicated notification thread pool, so care
+   * should be taken to avoid leaking {@code this} when adding a listener in a constructor.
+   * Additionally, it is possible notifications are observed out of order due to the asynchronous
+   * execution. The listener is immediately notified of the current state.
+   */
+  public void addStateChangeListener(StateChangeListener<T> stateChangeListener) {
+    requireNonNull(stateChangeListener, "stateChangeListener is null");
+
+    boolean inTerminalState;
+    T currentState;
+    synchronized (lock) {
+      currentState = state;
+      inTerminalState = isTerminalState(currentState);
+      if (!inTerminalState) {
+        stateChangeListeners.add(stateChangeListener);
+      }
     }
 
-    @VisibleForTesting
-    List<StateChangeListener<T>> getStateChangeListeners()
-    {
-        synchronized (lock) {
-            return ImmutableList.copyOf(stateChangeListeners);
-        }
-    }
+    // fire state change listener with the current state
+    // always fire listener callbacks from a different thread
+    safeExecute(() -> stateChangeListener.stateChanged(currentState));
+  }
 
-    public interface StateChangeListener<T>
-    {
-        void stateChanged(T newState);
-    }
+  @VisibleForTesting
+  boolean isTerminalState(T state) {
+    return terminalStates.contains(state);
+  }
 
-    @Override
-    public String toString()
-    {
-        return get().toString();
+  @VisibleForTesting
+  List<StateChangeListener<T>> getStateChangeListeners() {
+    synchronized (lock) {
+      return ImmutableList.copyOf(stateChangeListeners);
     }
-
-    private void safeExecute(Runnable command)
-    {
-        try {
-            executor.execute(command);
-        }
-        catch (RejectedExecutionException e) {
-            if ((executor instanceof ExecutorService) && ((ExecutorService) executor).isShutdown()) {
-                // TODO: (xingtanzjr) handle the exception
-                throw new RuntimeException("Server is shutting down", e);
-            }
-            throw e;
-        }
+  }
+
+  public interface StateChangeListener<T> {
+    void stateChanged(T newState);
+  }
+
+  @Override
+  public String toString() {
+    return get().toString();
+  }
+
+  private void safeExecute(Runnable command) {
+    try {
+      executor.execute(command);
+    } catch (RejectedExecutionException e) {
+      if ((executor instanceof ExecutorService) && ((ExecutorService) executor).isShutdown()) {
+        // TODO: (xingtanzjr) handle the exception
+        throw new RuntimeException("Server is shutting down", e);
+      }
+      throw e;
     }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java
index 6f283b8994..de842f0207 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
+
 import org.apache.thrift.TException;
 
 import java.util.List;
@@ -33,28 +34,32 @@ import java.util.concurrent.ExecutorService;
 
 public abstract class AbstractFragInsStateFetcher implements IFragInstanceStateFetcher {
 
-    protected QueryStateMachine stateMachine;
-    protected ExecutorService executor;
-    protected List<FragmentInstance> instances;
-
-    public AbstractFragInsStateFetcher(QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
-        this.stateMachine = stateMachine;
-        this.executor = executor;
-        this.instances = instances;
-    }
+  protected QueryStateMachine stateMachine;
+  protected ExecutorService executor;
+  protected List<FragmentInstance> instances;
 
-    public abstract void start();
+  public AbstractFragInsStateFetcher(
+      QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
+    this.stateMachine = stateMachine;
+    this.executor = executor;
+    this.instances = instances;
+  }
 
-    protected FragmentInstanceState fetchState(FragmentInstance instance) throws TException {
-        InternalService.Client client = InternalServiceClientFactory
-                .getInternalServiceClient(instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
-        TFragmentInstanceStateResp resp = client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
-        return FragmentInstanceState.valueOf(resp.state);
-    }
+  public abstract void start();
 
-    private TFragmentInstanceId getTId(FragmentInstance instance) {
-        return new TFragmentInstanceId(instance.getId().getQueryId().getId(),
-                String.valueOf(instance.getId().getFragmentId().getId()), instance.getId().getInstanceId());
-    }
+  protected FragmentInstanceState fetchState(FragmentInstance instance) throws TException {
+    InternalService.Client client =
+        InternalServiceClientFactory.getInternalServiceClient(
+            instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
+    TFragmentInstanceStateResp resp =
+        client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
+    return FragmentInstanceState.valueOf(resp.state);
+  }
 
+  private TFragmentInstanceId getTId(FragmentInstance instance) {
+    return new TFragmentInstanceId(
+        instance.getId().getQueryId().getId(),
+        String.valueOf(instance.getId().getFragmentId().getId()),
+        instance.getId().getInstanceId());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index c4050a3045..8c6c2991bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
-import io.airlift.units.Duration;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.execution.FragmentInfo;
@@ -27,6 +26,8 @@ import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 
+import io.airlift.units.Duration;
+
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -46,7 +47,8 @@ public class ClusterScheduler implements IScheduler {
   private List<FragmentInstance> instances;
   private IFragInstanceDispatcher dispatcher;
 
-  public ClusterScheduler(QueryStateMachine stateMachine, List<FragmentInstance> instances, QueryType queryType) {
+  public ClusterScheduler(
+      QueryStateMachine stateMachine, List<FragmentInstance> instances, QueryType queryType) {
     this.stateMachine = stateMachine;
     this.instances = instances;
     this.queryType = queryType;
@@ -74,11 +76,13 @@ public class ClusterScheduler implements IScheduler {
       return;
     }
 
-    // The FragmentInstances has been dispatched successfully to corresponding host, we mark the QueryState to Running
+    // The FragmentInstances have been dispatched successfully to their corresponding hosts, so we
+    // mark the QueryState as Running
     stateMachine.transitionToRunning();
-    instances.forEach(instance -> {
-      stateMachine.initialFragInstanceState(instance.getId(), FragmentInstanceState.RUNNING);
-    });
+    instances.forEach(
+        instance -> {
+          stateMachine.initialFragInstanceState(instance.getId(), FragmentInstanceState.RUNNING);
+        });
 
     // TODO: (xingtanzjr) start the stateFetcher/heartbeat for each fragment instance
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java
index 583f2503f1..9c0bc9e807 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java
@@ -22,40 +22,44 @@ package org.apache.iotdb.db.mpp.execution.scheduler;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+
 import org.apache.thrift.TException;
 
 import java.util.List;
 import java.util.concurrent.*;
 
 public class FixedRateFragInsStateFetcher extends AbstractFragInsStateFetcher {
-    private static final long STATE_FETCH_INTERVAL_IN_MS = 1000;
-    private volatile boolean aborted = false;
+  private static final long STATE_FETCH_INTERVAL_IN_MS = 1000;
+  private volatile boolean aborted = false;
 
-    public FixedRateFragInsStateFetcher(QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
-        super(stateMachine, executor, instances);
-    }
+  public FixedRateFragInsStateFetcher(
+      QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
+    super(stateMachine, executor, instances);
+  }
 
-    @Override
-    public void start() {
-        while (!aborted) {
-            try {
-                Future<Boolean> future = executor.submit(() -> {
-                    for (FragmentInstance instance : instances) {
-                        try {
-                            FragmentInstanceState state = fetchState(instance);
-                            stateMachine.updateFragInstanceState(instance.getId(), state);
-                        } catch (TException e) {
-                            // TODO: do nothing ?
-                            return false;
-                        }
+  @Override
+  public void start() {
+    while (!aborted) {
+      try {
+        Future<Boolean> future =
+            executor.submit(
+                () -> {
+                  for (FragmentInstance instance : instances) {
+                    try {
+                      FragmentInstanceState state = fetchState(instance);
+                      stateMachine.updateFragInstanceState(instance.getId(), state);
+                    } catch (TException e) {
+                      // TODO: do nothing ?
+                      return false;
                     }
-                    return true;
+                  }
+                  return true;
                 });
-                future.get();
-                Thread.sleep(STATE_FETCH_INTERVAL_IN_MS);
-            } catch (InterruptedException | ExecutionException e) {
-                // TODO:
-            }
-        }
+        future.get();
+        Thread.sleep(STATE_FETCH_INTERVAL_IN_MS);
+      } catch (InterruptedException | ExecutionException e) {
+        // TODO: handle interruption and execution failure instead of silently ignoring them
+      }
     }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FragInstanceDispatchResult.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FragInstanceDispatchResult.java
index 8108c9c0c0..fbafb7a34b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FragInstanceDispatchResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FragInstanceDispatchResult.java
@@ -20,13 +20,13 @@
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
 public class FragInstanceDispatchResult {
-    private boolean successful;
+  private boolean successful;
 
-    public FragInstanceDispatchResult(boolean successful) {
-        this.successful = successful;
-    }
+  public FragInstanceDispatchResult(boolean successful) {
+    this.successful = successful;
+  }
 
-    public boolean isSuccessful() {
-        return successful;
-    }
+  public boolean isSuccessful() {
+    return successful;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
index 9fdedabb30..c9574fdb4c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
@@ -25,12 +25,13 @@ import java.util.List;
 import java.util.concurrent.Future;
 
 public interface IFragInstanceDispatcher {
-    /**
-     * Dispatch all Fragment instances asynchronously
-     * @param instances Fragment instance list
-     * @return Boolean.
-     */
-    Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances);
+  /**
+   * Dispatch all Fragment instances asynchronously
+   *
+   * @param instances Fragment instance list
+   * @return a future holding the dispatch result
+   */
+  Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances);
 
-    void abort();
+  void abort();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
index 33d0d5343f..3c14ee27ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
@@ -20,5 +20,5 @@
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
 public interface IFragInstanceStateFetcher {
-    void start();
+  void start();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
index d79b66ca6b..8dfc825c0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TSocket;
@@ -27,11 +28,12 @@ import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
 public class InternalServiceClientFactory {
-    // TODO: (xingtanzjr) consider the best practice to maintain the clients
-    public static InternalService.Client getInternalServiceClient(String endpoint, int port) throws TTransportException {
-        TTransport transport = new TSocket(endpoint, port);
-        transport.open();
-        TProtocol protocol = new TBinaryProtocol(transport);
-        return new InternalService.Client(protocol);
-    }
+  // TODO: (xingtanzjr) consider the best practice to maintain the clients
+  public static InternalService.Client getInternalServiceClient(String endpoint, int port)
+      throws TTransportException {
+    TTransport transport = new TSocket(endpoint, port);
+    transport.open();
+    TProtocol protocol = new TBinaryProtocol(transport);
+    return new InternalService.Client(protocol);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index a48fc7b7c3..f47d3df45e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+
 import org.apache.thrift.TException;
 
 import java.util.List;
@@ -29,27 +30,28 @@ import java.util.concurrent.FutureTask;
 
 public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
 
-    @Override
-    public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances) {
-        FutureTask<FragInstanceDispatchResult> dispatchTask = new FutureTask<>(() -> {
-            try {
+  @Override
+  public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances) {
+    FutureTask<FragInstanceDispatchResult> dispatchTask =
+        new FutureTask<>(
+            () -> {
+              try {
                 for (FragmentInstance instance : instances) {
-                    InternalService.Client client = InternalServiceClientFactory.
-                            getInternalServiceClient(instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
-                    client.sendFragmentInstance(null);
+                  InternalService.Client client =
+                      InternalServiceClientFactory.getInternalServiceClient(
+                          instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
+                  client.sendFragmentInstance(null);
                 }
-            } catch (TException e) {
+              } catch (TException e) {
                 // TODO: (xingtanzjr) add more details
                 return new FragInstanceDispatchResult(false);
-            }
-            return new FragInstanceDispatchResult(true);
-        });
-        new Thread(dispatchTask).start();
-        return dispatchTask;
-    }
-
-    @Override
-    public void abort() {
+              }
+              return new FragInstanceDispatchResult(true);
+            });
+    new Thread(dispatchTask).start();
+    return dispatchTask;
+  }
 
-    }
+  @Override
+  public void abort() {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 26562059ab..c1085e15d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
 
-import org.apache.commons.lang.Validate;
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -26,6 +25,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang.Validate;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -57,7 +57,9 @@ public class FragmentSinkNode extends SinkNode {
 
   @Override
   public PlanNode cloneWithChildren(List<PlanNode> children) {
-    Validate.isTrue(children == null || children.size() == 1, "Children size of FragmentSinkNode should be 0 or 1");
+    Validate.isTrue(
+        children == null || children.size() == 1,
+        "Children size of FragmentSinkNode should be 0 or 1");
     FragmentSinkNode sinkNode = (FragmentSinkNode) clone();
     if (children != null) {
       sinkNode.setChild(children.get(0));


[iotdb] 04/04: add basic scheduler

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 30508073319647b10fd29ef4a4a91017b5171758
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 6 11:39:33 2022 +0800

    add basic scheduler
---
 .../apache/iotdb/db/mpp/execution/Coordinator.java | 27 +++++++++++-
 .../{ExecutionStatus.java => ExecutionResult.java} |  4 +-
 .../iotdb/db/mpp/execution/QueryExecution.java     | 28 +++++++++---
 .../iotdb/db/mpp/execution/QueryStateMachine.java  |  5 ++-
 ...tcher.java => AbstractFragInsStateTracker.java} | 11 +++--
 .../mpp/execution/scheduler/ClusterScheduler.java  | 36 ++++++++++++----
 ...cher.java => FixedRateFragInsStateTracker.java} | 50 +++++++++++-----------
 ...Fetcher.java => IFragInstanceStateTracker.java} |  4 +-
 .../db/service/thrift/impl/TSServiceImpl.java      |  4 +-
 .../iotdb/db/mpp/sql/plan/QueryPlannerTest.java    |  6 ++-
 10 files changed, 121 insertions(+), 54 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 3ce8f7ea00..4e3f8ad681 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
@@ -25,6 +26,8 @@ import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * The coordinator for MPP. It manages all the queries which are executed in current Node. And it
@@ -32,6 +35,13 @@ import java.util.concurrent.ConcurrentHashMap;
  * QueryExecution.
  */
 public class Coordinator {
+  private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
+  private static final int COORDINATOR_EXECUTOR_SIZE = 10;
+  private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
+  private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 10;
+
+  private ExecutorService executor;
+  private ScheduledExecutorService scheduledExecutor;
 
   private static final Coordinator INSTANCE = new Coordinator();
 
@@ -39,13 +49,15 @@ public class Coordinator {
 
   private Coordinator() {
     this.queryExecutionMap = new ConcurrentHashMap<>();
+    this.executor = getQueryExecutor();
+    this.scheduledExecutor = getScheduledExecutor();
   }
 
   private QueryExecution createQueryExecution(Statement statement, MPPQueryContext queryContext) {
-    return new QueryExecution(statement, queryContext);
+    return new QueryExecution(statement, queryContext, executor, scheduledExecutor);
   }
 
-  public ExecutionStatus execute(
+  public ExecutionResult execute(
       Statement statement, QueryId queryId, QueryType queryType, SessionInfo session, String sql) {
 
     QueryExecution execution =
@@ -57,6 +69,17 @@ public class Coordinator {
     return execution.getStatus();
   }
 
+  // TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
+  private ExecutorService getQueryExecutor() {
+    return IoTDBThreadPoolFactory.newFixedThreadPool(
+        COORDINATOR_EXECUTOR_SIZE, COORDINATOR_EXECUTOR_NAME);
+  }
+  // TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
+  private ScheduledExecutorService getScheduledExecutor() {
+    return IoTDBThreadPoolFactory.newScheduledThreadPool(
+        COORDINATOR_SCHEDULED_EXECUTOR_SIZE, COORDINATOR_SCHEDULED_EXECUTOR_NAME);
+  }
+
   public static Coordinator getInstance() {
     return INSTANCE;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionStatus.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionResult.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionStatus.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionResult.java
index 8c24ade473..4dae202408 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionStatus.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionResult.java
@@ -21,11 +21,11 @@ package org.apache.iotdb.db.mpp.execution;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-public class ExecutionStatus {
+public class ExecutionResult {
   public QueryId queryId;
   public TSStatus status;
 
-  public ExecutionStatus(QueryId queryId, TSStatus status) {
+  public ExecutionResult(QueryId queryId, TSStatus status) {
     this.queryId = queryId;
     this.status = status;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index dd48dff9ec..c88512771f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -39,6 +39,8 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * QueryExecution stores all the status of a query which is being prepared or running inside the MPP
@@ -57,15 +59,24 @@ public class QueryExecution {
   private LogicalQueryPlan logicalPlan;
   private DistributedQueryPlan distributedPlan;
 
+  private ExecutorService executor;
+  private ScheduledExecutorService scheduledExecutor;
+
   // The result of QueryExecution will be written to the DataBlockManager in current Node.
   // We use this SourceHandle to fetch the TsBlock from it.
   private ISourceHandle resultHandle;
 
-  public QueryExecution(Statement statement, MPPQueryContext context) {
+  public QueryExecution(
+      Statement statement,
+      MPPQueryContext context,
+      ExecutorService executor,
+      ScheduledExecutorService scheduledExecutor) {
+    this.executor = executor;
+    this.scheduledExecutor = scheduledExecutor;
     this.context = context;
     this.planOptimizers = new ArrayList<>();
     this.analysis = analyze(statement, context);
-    this.stateMachine = new QueryStateMachine(context.getQueryId());
+    this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
 
     // We add the abort logic inside the QueryExecution.
     // So that the other components can only focus on the state change.
@@ -93,7 +104,12 @@ public class QueryExecution {
   private void schedule() {
     // TODO: (xingtanzjr) initialize the query scheduler according to configuration
     this.scheduler =
-        new ClusterScheduler(stateMachine, distributedPlan.getInstances(), context.getQueryType());
+        new ClusterScheduler(
+            stateMachine,
+            distributedPlan.getInstances(),
+            context.getQueryType(),
+            executor,
+            scheduledExecutor);
     // TODO: (xingtanzjr) how to make the schedule running asynchronously
     this.scheduler.start();
   }
@@ -141,7 +157,7 @@ public class QueryExecution {
    *
    * @return ExecutionStatus. Contains the QueryId and the TSStatus.
    */
-  public ExecutionStatus getStatus() {
+  public ExecutionResult getStatus() {
     // Although we monitor the state to transition to RUNNING, the future will return if any
     // Terminated state is triggered
     ListenableFuture<QueryState> future = stateMachine.getStateChange(QueryState.RUNNING);
@@ -152,10 +168,10 @@ public class QueryExecution {
           state == QueryState.FINISHED
               ? TSStatusCode.SUCCESS_STATUS
               : TSStatusCode.QUERY_PROCESS_ERROR;
-      return new ExecutionStatus(context.getQueryId(), RpcUtils.getStatus(statusCode));
+      return new ExecutionResult(context.getQueryId(), RpcUtils.getStatus(statusCode));
     } catch (InterruptedException | ExecutionException e) {
       // TODO: (xingtanzjr) use more accurate error handling
-      return new ExecutionStatus(
+      return new ExecutionResult(
           context.getQueryId(), RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 0ed499c785..53f33e859c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -27,6 +27,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 
 /**
  * State machine for a QueryExecution. It stores the states for the QueryExecution. Others can
@@ -40,9 +41,9 @@ public class QueryStateMachine {
   // The executor will be used in all the state machines belonged to this query.
   private Executor stateMachineExecutor;
 
-  public QueryStateMachine(QueryId queryId) {
+  public QueryStateMachine(QueryId queryId, ExecutorService executor) {
     this.name = String.format("QueryStateMachine[%s]", queryId);
-    this.stateMachineExecutor = getStateMachineExecutor();
+    this.stateMachineExecutor = executor;
     this.fragInstanceStateMap = new ConcurrentHashMap<>();
     this.queryState =
         new StateMachine<>(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
similarity index 86%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
index de842f0207..b7908f048b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
@@ -31,22 +31,27 @@ import org.apache.thrift.TException;
 
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
-public abstract class AbstractFragInsStateFetcher implements IFragInstanceStateFetcher {
+public abstract class AbstractFragInsStateTracker implements IFragInstanceStateTracker {
 
   protected QueryStateMachine stateMachine;
   protected ExecutorService executor;
+  protected ScheduledExecutorService scheduledExecutor;
   protected List<FragmentInstance> instances;
 
-  public AbstractFragInsStateFetcher(
-      QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
+  public AbstractFragInsStateTracker(
+      QueryStateMachine stateMachine, ExecutorService executor, ScheduledExecutorService scheduledExecutor, List<FragmentInstance> instances) {
     this.stateMachine = stateMachine;
     this.executor = executor;
+    this.scheduledExecutor = scheduledExecutor;
     this.instances = instances;
   }
 
   public abstract void start();
 
+  public abstract void abort();
+
   protected FragmentInstanceState fetchState(FragmentInstance instance) throws TException {
     InternalService.Client client =
         InternalServiceClientFactory.getInternalServiceClient(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 8c6c2991bc..2fa2971d0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -30,7 +30,9 @@ import io.airlift.units.Duration;
 
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * QueryScheduler is used to dispatch the fragment instances of a query to target nodes. And it will
@@ -47,12 +49,24 @@ public class ClusterScheduler implements IScheduler {
   private List<FragmentInstance> instances;
   private IFragInstanceDispatcher dispatcher;
 
+  private ExecutorService executor;
+  private ScheduledExecutorService scheduledExecutor;
+
+  private IFragInstanceStateTracker stateTracker;
+
   public ClusterScheduler(
-      QueryStateMachine stateMachine, List<FragmentInstance> instances, QueryType queryType) {
+      QueryStateMachine stateMachine,
+      List<FragmentInstance> instances,
+      QueryType queryType,
+      ExecutorService executor,
+      ScheduledExecutorService scheduledExecutor) {
     this.stateMachine = stateMachine;
     this.instances = instances;
     this.queryType = queryType;
+    this.executor = executor;
+    this.scheduledExecutor = scheduledExecutor;
     this.dispatcher = new SimpleFragInstanceDispatcher();
+    this.stateTracker = new FixedRateFragInsStateTracker(stateMachine, executor, scheduledExecutor, instances);
   }
 
   @Override
@@ -85,7 +99,7 @@ public class ClusterScheduler implements IScheduler {
         });
 
     // TODO: (xingtanzjr) start the stateFetcher/heartbeat for each fragment instance
-
+    this.stateTracker.start();
   }
 
   private boolean waitDispatchingFinished(Future<FragInstanceDispatchResult> dispatchResultFuture) {
@@ -102,9 +116,9 @@ public class ClusterScheduler implements IScheduler {
 
   @Override
   public void abort() {
-    if (this.dispatcher != null) {
-      dispatcher.abort();
-    }
+    // TODO: The null checks seem unnecessary here; confirm whether omitting them is best practice.
+    dispatcher.abort();
+    stateTracker.abort();
   }
 
   @Override
@@ -118,14 +132,18 @@ public class ClusterScheduler implements IScheduler {
   }
 
   @Override
-  public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
+  public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {
+  }
 
   @Override
-  public void cancelFragment(PlanFragmentId planFragmentId) {}
+  public void cancelFragment(PlanFragmentId planFragmentId) {
+  }
 
   // Send the instances to other nodes
-  private void sendFragmentInstances() {}
+  private void sendFragmentInstances() {
+  }
 
   // After sending, start to collect the states of these fragment instances
-  private void startMonitorInstances() {}
+  private void startMonitorInstances() {
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
similarity index 55%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
index 9c0bc9e807..cc993298d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
@@ -28,37 +28,37 @@ import org.apache.thrift.TException;
 import java.util.List;
 import java.util.concurrent.*;
 
-public class FixedRateFragInsStateFetcher extends AbstractFragInsStateFetcher {
-  private static final long STATE_FETCH_INTERVAL_IN_MS = 1000;
-  private volatile boolean aborted = false;
+public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
+  // TODO: (xingtanzjr) consider how much Interval is OK for state tracker
+  private static final long STATE_FETCH_INTERVAL_IN_MS = 500;
+  private ScheduledFuture<?> trackTask;
 
-  public FixedRateFragInsStateFetcher(
-      QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
-    super(stateMachine, executor, instances);
+  public FixedRateFragInsStateTracker(
+      QueryStateMachine stateMachine, ExecutorService executor, ScheduledExecutorService scheduledExecutor, List<FragmentInstance> instances) {
+    super(stateMachine, executor, scheduledExecutor, instances);
   }
 
   @Override
   public void start() {
-    while (!aborted) {
+    trackTask = scheduledExecutor.scheduleAtFixedRate(this::fetchStateAndUpdate, 0, STATE_FETCH_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public void abort() {
+    if (trackTask != null) {
+      trackTask.cancel(true);
+    }
+  }
+
+  private void fetchStateAndUpdate() {
+    for (FragmentInstance instance : instances) {
       try {
-        Future<Boolean> future =
-            executor.submit(
-                () -> {
-                  for (FragmentInstance instance : instances) {
-                    try {
-                      FragmentInstanceState state = fetchState(instance);
-                      stateMachine.updateFragInstanceState(instance.getId(), state);
-                    } catch (TException e) {
-                      // TODO: do nothing ?
-                      return false;
-                    }
-                  }
-                  return true;
-                });
-        future.get();
-        Thread.sleep(STATE_FETCH_INTERVAL_IN_MS);
-      } catch (InterruptedException | ExecutionException e) {
-        // TODO:
+        FragmentInstanceState state = fetchState(instance);
+        if (state != null) {
+          stateMachine.updateFragInstanceState(instance.getId(), state);
+        }
+      } catch (TException e) {
+        // TODO: do nothing ?
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateTracker.java
similarity index 93%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateTracker.java
index 3c14ee27ab..05dbc388bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateTracker.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
-public interface IFragInstanceStateFetcher {
+public interface IFragInstanceStateTracker {
   void start();
+
+  void abort();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index e348836021..315620643e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.template.TemplateQueryType;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.Coordinator;
-import org.apache.iotdb.db.mpp.execution.ExecutionStatus;
+import org.apache.iotdb.db.mpp.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
@@ -1660,7 +1660,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionStatus result =
+      ExecutionResult result =
           coordinator.execute(
               statement,
               new QueryId(String.valueOf(queryId)),
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index 86527e2788..e38952fea3 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.sql.plan;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
@@ -47,8 +48,9 @@ public class QueryPlannerTest {
     QueryExecution queryExecution =
         new QueryExecution(
             stmt,
-            new MPPQueryContext(
-                querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ));
+            new MPPQueryContext(querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ),
+            IoTDBThreadPoolFactory.newSingleThreadExecutor("Test-Query"),
+            IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Test-Query-Scheduled"));
     queryExecution.doLogicalPlan();
     System.out.printf("SQL: %s%n%n", querySql);
     System.out.println("===== Step 1: Logical Plan =====");