You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/20 02:23:57 UTC

[iotdb] branch master updated: Fix some issues in MPP framework (#5596)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d0b0ec288 Fix some issues in MPP framework (#5596)
1d0b0ec288 is described below

commit 1d0b0ec28803de7b48f6bb995beddecd1f82fc01
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Wed Apr 20 10:23:49 2022 +0800

    Fix some issues in MPP framework (#5596)
---
 .../iotdb/commons/partition/DataPartition.java     |  17 +--
 .../{PartitionInfo.java => Partition.java}         |  27 +++--
 .../iotdb/commons/partition/SchemaPartition.java   |  17 +--
 .../iotdb/db/mpp/buffer/DataBlockManager.java      |  14 ++-
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java |  10 +-
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   |  27 ++---
 .../apache/iotdb/db/mpp/execution/Coordinator.java |  16 +--
 .../iotdb/db/mpp/execution/IQueryExecution.java    |   2 +
 .../iotdb/db/mpp/execution/QueryExecution.java     |  43 ++++++-
 .../iotdb/db/mpp/execution/QueryStateMachine.java  |  14 ++-
 .../db/mpp/execution/config/ConfigExecution.java   |  58 ++++------
 .../config/ConfigTaskVisitor.java}                 |  24 ++--
 .../iotdb/db/mpp/execution/config/IConfigTask.java |   2 +-
 .../mpp/execution/scheduler/ClusterScheduler.java  |  31 +++--
 .../mpp/execution/scheduler/IQueryTerminator.java  |   4 +-
 .../scheduler/SimpleFragInstanceDispatcher.java    |  51 ++++-----
 .../execution/scheduler/SimpleQueryTerminator.java |  50 ++++-----
 .../db/mpp/operator/source/SeriesScanOperator.java |   2 +-
 .../db/mpp/schedule/FragmentInstanceScheduler.java |   2 +-
 .../db/mpp/sql/analyze/ClusterSchemaFetcher.java   |   4 +-
 .../db/mpp/sql/planner/DistributionPlanner.java    |  20 ++++
 .../statement/ConfigStatement.java}                |  12 +-
 .../db/mpp/sql/statement/StatementVisitor.java     |   5 +
 .../metadata/SetStorageGroupStatement.java         |  10 +-
 .../iotdb/db/query/control/SessionManager.java     |   7 +-
 .../thrift/impl/DataNodeTSIServiceImpl.java        |  11 +-
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |   2 +-
 .../iotdb/db/mpp/buffer/SourceHandleTest.java      |   6 +-
 .../db/mpp/execution/ConfigExecutionTest.java      | 125 +++++++++++++++++++++
 .../db/mpp/sql/plan/DistributionPlannerTest.java   |  39 +++++++
 30 files changed, 430 insertions(+), 222 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index bb62f1cea4..e1d24db22d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iotdb.commons.partition;
 
-import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -28,18 +26,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-public class DataPartition {
-
-  private String seriesSlotExecutorName;
-  private int seriesPartitionSlotNum;
+public class DataPartition extends Partition {
 
   // Map<StorageGroup, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionMessage>>>>
   private Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
       dataPartitionMap;
 
   public DataPartition(String seriesSlotExecutorName, int seriesPartitionSlotNum) {
-    this.seriesSlotExecutorName = seriesSlotExecutorName;
-    this.seriesPartitionSlotNum = seriesPartitionSlotNum;
+    super(seriesSlotExecutorName, seriesPartitionSlotNum);
   }
 
   public DataPartition(
@@ -104,13 +98,6 @@ public class DataPartition {
     return regions.get(0);
   }
 
-  private SeriesPartitionSlot calculateDeviceGroupId(String deviceName) {
-    SeriesPartitionExecutor executor =
-        SeriesPartitionExecutor.getSeriesPartitionExecutor(
-            seriesSlotExecutorName, seriesPartitionSlotNum);
-    return executor.getSeriesPartitionSlot(deviceName);
-  }
-
   private String getStorageGroupByDevice(String deviceName) {
     for (String storageGroup : dataPartitionMap.keySet()) {
       if (deviceName.startsWith(storageGroup)) {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/PartitionInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
similarity index 55%
rename from node-commons/src/main/java/org/apache/iotdb/commons/partition/PartitionInfo.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
index 6e846ad6e3..e0c6e53bd1 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/PartitionInfo.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
@@ -18,24 +18,23 @@
  */
 package org.apache.iotdb.commons.partition;
 
-public class PartitionInfo {
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
 
-  private DataPartition dataPartition;
-  private SchemaPartition schemaPartition;
+public abstract class Partition {
+  protected String seriesSlotExecutorName;
+  protected int seriesPartitionSlotNum;
 
-  public DataPartition getDataPartitionInfo() {
-    return dataPartition;
-  }
-
-  public void setDataPartitionInfo(DataPartition dataPartition) {
-    this.dataPartition = dataPartition;
-  }
+  private final SeriesPartitionExecutor executor;
 
-  public SchemaPartition getSchemaPartitionInfo() {
-    return schemaPartition;
+  public Partition(String seriesSlotExecutorName, int seriesPartitionSlotNum) {
+    this.seriesSlotExecutorName = seriesSlotExecutorName;
+    this.seriesPartitionSlotNum = seriesPartitionSlotNum;
+    executor =
+        SeriesPartitionExecutor.getSeriesPartitionExecutor(
+            seriesSlotExecutorName, seriesPartitionSlotNum);
   }
 
-  public void setSchemaPartitionInfo(SchemaPartition schemaPartition) {
-    this.schemaPartition = schemaPartition;
+  protected SeriesPartitionSlot calculateDeviceGroupId(String deviceName) {
+    return executor.getSeriesPartitionSlot(deviceName);
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
index f0ca88f774..b3f8c61986 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
@@ -18,24 +18,18 @@
  */
 package org.apache.iotdb.commons.partition;
 
-import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class SchemaPartition {
-
-  private String seriesSlotExecutorName;
-  private int seriesPartitionSlotNum;
+public class SchemaPartition extends Partition {
 
   // Map<StorageGroup, Map<SeriesPartitionSlot, SchemaRegionPlaceInfo>>
   private Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartitionMap;
 
   public SchemaPartition(String seriesSlotExecutorName, int seriesPartitionSlotNum) {
-    this.seriesSlotExecutorName = seriesSlotExecutorName;
-    this.seriesPartitionSlotNum = seriesPartitionSlotNum;
+    super(seriesSlotExecutorName, seriesPartitionSlotNum);
   }
 
   public SchemaPartition(
@@ -64,13 +58,6 @@ public class SchemaPartition {
     return schemaPartitionMap.get(storageGroup).get(seriesPartitionSlot);
   }
 
-  private SeriesPartitionSlot calculateDeviceGroupId(String deviceName) {
-    SeriesPartitionExecutor executor =
-        SeriesPartitionExecutor.getSeriesPartitionExecutor(
-            seriesSlotExecutorName, seriesPartitionSlotNum);
-    return executor.getSeriesPartitionSlot(deviceName);
-  }
-
   private String getStorageGroupByDevice(String deviceName) {
     for (String storageGroup : schemaPartitionMap.keySet()) {
       if (deviceName.startsWith(storageGroup)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index ab7daa5289..573771fe81 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -175,11 +175,13 @@ public class DataBlockManager implements IDataBlockManager {
               .containsKey(sourceHandle.getLocalPlanNodeId())) {
         logger.info(
             "Resources of finished source handle {} has already been released", sourceHandle);
+      } else {
+        sourceHandles
+            .get(sourceHandle.getLocalFragmentInstanceId())
+            .remove(sourceHandle.getLocalPlanNodeId());
       }
-      sourceHandles
-          .get(sourceHandle.getLocalFragmentInstanceId())
-          .remove(sourceHandle.getLocalPlanNodeId());
-      if (sourceHandles.get(sourceHandle.getLocalFragmentInstanceId()).isEmpty()) {
+      if (sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId())
+          && sourceHandles.get(sourceHandle.getLocalFragmentInstanceId()).isEmpty()) {
         sourceHandles.remove(sourceHandle.getLocalFragmentInstanceId());
       }
     }
@@ -264,7 +266,7 @@ public class DataBlockManager implements IDataBlockManager {
       throw new IllegalStateException("Sink handle for " + localFragmentInstanceId + " exists.");
     }
 
-    logger.info(
+    logger.debug(
         "Create sink handle to plan node {} of {} for {}",
         remotePlanNodeId,
         remoteFragmentInstanceId,
@@ -301,7 +303,7 @@ public class DataBlockManager implements IDataBlockManager {
               + " exists.");
     }
 
-    logger.info(
+    logger.debug(
         "Create source handle from {} for plan node {} of {}",
         remoteFragmentInstanceId,
         localPlanNodeId,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index af4ecf089d..a72dcb9cb6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -220,10 +220,12 @@ public class SinkHandle implements ISinkHandle {
     synchronized (this) {
       sequenceIdToTsBlock.clear();
       closed = true;
-      localMemoryManager
-          .getQueryPool()
-          .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
-      bufferRetainedSizeInBytes = 0;
+      if (bufferRetainedSizeInBytes > 0) {
+        localMemoryManager
+            .getQueryPool()
+            .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
+        bufferRetainedSizeInBytes = 0;
+      }
     }
     sinkHandleListener.onAborted(this);
     logger.info("Sink handle {} is aborted", this);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index aece366b82..37a5bce1b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -71,8 +71,6 @@ public class SourceHandle implements ISourceHandle {
   private int currSequenceId = 0;
   private int nextSequenceId = 0;
   private int lastSequenceId = Integer.MAX_VALUE;
-  private int numActiveGetDataBlocksTask = 0;
-  private boolean noMoreTsBlocks;
   private boolean closed;
   private Throwable throwable;
 
@@ -160,7 +158,6 @@ public class SourceHandle implements ISourceHandle {
     if (future.isDone()) {
       nextSequenceId = endSequenceId;
       executorService.submit(new GetDataBlocksTask(startSequenceId, endSequenceId, reservedBytes));
-      numActiveGetDataBlocksTask += 1;
     } else {
       nextSequenceId = endSequenceId + 1;
       // The future being not completed indicates,
@@ -175,7 +172,6 @@ public class SourceHandle implements ISourceHandle {
         // Memory has been reserved. Submit a GetDataBlocksTask for these blocks.
         executorService.submit(
             new GetDataBlocksTask(startSequenceId, endSequenceId, reservedBytes));
-        numActiveGetDataBlocksTask += 1;
       }
 
       // Submit a GetDataBlocksTask when memory is freed.
@@ -188,7 +184,6 @@ public class SourceHandle implements ISourceHandle {
                     sequenceIdOfUnReservedDataBlock,
                     sequenceIdOfUnReservedDataBlock + 1,
                     sizeOfUnReservedDataBlock));
-            numActiveGetDataBlocksTask += 1;
             bufferRetainedSizeInBytes += sizeOfUnReservedDataBlock;
           },
           executorService);
@@ -211,7 +206,9 @@ public class SourceHandle implements ISourceHandle {
 
   synchronized void setNoMoreTsBlocks(int lastSequenceId) {
     this.lastSequenceId = lastSequenceId;
-    noMoreTsBlocks = true;
+    if (!blocked.isDone() && remoteTsBlockedConsumedUp()) {
+      blocked.set(null);
+    }
   }
 
   synchronized void updatePendingDataBlockInfo(int startSequenceId, List<Long> dataBlockSizes) {
@@ -226,6 +223,9 @@ public class SourceHandle implements ISourceHandle {
     if (closed) {
       return;
     }
+    if (blocked != null && !blocked.isDone()) {
+      blocked.cancel(true);
+    }
     sequenceIdToDataBlockSize.clear();
     if (bufferRetainedSizeInBytes > 0) {
       localMemoryManager
@@ -239,11 +239,14 @@ public class SourceHandle implements ISourceHandle {
 
   @Override
   public boolean isFinished() {
-    return throwable == null
-        && noMoreTsBlocks
-        && numActiveGetDataBlocksTask == 0
-        && currSequenceId - 1 == lastSequenceId
-        && sequenceIdToTsBlock.isEmpty();
+    return throwable == null && remoteTsBlockedConsumedUp();
+  }
+
+  // Return true indicates two points:
+  //   1. Remote SinkHandle has told SourceHandle the total count of TsBlocks by lastSequenceId
+  //   2. All the TsBlocks has been consumed up
+  private boolean remoteTsBlockedConsumedUp() {
+    return currSequenceId - 1 == lastSequenceId;
   }
 
   String getRemoteHostname() {
@@ -355,8 +358,6 @@ public class SourceHandle implements ISourceHandle {
                   .free(localFragmentInstanceId.getQueryId(), reservedBytes);
             }
           }
-        } finally {
-          numActiveGetDataBlocksTask -= 1;
         }
       }
       // TODO: try to issue another GetDataBlocksTask to make the query run faster.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 1f97f3c93c..a7d09b3ccf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -28,10 +28,12 @@ import org.apache.iotdb.db.mpp.execution.config.ConfigExecution;
 import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.statement.ConfigStatement;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
-import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
 
 import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -43,10 +45,12 @@ import java.util.concurrent.ScheduledExecutorService;
  * QueryExecution.
  */
 public class Coordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
+
   private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
-  private static final int COORDINATOR_EXECUTOR_SIZE = 2;
+  private static final int COORDINATOR_EXECUTOR_SIZE = 1;
   private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
-  private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 2;
+  private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 1;
 
   private static final Endpoint LOCAL_HOST =
       new Endpoint(
@@ -71,7 +75,7 @@ public class Coordinator {
       MPPQueryContext queryContext,
       IPartitionFetcher partitionFetcher,
       ISchemaFetcher schemaFetcher) {
-    if (statement instanceof SetStorageGroupStatement) {
+    if (statement instanceof ConfigStatement) {
       queryContext.setQueryType(QueryType.WRITE);
       return new ConfigExecution(queryContext, statement, executor);
     }
@@ -94,7 +98,6 @@ public class Coordinator {
             partitionFetcher,
             schemaFetcher);
     queryExecutionMap.put(queryId, execution);
-
     execution.start();
 
     return execution.getStatus();
@@ -125,7 +128,4 @@ public class Coordinator {
   public static Coordinator getInstance() {
     return INSTANCE;
   }
-  //    private TQueryResponse executeQuery(TQueryRequest request) {
-  //
-  //    }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
index 2e0bd9c76d..af9633ee89 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
@@ -28,6 +28,8 @@ public interface IQueryExecution {
 
   void stop();
 
+  void stopAndCleanup();
+
   ExecutionResult getStatus();
 
   TsBlock getBatchResult();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index e7e4995995..a9bd605823 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -43,6 +43,8 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -61,6 +63,8 @@ import static com.google.common.base.Throwables.throwIfUnchecked;
  * corresponding physical nodes. 3. Collect and monitor the progress/states of this query.
  */
 public class QueryExecution implements IQueryExecution {
+  private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
+
   private final MPPQueryContext context;
   private IScheduler scheduler;
   private final QueryStateMachine stateMachine;
@@ -97,8 +101,6 @@ public class QueryExecution implements IQueryExecution {
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
     this.partitionFetcher = partitionFetcher;
     this.schemaFetcher = schemaFetcher;
-    // TODO: (xingtanzjr) Initialize the result handle after the DataBlockManager is merged.
-    //    resultHandle = xxxx
 
     // We add the abort logic inside the QueryExecution.
     // So that the other components can only focus on the state change.
@@ -108,6 +110,13 @@ public class QueryExecution implements IQueryExecution {
             return;
           }
           this.stop();
+          // TODO: (xingtanzjr) If the query is in abnormal state, the releaseResource() should be
+          // invoked
+          if (state == QueryState.FAILED
+              || state == QueryState.ABORTED
+              || state == QueryState.CANCELED) {
+            releaseResource();
+          }
         });
   }
 
@@ -140,7 +149,6 @@ public class QueryExecution implements IQueryExecution {
             context.getQueryType(),
             executor,
             scheduledExecutor);
-    // TODO: (xingtanzjr) how to make the schedule running asynchronously
     this.scheduler.start();
   }
 
@@ -156,16 +164,30 @@ public class QueryExecution implements IQueryExecution {
     this.distributedPlan = planner.planFragments();
   }
 
-  /** Abort the query and do cleanup work including QuerySchedule aborting and resource releasing */
+  // Stop the workers for this query
   public void stop() {
     if (this.scheduler != null) {
       this.scheduler.stop();
     }
+  }
+
+  // Stop the query and clean up all the resources this query occupied
+  public void stopAndCleanup() {
+    stop();
     releaseResource();
   }
 
   /** Release the resources that current QueryExecution hold. */
-  private void releaseResource() {}
+  private void releaseResource() {
+    // close ResultHandle to unblock client's getResult request
+    // Actually, we should not close the ResultHandle when the QueryExecution is Finished.
+    // There are only two scenarios where the ResultHandle should be closed:
+    //   1. The client fetch all the result and the ResultHandle is finished.
+    //   2. The client's connection is closed that all owned QueryExecution should be cleaned up
+    if (resultHandle != null && resultHandle.isFinished()) {
+      resultHandle.close();
+    }
+  }
 
   /**
    * This method will be called by the request thread from client connection. This method will block
@@ -177,18 +199,23 @@ public class QueryExecution implements IQueryExecution {
   @Override
   public TsBlock getBatchResult() {
     try {
+      if (resultHandle.isClosed() || resultHandle.isFinished()) {
+        return null;
+      }
       ListenableFuture<Void> blocked = resultHandle.isBlocked();
       blocked.get();
       if (resultHandle.isFinished()) {
+        releaseResource();
         return null;
       }
       return resultHandle.receive();
 
     } catch (ExecutionException | IOException e) {
+      stateMachine.transitionToFailed(e);
       throwIfUnchecked(e.getCause());
       throw new RuntimeException(e.getCause());
     } catch (InterruptedException e) {
-      stateMachine.transitionToFailed();
+      stateMachine.transitionToFailed(e);
       Thread.currentThread().interrupt();
       throw new RuntimeException(new SQLException("ResultSet thread was interrupted", e));
     }
@@ -272,4 +299,8 @@ public class QueryExecution implements IQueryExecution {
   public boolean isQuery() {
     return context.getQueryType() == QueryType.READ;
   }
+
+  public String toString() {
+    return String.format("QueryExecution[%s]", context.getQueryId());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index c5bc48ef42..e9093abf32 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -39,6 +39,7 @@ public class QueryStateMachine {
 
   // The executor will be used in all the state machines belonged to this query.
   private Executor stateMachineExecutor;
+  private Throwable failureException;
 
   public QueryStateMachine(QueryId queryId, ExecutorService executor) {
     this.name = String.format("QueryStateMachine[%s]", queryId);
@@ -60,7 +61,8 @@ public class QueryStateMachine {
     this.fragInstanceStateMap.put(id, state);
     // TODO: (xingtanzjr) we need to distinguish the Timeout situation
     if (state.isFailed()) {
-      transitionToFailed();
+      transitionToFailed(
+          new RuntimeException(String.format("FragmentInstance[%s] is failed.", id)));
     }
     boolean allFinished =
         fragInstanceStateMap.values().stream()
@@ -120,10 +122,18 @@ public class QueryStateMachine {
     queryState.set(QueryState.ABORTED);
   }
 
-  public void transitionToFailed() {
+  public void transitionToFailed(Throwable throwable) {
     if (queryState.get().isDone()) {
       return;
     }
+    this.failureException = throwable;
     queryState.set(QueryState.FAILED);
   }
+
+  public String getFailureMessage() {
+    if (failureException != null) {
+      return failureException.getMessage();
+    }
+    return "no detailed failure reason in QueryStateMachine";
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
index 7ceb542cef..b134d4896b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.execution.config;
 
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.execution.ExecutionResult;
@@ -26,11 +27,8 @@ import org.apache.iotdb.db.mpp.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
-import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
-import org.apache.iotdb.db.mpp.sql.statement.sys.AuthorStatement;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.FutureCallback;
@@ -42,8 +40,6 @@ import org.jetbrains.annotations.NotNull;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
-import static com.google.common.base.Throwables.throwIfInstanceOf;
-
 public class ConfigExecution implements IQueryExecution {
 
   private final MPPQueryContext context;
@@ -53,17 +49,30 @@ public class ConfigExecution implements IQueryExecution {
   private final QueryStateMachine stateMachine;
   private final SettableFuture<Boolean> result;
 
+  private final IConfigTask task;
+
   public ConfigExecution(MPPQueryContext context, Statement statement, ExecutorService executor) {
     this.context = context;
     this.statement = statement;
     this.executor = executor;
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
     this.result = SettableFuture.create();
+    this.task = statement.accept(new ConfigTaskVisitor(), new ConfigTaskVisitor.TaskContext());
+  }
+
+  @TestOnly
+  public ConfigExecution(
+      MPPQueryContext context, Statement statement, ExecutorService executor, IConfigTask task) {
+    this.context = context;
+    this.statement = statement;
+    this.executor = executor;
+    this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
+    this.result = SettableFuture.create();
+    this.task = task;
   }
 
   @Override
   public void start() {
-    IConfigTask task = getTask(statement);
     try {
       ListenableFuture<Void> future = task.execute();
       Futures.addCallback(
@@ -83,34 +92,33 @@ public class ConfigExecution implements IQueryExecution {
           executor);
     } catch (Throwable e) {
       fail(e);
-      throwIfInstanceOf(e, Error.class);
     }
   }
 
   public void fail(Throwable cause) {
-    stateMachine.transitionToFailed();
-    result.cancel(false);
+    stateMachine.transitionToFailed(cause);
+    result.set(false);
   }
 
   @Override
   public void stop() {}
 
+  @Override
+  public void stopAndCleanup() {}
+
   @Override
   public ExecutionResult getStatus() {
     try {
-      if (result.isCancelled()) {
-        return new ExecutionResult(
-            context.getQueryId(), RpcUtils.getStatus(TSStatusCode.QUERY_PROCESS_ERROR));
-      }
       Boolean success = result.get();
       TSStatusCode statusCode =
           success ? TSStatusCode.SUCCESS_STATUS : TSStatusCode.QUERY_PROCESS_ERROR;
-      return new ExecutionResult(context.getQueryId(), RpcUtils.getStatus(statusCode));
-
+      String message = success ? "" : stateMachine.getFailureMessage();
+      return new ExecutionResult(context.getQueryId(), RpcUtils.getStatus(statusCode, message));
     } catch (InterruptedException | ExecutionException e) {
       Thread.currentThread().interrupt();
       return new ExecutionResult(
-          context.getQueryId(), RpcUtils.getStatus(TSStatusCode.QUERY_PROCESS_ERROR));
+          context.getQueryId(),
+          RpcUtils.getStatus(TSStatusCode.QUERY_PROCESS_ERROR, e.getMessage()));
     }
   }
 
@@ -141,22 +149,4 @@ public class ConfigExecution implements IQueryExecution {
   public boolean isQuery() {
     return context.getQueryType() == QueryType.READ;
   }
-
-  // TODO: consider a more suitable implementation for it
-  // Generate the corresponding IConfigTask by statement.
-  // Each type of statement will has a ConfigTask
-  private IConfigTask getTask(Statement statement) {
-    try {
-      switch (statement.getType()) {
-        case SET_STORAGE_GROUP:
-          return new SetStorageGroupTask((SetStorageGroupStatement) statement);
-        case AUTHOR:
-          return new AuthorizerConfigTask((AuthorStatement) statement);
-        default:
-          throw new NotImplementedException();
-      }
-    } catch (ClassCastException classCastException) {
-      throw new NotImplementedException();
-    }
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskVisitor.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskVisitor.java
index de54b604e7..30cf56e998 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskVisitor.java
@@ -17,25 +17,23 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.sql.statement.metadata;
+package org.apache.iotdb.db.mpp.execution.config;
 
-import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.sql.constant.StatementType;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
-public class SetStorageGroupStatement extends Statement {
-  private PartialPath storageGroupPath;
+public class ConfigTaskVisitor
+    extends StatementVisitor<IConfigTask, ConfigTaskVisitor.TaskContext> {
 
-  public SetStorageGroupStatement() {
-    super();
-    statementType = StatementType.SET_STORAGE_GROUP;
+  public IConfigTask visitStatement(Statement statement, TaskContext context) {
+    throw new NotImplementedException("ConfigTask is not implemented for: " + statement);
   }
 
-  public PartialPath getStorageGroupPath() {
-    return storageGroupPath;
+  public IConfigTask visitSetStorageGroup(SetStorageGroupStatement statement, TaskContext context) {
+    return new SetStorageGroupTask(statement);
   }
 
-  public void setStorageGroupPath(PartialPath storageGroupPath) {
-    this.storageGroupPath = storageGroupPath;
-  }
+  public static class TaskContext {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
index cb3f9b7a02..fac0575fc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
@@ -22,5 +22,5 @@ package org.apache.iotdb.db.mpp.execution.config;
 import com.google.common.util.concurrent.ListenableFuture;
 
 public interface IConfigTask {
-  ListenableFuture<Void> execute();
+  ListenableFuture<Void> execute() throws InterruptedException;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 29d05136e8..7fa8d3c0b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -28,6 +28,8 @@ import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 
 import io.airlift.units.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -43,6 +45,8 @@ import java.util.concurrent.ScheduledExecutorService;
  * this scheduler.
  */
 public class ClusterScheduler implements IScheduler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterScheduler.class);
+
   private MPPQueryContext queryContext;
   // The stateMachine of the QueryExecution owned by this QueryScheduler
   private QueryStateMachine stateMachine;
@@ -79,16 +83,20 @@ public class ClusterScheduler implements IScheduler {
 
   @Override
   public void start() {
-    // TODO: consider where the state transition should be put
     stateMachine.transitionToDispatching();
     Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(instances);
 
     // NOTICE: the FragmentInstance may be dispatched to another Host due to consensus redirect.
     // So we need to start the state fetcher after the dispatching stage.
-    boolean success = waitDispatchingFinished(dispatchResultFuture);
-    // If the dispatch failed, we make the QueryState as failed, and return.
-    if (!success) {
-      stateMachine.transitionToFailed();
+    try {
+      FragInstanceDispatchResult result = dispatchResultFuture.get();
+      if (!result.isSuccessful()) {
+        stateMachine.transitionToFailed(new IllegalStateException("Fragment cannot be dispatched"));
+        return;
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      // If the dispatch failed, we make the QueryState as failed, and return.
+      stateMachine.transitionToFailed(e);
       return;
     }
 
@@ -110,19 +118,6 @@ public class ClusterScheduler implements IScheduler {
     this.stateTracker.start();
   }
 
-  private boolean waitDispatchingFinished(Future<FragInstanceDispatchResult> dispatchResultFuture) {
-    try {
-      FragInstanceDispatchResult result = dispatchResultFuture.get();
-      if (result.isSuccessful()) {
-        return true;
-      }
-    } catch (InterruptedException | ExecutionException e) {
-      Thread.currentThread().interrupt();
-      // TODO: (xingtanzjr) record the dispatch failure reason.
-    }
-    return false;
-  }
-
   @Override
   public void stop() {
     // TODO: It seems that it is unnecessary to check whether they are null or not. Is it a best
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IQueryTerminator.java
index 25ff71363b..29a1e27cfd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IQueryTerminator.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
+import java.util.concurrent.Future;
+
 public interface IQueryTerminator {
-  boolean terminate();
+  Future<Boolean> terminate();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index 17a193d9b4..09ff1171b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -28,13 +28,16 @@ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
-
+  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFragInstanceDispatcher.class);
   private final ExecutorService executor;
 
   public SimpleFragInstanceDispatcher(ExecutorService exeutor) {
@@ -46,33 +49,27 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
     return executor.submit(
         () -> {
           TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
-          try {
-            for (FragmentInstance instance : instances) {
-              // TODO: (jackie tien) change the port
-              InternalService.Iface client =
-                  InternalServiceClientFactory.getInternalServiceClient(
-                      new Endpoint(
-                          instance.getHostEndpoint().getIp(),
-                          IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
-              // TODO: (xingtanzjr) consider how to handle the buffer here
-              ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
-              instance.serializeRequest(buffer);
-              buffer.flip();
-              TConsensusGroupId groupId =
-                  new TConsensusGroupId(
-                      instance.getRegionReplicaSet().getConsensusGroupId().getId(),
-                      instance.getRegionReplicaSet().getConsensusGroupId().getType().toString());
-              TSendFragmentInstanceReq req =
-                  new TSendFragmentInstanceReq(
-                      new TFragmentInstance(buffer), groupId, instance.getType().toString());
-              resp = client.sendFragmentInstance(req);
-              if (!resp.accepted) {
-                break;
-              }
+          for (FragmentInstance instance : instances) {
+            InternalService.Iface client =
+                InternalServiceClientFactory.getInternalServiceClient(
+                    new Endpoint(
+                        instance.getHostEndpoint().getIp(),
+                        IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+            // TODO: (xingtanzjr) consider how to handle the buffer here
+            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+            instance.serializeRequest(buffer);
+            buffer.flip();
+            TConsensusGroupId groupId =
+                new TConsensusGroupId(
+                    instance.getRegionReplicaSet().getConsensusGroupId().getId(),
+                    instance.getRegionReplicaSet().getConsensusGroupId().getType().toString());
+            TSendFragmentInstanceReq req =
+                new TSendFragmentInstanceReq(
+                    new TFragmentInstance(buffer), groupId, instance.getType().toString());
+            resp = client.sendFragmentInstance(req);
+            if (!resp.accepted) {
+              break;
             }
-          } catch (Exception e) {
-            // TODO: (xingtanzjr) add more details
-            return new FragInstanceDispatchResult(false);
           }
           return new FragInstanceDispatchResult(resp.accepted);
         });
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
index 728966daea..5ffe02080a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
@@ -27,15 +27,16 @@ import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
 
 import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 public class SimpleQueryTerminator implements IQueryTerminator {
-
+  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleQueryTerminator.class);
   private final ExecutorService executor;
   private final QueryId queryId;
   private final List<FragmentInstance> fragmentInstances;
@@ -48,33 +49,26 @@ public class SimpleQueryTerminator implements IQueryTerminator {
   }
 
   @Override
-  public boolean terminate() {
+  public Future<Boolean> terminate() {
     List<Endpoint> relatedHost = getRelatedHost(fragmentInstances);
-    Future<Boolean> future =
-        executor.submit(
-            () -> {
-              try {
-                for (Endpoint endpoint : relatedHost) {
-                  // TODO (jackie tien) change the port
-                  InternalService.Iface client =
-                      InternalServiceClientFactory.getInternalServiceClient(
-                          new Endpoint(
-                              endpoint.getIp(),
-                              IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
-                  client.cancelQuery(new TCancelQueryReq(queryId.getId()));
-                }
-              } catch (TException e) {
-                return false;
-              }
-              return true;
-            });
-    try {
-      return future.get();
-    } catch (InterruptedException | ExecutionException e) {
-      // TODO: (xingtanzjr) Record the error info with logger
-      Thread.currentThread().interrupt();
-      return false;
-    }
+
+    return executor.submit(
+        () -> {
+          try {
+            for (Endpoint endpoint : relatedHost) {
+              // TODO (jackie tien) change the port
+              InternalService.Iface client =
+                  InternalServiceClientFactory.getInternalServiceClient(
+                      new Endpoint(
+                          endpoint.getIp(),
+                          IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+              client.cancelQuery(new TCancelQueryReq(queryId.getId()));
+            }
+          } catch (TException e) {
+            return false;
+          }
+          return true;
+        });
   }
 
   private List<Endpoint> getRelatedHost(List<FragmentInstance> instances) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
index 9e9946c323..01b39506fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
@@ -115,7 +115,7 @@ public class SeriesScanOperator implements DataSourceOperator {
 
   @Override
   public boolean isFinished() {
-    return finished || (finished = hasNext());
+    return finished || (finished = !hasNext());
   }
 
   private boolean readChunkData() throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index c7bdb95285..875894f47f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -66,7 +66,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
 
   private static final int MAX_CAPACITY = 1000; // TODO: load from config files
   private static final int WORKER_THREAD_NUM = 4; // TODO: load from config files
-  private static final int QUERY_TIMEOUT_MS = 60_000; // TODO: load from config files or requests
+  private static final int QUERY_TIMEOUT_MS = 10_000; // TODO: load from config files or requests
   private final ThreadGroup workerGroups;
   private final List<AbstractExecutor> threads;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
index 3877706717..929ca01a55 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
@@ -78,7 +78,9 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
       throw new RuntimeException("cannot fetch schema, status is: " + executionResult.status);
     }
     TsBlock tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
-    // TODO: (xingtanzjr) need to release this query's resource here
+    // TODO: (xingtanzjr) need to release this query's resource here. This is a temporary way
+    coordinator.getQueryExecution(queryId).stopAndCleanup();
+
     SchemaTree result = new SchemaTree();
     result.setStorageGroups(storageGroups);
     Binary binary;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index ca42ec2336..83b5e950d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -174,6 +174,26 @@ public class DistributionPlanner {
       return root;
     }
 
+    // TODO: (xingtanzjr) a temporary way to resolve the distribution of single SeriesScanNode issue
+    @Override
+    public PlanNode visitSeriesScan(SeriesScanNode node, DistributionPlanContext context) {
+      List<RegionReplicaSet> dataDistribution =
+          analysis.getPartitionInfo(node.getSeriesPath(), node.getTimeFilter());
+      if (dataDistribution.size() == 1) {
+        node.setRegionReplicaSet(dataDistribution.get(0));
+        return node;
+      }
+      TimeJoinNode timeJoinNode =
+          new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
+      for (RegionReplicaSet dataRegion : dataDistribution) {
+        SeriesScanNode split = (SeriesScanNode) node.clone();
+        split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+        split.setRegionReplicaSet(dataRegion);
+        timeJoinNode.addChild(split);
+      }
+      return timeJoinNode;
+    }
+
     @Override
     public PlanNode visitTimeJoin(TimeJoinNode node, DistributionPlanContext context) {
       TimeJoinNode root = (TimeJoinNode) node.clone();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/ConfigStatement.java
similarity index 73%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/ConfigStatement.java
index cb3f9b7a02..304401bc0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/ConfigStatement.java
@@ -17,10 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.execution.config;
+package org.apache.iotdb.db.mpp.sql.statement;
 
-import com.google.common.util.concurrent.ListenableFuture;
-
-public interface IConfigTask {
-  ListenableFuture<Void> execute();
-}
+/**
+ * ConfigStatement represents the statement which should be executed by ConfigNode All the
+ * statements which need to be transformed into IConfigTask should extend this class
+ */
+public abstract class ConfigStatement extends Statement {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
index 42e88eaa23..14f4606d0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.SchemaFetchStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowDevicesStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowStorageGroupStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowTimeSeriesStatement;
@@ -83,6 +84,10 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(alterTimeSeriesStatement, context);
   }
 
+  public R visitSetStorageGroup(SetStorageGroupStatement alterTimeSeriesStatement, C context) {
+    return visitStatement(alterTimeSeriesStatement, context);
+  }
+
   /** Data Manipulation Language (DML) */
 
   // Select Statement
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
index de54b604e7..b99b1363f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
@@ -21,9 +21,10 @@ package org.apache.iotdb.db.mpp.sql.statement.metadata;
 
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.constant.StatementType;
-import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.db.mpp.sql.statement.ConfigStatement;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
 
-public class SetStorageGroupStatement extends Statement {
+public class SetStorageGroupStatement extends ConfigStatement {
   private PartialPath storageGroupPath;
 
   public SetStorageGroupStatement() {
@@ -35,6 +36,11 @@ public class SetStorageGroupStatement extends Statement {
     return storageGroupPath;
   }
 
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitSetStorageGroup(this, context);
+  }
+
   public void setStorageGroupPath(PartialPath storageGroupPath) {
     this.storageGroupPath = storageGroupPath;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 20aa1d1b86..a8a0faca92 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -46,6 +46,7 @@ import java.util.TimeZone;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
 
@@ -215,6 +216,10 @@ public class SessionManager {
   }
 
   public boolean releaseSessionResource(long sessionId) {
+    return releaseSessionResource(sessionId, this::releaseQueryResourceNoExceptions);
+  }
+
+  public boolean releaseSessionResource(long sessionId, Consumer<Long> releaseQueryResource) {
     sessionIdToZoneId.remove(sessionId);
     sessionIdToClientVersion.remove(sessionId);
 
@@ -224,7 +229,7 @@ public class SessionManager {
         Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
         if (queryIdSet != null) {
           for (Long queryId : queryIdSet) {
-            releaseQueryResourceNoExceptions(queryId);
+            releaseQueryResource.accept(queryId);
           }
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index 2c13c68ad7..3de0a28cc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -562,7 +562,7 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
       ExecutionResult result =
           COORDINATOR.execute(
               statement,
-              new QueryId(String.valueOf(queryId)),
+              genQueryId(queryId),
               SESSION_MANAGER.getSessionInfo(req.sessionId),
               "",
               PARTITION_FETCHER,
@@ -756,8 +756,17 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
   public void handleClientExit() {
     Long sessionId = SESSION_MANAGER.getCurrSessionId();
     if (sessionId != null) {
+      SESSION_MANAGER.releaseSessionResource(sessionId, this::cleanupQueryExecution);
       TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
       closeSession(req);
     }
   }
+
+  private void cleanupQueryExecution(Long queryId) {
+    COORDINATOR.getQueryExecution(genQueryId(queryId)).stopAndCleanup();
+  }
+
+  private QueryId genQueryId(long id) {
+    return new QueryId(String.valueOf(id));
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 506f180049..eb07ba32cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -192,7 +192,7 @@ public class QueryDataSetUtils {
 
     int rowCount = 0;
     int[] valueOccupation = new int[columnNum];
-    while (rowCount < fetchSize && queryExecution.hasNextResult()) {
+    while (rowCount < fetchSize) {
       TsBlock tsBlock = queryExecution.getBatchResult();
       if (tsBlock == null) {
         break;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
index 3b9783023f..461fac1e13 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
@@ -152,7 +152,7 @@ public class SourceHandleTest {
 
     // Receive EndOfDataBlock event from upstream fragment instance.
     sourceHandle.setNoMoreTsBlocks(numOfMockTsBlock - 1);
-    Assert.assertFalse(sourceHandle.isBlocked().isDone());
+    Assert.assertTrue(sourceHandle.isBlocked().isDone());
     Assert.assertFalse(sourceHandle.isClosed());
     Assert.assertTrue(sourceHandle.isFinished());
     Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
@@ -298,7 +298,7 @@ public class SourceHandleTest {
 
     // Receive EndOfDataBlock event from upstream fragment instance.
     sourceHandle.setNoMoreTsBlocks(numOfMockTsBlock - 1);
-    Assert.assertFalse(sourceHandle.isBlocked().isDone());
+    Assert.assertTrue(sourceHandle.isBlocked().isDone());
     Assert.assertFalse(sourceHandle.isClosed());
     Assert.assertTrue(sourceHandle.isFinished());
     Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
@@ -488,7 +488,7 @@ public class SourceHandleTest {
 
     // Receive EndOfDataBlock event from upstream fragment instance.
     sourceHandle.setNoMoreTsBlocks(3 * numOfMockTsBlock - 1);
-    Assert.assertFalse(sourceHandle.isBlocked().isDone());
+    Assert.assertTrue(sourceHandle.isBlocked().isDone());
     Assert.assertFalse(sourceHandle.isClosed());
     Assert.assertTrue(sourceHandle.isFinished());
     Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
new file mode 100644
index 0000000000..dcc1d2dff6
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.config.ConfigExecution;
+import org.apache.iotdb.db.mpp.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+
+public class ConfigExecutionTest {
+
+  @Test
+  public void normalConfigTaskTest() {
+    IConfigTask task = () -> immediateFuture(null);
+    ConfigExecution execution =
+        new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+    execution.start();
+    ExecutionResult result = execution.getStatus();
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code);
+  }
+
+  @Test
+  public void exceptionConfigTaskTest() {
+    IConfigTask task =
+        () -> {
+          throw new RuntimeException("task throw exception when executing");
+        };
+    ConfigExecution execution =
+        new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+    execution.start();
+    ExecutionResult result = execution.getStatus();
+    Assert.assertEquals(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code);
+  }
+
+  @Test
+  public void exceptionAfterInvokeGetStatusTest() throws InterruptedException {
+    IConfigTask task =
+        () -> {
+          throw new RuntimeException("task throw exception when executing");
+        };
+    ConfigExecution execution =
+        new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+    Thread resultThread =
+        new Thread(
+            () -> {
+              ExecutionResult result = execution.getStatus();
+              Assert.assertEquals(
+                  TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code);
+            });
+    resultThread.start();
+    execution.start();
+    resultThread.join();
+  }
+
+  @Test
+  public void configTaskCancelledTest() throws InterruptedException {
+    SettableFuture<Void> taskResult = SettableFuture.create();
+    class SimpleTask implements IConfigTask {
+      private final ListenableFuture<Void> result;
+
+      public SimpleTask(ListenableFuture<Void> future) {
+        this.result = future;
+      }
+
+      @Override
+      public ListenableFuture<Void> execute() throws InterruptedException {
+        return result;
+      }
+    }
+    IConfigTask task = new SimpleTask(taskResult);
+    ConfigExecution execution =
+        new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+    execution.start();
+
+    Thread resultThread =
+        new Thread(
+            () -> {
+              ExecutionResult result = execution.getStatus();
+              Assert.assertEquals(
+                  TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code);
+            });
+    resultThread.start();
+    taskResult.cancel(true);
+    resultThread.join();
+  }
+
+  private MPPQueryContext genMPPQueryContext() {
+    MPPQueryContext context = new MPPQueryContext(new QueryId("query1"));
+    context.setQueryType(QueryType.WRITE);
+    return context;
+  }
+
+  private ExecutorService getExecutor() {
+    return IoTDBThreadPoolFactory.newSingleThreadExecutor("ConfigExecutionTest");
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index d317b9d5cc..ee4d5dedc9 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -68,6 +68,45 @@ import static org.junit.Assert.assertEquals;
 
 public class DistributionPlannerTest {
 
+  @Test
+  public void TestSingleSeriesScan() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query");
+    SeriesScanNode root =
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+            Sets.newHashSet("s1", "s2"),
+            OrderBy.TIMESTAMP_ASC);
+
+    Analysis analysis = constructAnalysis();
+
+    MPPQueryContext context = new MPPQueryContext("", queryId, null, new Endpoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+    DistributedQueryPlan plan = planner.planFragments();
+    plan.getInstances().forEach(System.out::println);
+    assertEquals(2, plan.getInstances().size());
+  }
+
+  @Test
+  public void TestSingleSeriesScanRewriteSource() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query");
+    SeriesScanNode root =
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+            Sets.newHashSet("s1", "s2"),
+            OrderBy.TIMESTAMP_ASC);
+
+    Analysis analysis = constructAnalysis();
+
+    MPPQueryContext context = new MPPQueryContext("", queryId, null, new Endpoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+    PlanNode rootAfterRewrite = planner.rewriteSource();
+    assertEquals(2, rootAfterRewrite.getChildren().size());
+  }
+
   @Test
   public void TestRewriteSourceNode() throws IllegalPathException {
     QueryId queryId = new QueryId("test_query");