Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/15 08:47:14 UTC

[iotdb] branch master updated: Add DataNodeTSIServiceImpl as RPCServiceImpl in mpp mode (#5540)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 93ee11b9f3 Add DataNodeTSIServiceImpl as RPCServiceImpl in mpp mode (#5540)
93ee11b9f3 is described below

commit 93ee11b9f3fa10af4b032d9204b13ee9f6fc8248
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Fri Apr 15 16:47:10 2022 +0800

    Add DataNodeTSIServiceImpl as RPCServiceImpl in mpp mode (#5540)
---
 .../iotdb/confignode/conf/ConfigNodeConf.java      |   2 +-
 .../persistence/PartitionInfoPersistence.java      |   8 +-
 .../iotdb/commons/concurrent/ThreadName.java       |   1 +
 .../iotdb/commons/partition/DataPartition.java     |  28 +-
 .../commons/partition/DataPartitionQueryParam.java |   3 +-
 .../iotdb/commons/partition/SchemaPartition.java   |  46 +-
 .../apache/iotdb/commons/service/ServiceType.java  |   3 +-
 .../iotdb/commons/service/ThriftService.java       |   8 +-
 .../resources/conf/iotdb-engine.properties         |  35 +-
 .../apache/iotdb/db/client/ConfigNodeClient.java   |   2 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  13 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   4 +
 .../iotdb/db/mpp/buffer/DataBlockManager.java      |   2 +-
 .../iotdb/db/mpp/buffer/DataBlockService.java      |  29 +-
 .../DataBlockServiceMBean.java}                    |  15 +-
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java |  14 +-
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   |   2 +
 .../iotdb/db/mpp/common/MPPQueryContext.java       |  10 +-
 .../iotdb/db/mpp/common/header/DatasetHeader.java  |  13 +-
 .../apache/iotdb/db/mpp/execution/Coordinator.java |  27 +-
 .../apache/iotdb/db/mpp/execution/DataDriver.java  |   1 +
 .../iotdb/db/mpp/execution/IQueryExecution.java    |  13 +
 .../iotdb/db/mpp/execution/QueryExecution.java     |  68 +-
 .../iotdb/db/mpp/execution/SchemaDriver.java       |   3 +-
 .../db/mpp/execution/config/ConfigExecution.java   |  48 +-
 .../scheduler/InternalServiceClientFactory.java    |  16 +-
 .../scheduler/SimpleFragInstanceDispatcher.java    |  17 +-
 .../db/mpp/schedule/FragmentInstanceScheduler.java |   2 +
 .../apache/iotdb/db/mpp/sql/analyze/Analyzer.java  |  71 +-
 .../mpp/sql/analyze/ClusterPartitionFetcher.java   |   7 +-
 .../db/mpp/sql/analyze/ClusterSchemaFetcher.java   |  12 +-
 .../db/mpp/sql/planner/DistributionPlanner.java    |  18 +-
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |   5 +-
 .../db/mpp/sql/planner/plan/FragmentInstance.java  |   1 +
 .../db/mpp/sql/planner/plan/node/PlanNodeType.java |   6 +-
 .../plan/node/metedata/read/SchemaFetchNode.java   |   4 +-
 .../plan/node/metedata/read/SchemaMergeNode.java   |  11 +-
 .../node/metedata/write/CreateTimeSeriesNode.java  |  29 +-
 .../planner/plan/node/process/ExchangeNode.java    |   6 +
 .../planner/plan/node/process/TimeJoinNode.java    |   9 +-
 .../planner/plan/node/sink/FragmentSinkNode.java   |   6 +
 .../sql/planner/plan/node/write/InsertRowNode.java |  32 +-
 .../apache/iotdb/db/service/InternalService.java   |  55 +-
 .../iotdb/db/service/InternalServiceImpl.java      |   2 +-
 .../InternalServiceMBean.java}                     |  11 +-
 ...vice.java => InternalServiceThriftHandler.java} |  34 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |  29 +-
 .../org/apache/iotdb/db/service/RPCService.java    |  12 +-
 .../thrift/handler/RPCServiceThriftHandler.java    |  10 +-
 .../thrift/impl/DataNodeTSIServiceImpl.java        | 733 +++++++++++++++++++++
 .../thrift/impl/TSIEventHandler.java}              |  12 +-
 .../db/service/thrift/impl/TSServiceImpl.java      | 367 +----------
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |  19 +-
 .../db/mpp/sql/plan/DistributionPlannerTest.java   |  19 +-
 .../iotdb/db/mpp/sql/plan/QueryPlannerTest.java    |   4 +-
 55 files changed, 1343 insertions(+), 614 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index a92faf8485..27a60ea35b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -88,7 +88,7 @@ public class ConfigNodeConf {
       ConfigNodeConstant.DATA_DIR + File.separator + ConfigNodeConstant.CONSENSUS_FOLDER;
 
   /** Default TTL for storage groups that are not set TTL by statements, in ms. */
-  private long defaultTTL = 36000000;
+  private long defaultTTL = Long.MAX_VALUE;
 
   /** The number of replicas of each region */
   private int regionReplicaCount = 3;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
index c7154f0e13..888c6ff9c2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
@@ -59,7 +59,10 @@ public class PartitionInfoPersistence {
   public PartitionInfoPersistence() {
     this.schemaPartitionReadWriteLock = new ReentrantReadWriteLock();
     this.dataPartitionReadWriteLock = new ReentrantReadWriteLock();
-    this.schemaPartition = new SchemaPartition();
+    this.schemaPartition =
+        new SchemaPartition(
+            ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionExecutorClass(),
+            ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum());
     this.schemaPartition.setSchemaPartitionMap(new HashMap<>());
     this.dataPartition =
         new DataPartition(
@@ -79,8 +82,7 @@ public class PartitionInfoPersistence {
     schemaPartitionReadWriteLock.readLock().lock();
 
     try {
-      schemaPartitionDataSet.setSchemaPartition(
-          schemaPartition.getSchemaPartition(physicalPlan.getPartitionSlotsMap()));
+      schemaPartitionDataSet.setSchemaPartition(schemaPartition.getSchemaPartition(physicalPlan.getPartitionSlotsMap()));
     } finally {
       schemaPartitionReadWriteLock.readLock().unlock();
       schemaPartitionDataSet.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index a4e0dc35ad..e587d38730 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -79,6 +79,7 @@ public enum ThreadName {
   Cluster_Monitor("ClusterMonitor"),
   DATA_BLOCK_MANAGER_SERVICE("DataBlockManagerService"),
   DATA_BLOCK_MANAGER_CLIENT("DataBlockManagerService-Client"),
+  INTERNAL_SERVICE_CLIENT("InternalService-Client"),
   ;
 
   private final String name;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index dcec685e63..bb62f1cea4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -149,16 +149,24 @@ public class DataPartition {
           if (seriesTimePartitionSlotMap.containsKey(seriesPartitionSlot)) {
             Map<TimePartitionSlot, List<RegionReplicaSet>> timePartitionSlotMap =
                 seriesTimePartitionSlotMap.get(seriesPartitionSlot);
-            for (TimePartitionSlot timePartitionSlot :
-                partitionSlotsMap.get(storageGroupName).get(seriesPartitionSlot)) {
-              // Compare TimePartitionSlot
-              if (timePartitionSlotMap.containsKey(timePartitionSlot)) {
-                result
-                    .computeIfAbsent(storageGroupName, key -> new HashMap<>())
-                    .computeIfAbsent(seriesPartitionSlot, key -> new HashMap<>())
-                    .put(
-                        timePartitionSlot,
-                        new ArrayList<>(timePartitionSlotMap.get(timePartitionSlot)));
+            // TODO: (xingtanzjr) optimize if timeSlotPartition is empty
+            if (partitionSlotsMap.get(storageGroupName).get(seriesPartitionSlot).size() == 0) {
+              result
+                  .computeIfAbsent(storageGroupName, key -> new HashMap<>())
+                  .computeIfAbsent(seriesPartitionSlot, key -> new HashMap<>())
+                  .putAll(new HashMap<>(timePartitionSlotMap));
+            } else {
+              for (TimePartitionSlot timePartitionSlot :
+                  partitionSlotsMap.get(storageGroupName).get(seriesPartitionSlot)) {
+                // Compare TimePartitionSlot
+                if (timePartitionSlotMap.containsKey(timePartitionSlot)) {
+                  result
+                      .computeIfAbsent(storageGroupName, key -> new HashMap<>())
+                      .computeIfAbsent(seriesPartitionSlot, key -> new HashMap<>())
+                      .put(
+                          timePartitionSlot,
+                          new ArrayList<>(timePartitionSlotMap.get(timePartitionSlot)));
+                }
               }
             }
           }
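
The new branch above changes the lookup contract: an empty list of requested TimePartitionSlots now means "return every known time partition for that series slot". A minimal, self-contained sketch of that semantics, using plain String/Long keys in place of IoTDB's partition classes (all names below are illustrative only):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class PartitionFilterSketch {

      // Mirrors the filtering added to DataPartition#getDataPartition: an empty
      // request list copies all time partitions, otherwise only the matching ones,
      // with the same defensive new ArrayList<>(...) copy of the value lists.
      static Map<Long, List<String>> filter(
          Map<Long, List<String>> timeSlotMap, List<Long> requestedTimeSlots) {
        Map<Long, List<String>> result = new HashMap<>();
        if (requestedTimeSlots.isEmpty()) {
          result.putAll(timeSlotMap);
        } else {
          for (Long slot : requestedTimeSlots) {
            if (timeSlotMap.containsKey(slot)) {
              result.put(slot, new ArrayList<>(timeSlotMap.get(slot)));
            }
          }
        }
        return result;
      }
    }
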
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
index d8bc05e294..23b61902a9 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
@@ -18,12 +18,13 @@
  */
 package org.apache.iotdb.commons.partition;
 
+import java.util.ArrayList;
 import java.util.List;
 
 public class DataPartitionQueryParam {
 
   private String devicePath;
-  private List<TimePartitionSlot> timePartitionSlotList;
+  private List<TimePartitionSlot> timePartitionSlotList = new ArrayList<>();
 
   public String getDevicePath() {
     return devicePath;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
index d4af85106b..f0ca88f774 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.commons.partition;
 
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -25,15 +27,22 @@ import java.util.Map;
 
 public class SchemaPartition {
 
+  private String seriesSlotExecutorName;
+  private int seriesPartitionSlotNum;
+
   // Map<StorageGroup, Map<SeriesPartitionSlot, SchemaRegionPlaceInfo>>
   private Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartitionMap;
 
-  public SchemaPartition() {
-    // Empty constructor
+  public SchemaPartition(String seriesSlotExecutorName, int seriesPartitionSlotNum) {
+    this.seriesSlotExecutorName = seriesSlotExecutorName;
+    this.seriesPartitionSlotNum = seriesPartitionSlotNum;
   }
 
   public SchemaPartition(
-      Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartitionMap) {
+      Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartitionMap,
+      String seriesSlotExecutorName,
+      int seriesPartitionSlotNum) {
+    this(seriesSlotExecutorName, seriesPartitionSlotNum);
     this.schemaPartitionMap = schemaPartitionMap;
   }
 
@@ -46,6 +55,32 @@ public class SchemaPartition {
     this.schemaPartitionMap = schemaPartitionMap;
   }
 
+  public RegionReplicaSet getSchemaRegionReplicaSet(String deviceName) {
+    // A list of data region replica sets will store data in a same time partition.
+    // We will insert data to the last set in the list.
+    // TODO return the latest dataRegionReplicaSet for each time partition
+    String storageGroup = getStorageGroupByDevice(deviceName);
+    SeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName);
+    return schemaPartitionMap.get(storageGroup).get(seriesPartitionSlot);
+  }
+
+  private SeriesPartitionSlot calculateDeviceGroupId(String deviceName) {
+    SeriesPartitionExecutor executor =
+        SeriesPartitionExecutor.getSeriesPartitionExecutor(
+            seriesSlotExecutorName, seriesPartitionSlotNum);
+    return executor.getSeriesPartitionSlot(deviceName);
+  }
+
+  private String getStorageGroupByDevice(String deviceName) {
+    for (String storageGroup : schemaPartitionMap.keySet()) {
+      if (deviceName.startsWith(storageGroup)) {
+        return storageGroup;
+      }
+    }
+    // TODO: (xingtanzjr) how to handle this exception in IoTDB
+    return null;
+  }
+
   /* Interfaces for ConfigNode */
 
   /**
@@ -59,7 +94,8 @@ public class SchemaPartition {
       Map<String, List<SeriesPartitionSlot>> partitionSlotsMap) {
     if (partitionSlotsMap.isEmpty()) {
       // Return all SchemaPartitions when the partitionSlotsMap is empty
-      return new SchemaPartition(new HashMap<>(schemaPartitionMap));
+      return new SchemaPartition(
+          new HashMap<>(schemaPartitionMap), seriesSlotExecutorName, seriesPartitionSlotNum);
     } else {
       Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> result = new HashMap<>();
 
@@ -86,7 +122,7 @@ public class SchemaPartition {
             }
           });
 
-      return new SchemaPartition(result);
+      return new SchemaPartition(result, seriesSlotExecutorName, seriesPartitionSlotNum);
     }
   }
 
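
The new getSchemaRegionReplicaSet/getStorageGroupByDevice/calculateDeviceGroupId methods route a device path to a schema region by prefix-matching its storage group and hashing the device into a series partition slot. A minimal sketch of that routing with simplified stand-ins (the hash below is only a placeholder for the configurable SeriesPartitionExecutor; all names are illustrative):

    import java.util.Arrays;
    import java.util.List;

    public class DeviceRoutingSketch {

      // First storage group that is a path prefix of the device, or null
      // (the same open question the TODO above notes).
      static String storageGroupOf(String device, List<String> storageGroups) {
        for (String sg : storageGroups) {
          if (device.startsWith(sg)) {
            return sg;
          }
        }
        return null;
      }

      // Stand-in for SeriesPartitionExecutor: hash the device into a fixed slot count.
      static int seriesSlotOf(String device, int slotNum) {
        return Math.abs(device.hashCode() % slotNum);
      }

      public static void main(String[] args) {
        List<String> groups = Arrays.asList("root.sg1", "root.sg2");
        String device = "root.sg1.d1";
        System.out.println(storageGroupOf(device, groups)); // root.sg1
        System.out.println(seriesSlotOf(device, 10000));    // a slot in [0, 10000)
      }
    }
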
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 0626a0261a..1e913818cc 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -70,7 +70,8 @@ public enum ServiceType {
   CONFIG_NODE_SERVICE("Config Node service", "ConfigNodeRPCServer"),
   DATA_NODE_MANAGEMENT_SERVICE("Data Node management service", "DataNodeManagementServer"),
   FRAGMENT_INSTANCE_MANAGER_SERVICE("Fragment instance manager", "FragmentInstanceManager"),
-  DATA_BLOCK_MANAGER_SERVICE("Data block manager", "DataBlockManager");
+  DATA_BLOCK_MANAGER_SERVICE("Data block manager", "DataBlockManager"),
+  INTERNAL_SERVICE("Internal Service", "InternalService");
 
   private final String name;
   private final String jmxName;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
index e792d88c81..30d75a94eb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
@@ -26,6 +26,7 @@ import org.apache.thrift.TProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.InvocationTargetException;
 import java.util.concurrent.CountDownLatch;
 
 public abstract class ThriftService implements IService {
@@ -88,7 +89,8 @@ public abstract class ThriftService implements IService {
   }
 
   public abstract void initTProcessor()
-      throws ClassNotFoundException, IllegalAccessException, InstantiationException;
+      throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+          NoSuchMethodException, InvocationTargetException;
 
   public abstract void initThriftServiceThread()
       throws IllegalAccessException, InstantiationException, ClassNotFoundException;
@@ -125,7 +127,9 @@ public abstract class ThriftService implements IService {
     } catch (InterruptedException
         | ClassNotFoundException
         | IllegalAccessException
-        | InstantiationException e) {
+        | InstantiationException
+        | NoSuchMethodException
+        | InvocationTargetException e) {
       Thread.currentThread().interrupt();
       throw new StartupException(this.getID().getName(), e.getMessage());
     }
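
The two exception types added to initTProcessor() and to the catch block are the ones thrown by constructor-based reflection, which the service implementations presumably use to instantiate processors or handlers by class name. A minimal sketch of that pattern (method and class names are illustrative):

    import java.lang.reflect.InvocationTargetException;

    public class ReflectiveInitSketch {

      // Instantiate an implementation by class name, as a ThriftService subclass might.
      // Class.newInstance() is deprecated; getDeclaredConstructor().newInstance() adds
      // NoSuchMethodException and InvocationTargetException to the signature, which is
      // why initTProcessor() and the catch block above grow the same two exceptions.
      static Object newInstanceByName(String className)
          throws ClassNotFoundException, NoSuchMethodException, InstantiationException,
              IllegalAccessException, InvocationTargetException {
        return Class.forName(className).getDeclaredConstructor().newInstance();
      }
    }
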
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 761ca79c87..f66262d72c 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -27,8 +27,26 @@ rpc_address=0.0.0.0
 # Datatype: int
 rpc_port=6667
 
+# whether start data node in mpp mode
+# mpp_mode=false
+
+####################
+### Shuffle Configuration
+####################
+# Datatype: int
+# data_block_manager_port=8777
+
+# Datatype: int
+# data_block_manager_core_pool_size=1
+
+# Datatype: int
+# data_block_manager_max_pool_size=5
+
+# Datatype: int
+# data_block_manager_keep_alive_time_in_ms=1000
+
 # Datatype: int
-mpp_port=7777
+# mpp_port=7777
 
 # Datatype: String
 # used for communication between cluster nodes.
@@ -963,21 +981,6 @@ timestamp_precision=ms
 # Datatype: float
 # group_by_fill_cache_size_in_mb=1.0
 
-####################
-### Shuffle Configuration
-####################
-# Datatype: int
-# data_block_manager_port=7777
-
-# Datatype: int
-# data_block_manager_core_pool_size=1
-
-# Datatype: int
-# data_block_manager_max_pool_size=5
-
-# Datatype: int
-# data_block_manager_keep_alive_time_in_ms=1000
-
 ####################
 ### Schema Engine Configuration
 ####################
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 0816618fb3..4c2d0d7ed3 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -54,7 +54,7 @@ import java.util.Random;
 public class ConfigNodeClient {
   private static final Logger logger = LoggerFactory.getLogger(ConfigNodeClient.class);
 
-  private static final int TIMEOUT_MS = 2000;
+  private static final int TIMEOUT_MS = 10000;
 
   private static final int RETRY_NUM = 5;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 83e30ba005..856381c0d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -507,6 +507,9 @@ public class IoTDBConfig {
   /** Replace implementation class of JDBC service */
   private String rpcImplClassName = TSServiceImpl.class.getName();
 
+  /** indicate whether current mode is mpp */
+  private boolean mppMode = false;
+
   /** Replace implementation class of influxdb protocol service */
   private String influxdbImplClassName = InfluxDBServiceImpl.class.getName();
 
@@ -875,7 +878,7 @@ public class IoTDBConfig {
   private int seriesPartitionSlotNum = 10000;
 
   /** Port that data block manager thrift service listen to. */
-  private int dataBlockManagerPort = 7777;
+  private int dataBlockManagerPort = 8777;
 
   /** Core pool size of data block manager. */
   private int dataBlockManagerCorePoolSize = 1;
@@ -2795,4 +2798,12 @@ public class IoTDBConfig {
   public void setDataBlockManagerKeepAliveTimeInMs(int dataBlockManagerKeepAliveTimeInMs) {
     this.dataBlockManagerKeepAliveTimeInMs = dataBlockManagerKeepAliveTimeInMs;
   }
+
+  public boolean isMppMode() {
+    return mppMode;
+  }
+
+  public void setMppMode(boolean mppMode) {
+    this.mppMode = mppMode;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 36faf46a38..189d65a85d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -167,6 +167,10 @@ public class IoTDBDescriptor {
           Integer.parseInt(
               properties.getProperty("rpc_port", Integer.toString(conf.getRpcPort()))));
 
+      conf.setMppMode(
+          Boolean.parseBoolean(
+              properties.getProperty("mpp_mode", Boolean.toString(conf.isMppMode()))));
+
       conf.setMppPort(
           Integer.parseInt(
               properties.getProperty("mpp_port", Integer.toString(conf.getRpcPort()))));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index 5f06947a8f..f90fce1f3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -236,7 +236,7 @@ public class DataBlockManager implements IDataBlockManager {
   }
 
   public DataBlockServiceImpl getOrCreateDataBlockServiceImpl() {
-    if (dataBlockService != null) {
+    if (dataBlockService == null) {
       dataBlockService = new DataBlockServiceImpl();
     }
     return dataBlockService;
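
The one-character fix above (!= null changed to == null) matters: with the old check the field was never assigned, so getOrCreateDataBlockServiceImpl() always returned null. A minimal sketch of the intended lazy-initialization pattern (which, like the original, is not synchronized):

    public class LazyInitSketch {

      private Object serviceImpl;

      // Create the implementation only on first use. The previous '!= null' test
      // never assigned the field, so the getter returned null on every call.
      public Object getOrCreateServiceImpl() {
        if (serviceImpl == null) {
          serviceImpl = new Object(); // stands in for new DataBlockServiceImpl()
        }
        return serviceImpl;
      }
    }
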
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
index 1557987d17..fa9b30cd8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
@@ -31,19 +31,14 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
 import org.apache.iotdb.mpp.rpc.thrift.DataBlockService.Processor;
 
-import org.apache.commons.lang3.Validate;
-
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-public class DataBlockService extends ThriftService {
+public class DataBlockService extends ThriftService implements DataBlockServiceMBean {
 
-  private LocalMemoryManager localMemoryManager;
-  private TsBlockSerdeFactory tsBlockSerdeFactory;
   private DataBlockManager dataBlockManager;
   private ExecutorService executorService;
-  private DataBlockServiceClientFactory clientFactory;
 
   private DataBlockService() {}
 
@@ -55,6 +50,7 @@ public class DataBlockService extends ThriftService {
   @Override
   public void initTProcessor()
       throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+    initSyncedServiceImpl(null);
     IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
     executorService =
         IoTDBThreadPoolFactory.newThreadPool(
@@ -66,21 +62,15 @@ public class DataBlockService extends ThriftService {
             new LinkedBlockingQueue<>(),
             new IoTThreadFactory("data-block-manager-task-executors"),
             "data-block-manager-task-executors");
-    clientFactory = new DataBlockServiceClientFactory();
     this.dataBlockManager =
         new DataBlockManager(
-            localMemoryManager, tsBlockSerdeFactory, executorService, clientFactory);
+            new LocalMemoryManager(),
+            new TsBlockSerdeFactory(),
+            executorService,
+            new DataBlockServiceClientFactory());
     processor = new Processor<>(dataBlockManager.getOrCreateDataBlockServiceImpl());
   }
 
-  public void setLocalMemoryManager(LocalMemoryManager localMemoryManager) {
-    this.localMemoryManager = Validate.notNull(localMemoryManager);
-  }
-
-  public void setTsBlockSerdeFactory(TsBlockSerdeFactory tsBlockSerdeFactory) {
-    this.tsBlockSerdeFactory = Validate.notNull(tsBlockSerdeFactory);
-  }
-
   public DataBlockManager getDataBlockManager() {
     return dataBlockManager;
   }
@@ -101,7 +91,7 @@ public class DataBlockService extends ThriftService {
               config.getThriftServerAwaitTimeForStopService(),
               new DataBlockServiceThriftHandler(),
               // TODO: hard coded compress strategy
-              true);
+              false);
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
@@ -133,6 +123,11 @@ public class DataBlockService extends ThriftService {
     return DataBlockManagerServiceHolder.INSTANCE;
   }
 
+  @Override
+  public int getRPCPort() {
+    return getBindPort();
+  }
+
   private static class DataBlockManagerServiceHolder {
     private static final DataBlockService INSTANCE = new DataBlockService();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceMBean.java
similarity index 72%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceMBean.java
index 3f9fa05ea8..52aae71ebb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceMBean.java
@@ -16,14 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.db.mpp.buffer;
 
-package org.apache.iotdb.db.mpp.execution;
+import org.apache.iotdb.commons.exception.StartupException;
 
-public interface IQueryExecution {
+public interface DataBlockServiceMBean {
 
-  void start();
+  String getRPCServiceStatus();
 
-  void stop();
+  int getRPCPort();
 
-  ExecutionResult getStatus();
+  void startService() throws StartupException;
+
+  void restartService() throws StartupException;
+
+  void stopService();
 }
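
DataBlockService now implements this MBean interface, which is the standard-MBean requirement in JMX: a class is only a compliant standard MBean if it implements an interface named <ClassName>MBean. A minimal sketch of how such a service would be registered with the platform MBean server (the object name below is hypothetical):

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class MBeanRegistrationSketch {

      // Register a service under a JMX object name so its status and port
      // (getRPCServiceStatus(), getRPCPort()) show up in tools like jconsole.
      static void register(Object serviceMBean, String jmxName) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName(jmxName); // e.g. "org.apache.iotdb.service:type=DataBlockManager"
        if (!server.isRegistered(name)) {
          server.registerMBean(serviceMBean, name);
        }
      }
    }
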
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index a5fa545bcc..96802c3bd5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -105,7 +105,11 @@ public class SinkHandle implements ISinkHandle {
   }
 
   private void submitSendNewDataBlockEventTask(int startSequenceId, List<Long> blockSizes) {
-    executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
+    // TODO: (xingtanzjr)
+    // We temporarily make it sync instead of async to avoid EOS Event(SinkHandle close() method is
+    // called) is sent before NewDataBlockEvent arrived
+    new SendNewDataBlockEventTask(startSequenceId, blockSizes).run();
+    //    executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
   }
 
   @Override
@@ -157,9 +161,10 @@ public class SinkHandle implements ISinkHandle {
 
   private void sendEndOfDataBlockEvent() throws TException {
     logger.debug(
-        "Send end of data block event to plan node {} of {}.",
+        "Send end of data block event to plan node {} of {}. {}",
         remotePlanNodeId,
-        remoteFragmentInstanceId);
+        remoteFragmentInstanceId,
+        Thread.currentThread().getName());
     int attempt = 0;
     EndOfDataBlockEvent endOfDataBlockEvent =
         new EndOfDataBlockEvent(
@@ -178,7 +183,8 @@ public class SinkHandle implements ISinkHandle {
             remotePlanNodeId,
             remoteFragmentInstanceId,
             e.getMessage(),
-            attempt);
+            attempt,
+            e);
         if (attempt == MAX_ATTEMPT_TIMES) {
           throw e;
         }
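
The change above replaces executorService.submit(task) with an inline task.run() so the NewDataBlockEvent cannot be overtaken by the end-of-data event sent from close(). A small sketch of the ordering difference (the runnables are placeholders):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class OrderingSketch {

      public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Runnable newDataBlockEvent = () -> System.out.println("NewDataBlockEvent sent");
        Runnable endOfDataBlockEvent = () -> System.out.println("EndOfDataBlockEvent sent");

        // Asynchronous: the caller may go on to send the end-of-data event before the
        // submitted task has run, which is the race the TODO above describes.
        // executor.submit(newDataBlockEvent);

        // Synchronous workaround: running the task inline guarantees it completes
        // before anything the caller does afterwards (e.g. close() sending EOS).
        newDataBlockEvent.run();
        endOfDataBlockEvent.run();

        executor.shutdown();
      }
    }
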
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index b577174702..af6e715a16 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -212,6 +212,8 @@ public class SourceHandle implements ISourceHandle {
   synchronized void setNoMoreTsBlocks(int lastSequenceId) {
     this.lastSequenceId = lastSequenceId;
     noMoreTsBlocks = true;
+    // someone may be waiting for this blocked, so here we need to notify it
+    blocked.set(null);
   }
 
   synchronized void updatePendingDataBlockInfo(int startSequenceId, List<Long> dataBlockSizes) {
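
The added blocked.set(null) wakes up any consumer parked on the handle's "blocked" future when the producer declares there are no more TsBlocks. Assuming that future is a Guava SettableFuture (Guava futures are used elsewhere in this commit), a minimal sketch of the wake-up:

    import com.google.common.util.concurrent.SettableFuture;

    public class UnblockSketch {

      public static void main(String[] args) throws Exception {
        SettableFuture<Void> blocked = SettableFuture.create();

        Thread consumer =
            new Thread(
                () -> {
                  try {
                    blocked.get(); // waits until the producer signals
                    System.out.println("consumer woke up: no more TsBlocks");
                  } catch (Exception e) {
                    Thread.currentThread().interrupt();
                  }
                });
        consumer.start();

        // Without this call the consumer would wait forever, which is the situation
        // the added blocked.set(null) in setNoMoreTsBlocks avoids.
        blocked.set(null);
        consumer.join();
      }
    }
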
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index def89dba39..b449bdf9e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -29,7 +29,7 @@ public class MPPQueryContext {
   private String sql;
   private QueryId queryId;
   private SessionInfo session;
-  private QueryType queryType;
+  private QueryType queryType = QueryType.READ;
 
   private Endpoint hostEndpoint;
   private ResultNodeContext resultNodeContext;
@@ -38,12 +38,10 @@ public class MPPQueryContext {
     this.queryId = queryId;
   }
 
-  public MPPQueryContext(
-      String sql, QueryId queryId, SessionInfo session, QueryType type, Endpoint hostEndpoint) {
+  public MPPQueryContext(String sql, QueryId queryId, SessionInfo session, Endpoint hostEndpoint) {
     this.sql = sql;
     this.queryId = queryId;
     this.session = session;
-    this.queryType = type;
     this.hostEndpoint = hostEndpoint;
     this.resultNodeContext = new ResultNodeContext(queryId);
   }
@@ -56,6 +54,10 @@ public class MPPQueryContext {
     return queryType;
   }
 
+  public void setQueryType(QueryType queryType) {
+    this.queryType = queryType;
+  }
+
   public Endpoint getHostEndpoint() {
     return hostEndpoint;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java
index 32a236feae..ee3e52d218 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.db.mpp.common.header;
 
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-
 import com.google.common.primitives.Bytes;
 
 import java.util.*;
@@ -51,10 +49,6 @@ public class DatasetHeader {
     return isIgnoreTimestamp;
   }
 
-  public Map<String, Integer> getColumnToTsBlockIndexMap() {
-    return columnToTsBlockIndexMap;
-  }
-
   public void setColumnToTsBlockIndexMap(List<String> outputColumnNames) {
     this.columnToTsBlockIndexMap = new HashMap<>();
     for (int i = 0; i < outputColumnNames.size(); i++) {
@@ -66,8 +60,11 @@ public class DatasetHeader {
     return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
   }
 
-  public List<TSDataType> getRespDataTypeList() {
-    return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
+  public List<String> getRespDataTypeList() {
+    return columnHeaders.stream()
+        .map(ColumnHeader::getColumnType)
+        .map(Objects::toString)
+        .collect(Collectors.toList());
   }
 
   public List<Byte> getRespAliasColumns() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 4749394259..4bb6bfbf58 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -24,9 +24,10 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
+import org.apache.iotdb.db.mpp.execution.config.ConfigExecution;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
 
 import org.apache.commons.lang3.Validate;
 
@@ -50,12 +51,12 @@ public class Coordinator {
           IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
           IoTDBDescriptor.getInstance().getConfig().getMppPort());
 
-  private ExecutorService executor;
-  private ScheduledExecutorService scheduledExecutor;
+  private final ExecutorService executor;
+  private final ScheduledExecutorService scheduledExecutor;
 
   private static final Coordinator INSTANCE = new Coordinator();
 
-  private ConcurrentHashMap<QueryId, QueryExecution> queryExecutionMap;
+  private final ConcurrentHashMap<QueryId, IQueryExecution> queryExecutionMap;
 
   private Coordinator() {
     this.queryExecutionMap = new ConcurrentHashMap<>();
@@ -63,16 +64,20 @@ public class Coordinator {
     this.scheduledExecutor = getScheduledExecutor();
   }
 
-  private QueryExecution createQueryExecution(Statement statement, MPPQueryContext queryContext) {
+  private IQueryExecution createQueryExecution(Statement statement, MPPQueryContext queryContext) {
+    if (statement instanceof SetStorageGroupStatement) {
+      queryContext.setQueryType(QueryType.WRITE);
+      return new ConfigExecution(queryContext, statement, executor);
+    }
     return new QueryExecution(statement, queryContext, executor, scheduledExecutor);
   }
 
   public ExecutionResult execute(
-      Statement statement, QueryId queryId, QueryType queryType, SessionInfo session, String sql) {
+      Statement statement, QueryId queryId, SessionInfo session, String sql) {
 
-    QueryExecution execution =
+    IQueryExecution execution =
         createQueryExecution(
-            statement, new MPPQueryContext(sql, queryId, session, queryType, getHostEndpoint()));
+            statement, new MPPQueryContext(sql, queryId, session, getHostEndpoint()));
     queryExecutionMap.put(queryId, execution);
 
     execution.start();
@@ -80,10 +85,10 @@ public class Coordinator {
     return execution.getStatus();
   }
 
-  public TsBlock getResultSet(QueryId queryId) {
-    QueryExecution execution = queryExecutionMap.get(queryId);
+  public IQueryExecution getQueryExecution(QueryId queryId) {
+    IQueryExecution execution = queryExecutionMap.get(queryId);
     Validate.notNull(execution, "invalid queryId %s", queryId.getId());
-    return execution.getBatchResult();
+    return execution;
   }
 
   // TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
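
Coordinator#createQueryExecution now branches on the statement type: SetStorageGroupStatement is marked as WRITE and routed to the new ConfigExecution, everything else still becomes a QueryExecution. A self-contained sketch of that dispatch rule with simplified stand-in types (all names below are local to the sketch):

    public class DispatchSketch {

      interface Statement {}
      static class SetStorageGroupStatement implements Statement {}
      static class QueryStatement implements Statement {}

      interface IQueryExecution { void start(); }
      static class ConfigExecution implements IQueryExecution {
        public void start() { System.out.println("route to ConfigNode (WRITE)"); }
      }
      static class QueryExecution implements IQueryExecution {
        public void start() { System.out.println("plan and schedule MPP query"); }
      }

      // Mirrors Coordinator#createQueryExecution: config statements get a ConfigExecution.
      static IQueryExecution createQueryExecution(Statement statement) {
        if (statement instanceof SetStorageGroupStatement) {
          return new ConfigExecution();
        }
        return new QueryExecution();
      }

      public static void main(String[] args) {
        createQueryExecution(new SetStorageGroupStatement()).start();
        createQueryExecution(new QueryStatement()).start();
      }
    }
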
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index 55a6d566f1..ce6a513080 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -90,6 +90,7 @@ public class DataDriver implements Driver {
       boolean isFinished =
           closed || (driverBlockedFuture.get().isDone() && root != null && root.isFinished());
       if (isFinished) {
+        close();
         driverContext.finish();
       }
       return isFinished;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
index 3f9fa05ea8..2e0bd9c76d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.db.mpp.execution;
 
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
 public interface IQueryExecution {
 
   void start();
@@ -26,4 +29,14 @@ public interface IQueryExecution {
   void stop();
 
   ExecutionResult getStatus();
+
+  TsBlock getBatchResult();
+
+  boolean hasNextResult();
+
+  int getOutputValueColumnCount();
+
+  DatasetHeader getDatasetHeader();
+
+  boolean isQuery();
 }
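
With the widened interface, a caller such as the new DataNodeTSIServiceImpl can drain results without knowing the concrete execution class. A hedged sketch of one possible fetch loop, based only on the methods above and on QueryExecution#getBatchResult returning null once the source handle is finished:

    import org.apache.iotdb.db.mpp.execution.IQueryExecution;
    import org.apache.iotdb.tsfile.read.common.block.TsBlock;

    public class ResultDrainSketch {

      // Drain all result batches from a query execution; write/config executions
      // (isQuery() == false) produce no result set.
      static int drain(IQueryExecution execution) {
        if (!execution.isQuery()) {
          return 0;
        }
        System.out.println("value columns: " + execution.getOutputValueColumnCount());
        int blocks = 0;
        while (execution.hasNextResult()) {
          TsBlock block = execution.getBatchResult(); // may block until data or end-of-stream
          if (block == null) {
            break; // null is returned once the result handle is finished
          }
          blocks++;
        }
        return blocks;
      }
    }
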
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index a5dbeaf21f..99241305d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -18,13 +18,16 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.buffer.DataBlockService;
 import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
 import org.apache.iotdb.db.mpp.sql.planner.DistributionPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
@@ -55,18 +58,18 @@ import static com.google.common.base.Throwables.throwIfUnchecked;
  * corresponding physical nodes. 3. Collect and monitor the progress/states of this query.
  */
 public class QueryExecution implements IQueryExecution {
-  private MPPQueryContext context;
+  private final MPPQueryContext context;
   private IScheduler scheduler;
-  private QueryStateMachine stateMachine;
+  private final QueryStateMachine stateMachine;
 
-  private List<PlanOptimizer> planOptimizers;
+  private final List<PlanOptimizer> planOptimizers;
 
   private final Analysis analysis;
   private LogicalQueryPlan logicalPlan;
   private DistributedQueryPlan distributedPlan;
 
-  private ExecutorService executor;
-  private ScheduledExecutorService scheduledExecutor;
+  private final ExecutorService executor;
+  private final ScheduledExecutorService scheduledExecutor;
 
   // The result of QueryExecution will be written to the DataBlockManager in current Node.
   // We use this SourceHandle to fetch the TsBlock from it.
@@ -100,6 +103,9 @@ public class QueryExecution implements IQueryExecution {
   public void start() {
     doLogicalPlan();
     doDistributedPlan();
+    if (context.getQueryType() == QueryType.READ) {
+      initResultHandle();
+    }
     schedule();
   }
 
@@ -153,11 +159,14 @@ public class QueryExecution implements IQueryExecution {
    * DataStreamManager use the virtual ResultOperator's ID (This part will be designed and
    * implemented with DataStreamManager)
    */
+  @Override
   public TsBlock getBatchResult() {
     try {
-      initResultHandle();
       ListenableFuture<Void> blocked = resultHandle.isBlocked();
       blocked.get();
+      if (resultHandle.isFinished()) {
+        return null;
+      }
       return resultHandle.receive();
 
     } catch (ExecutionException | IOException e) {
@@ -171,20 +180,20 @@ public class QueryExecution implements IQueryExecution {
   }
 
   /** @return true if there is more tsblocks, otherwise false */
+  @Override
   public boolean hasNextResult() {
-    try {
-      initResultHandle();
-      return resultHandle.isFinished();
-    } catch (IOException e) {
-      throwIfUnchecked(e.getCause());
-      throw new RuntimeException(e.getCause());
-    }
+    return !resultHandle.isFinished();
   }
 
   /** return the result column count without the time column */
+  @Override
   public int getOutputValueColumnCount() {
-    // TODO need return the actual size while there exists output columns in Analysis
-    return 1;
+    return analysis.getRespDatasetHeader().getColumnHeaders().size();
+  }
+
+  @Override
+  public DatasetHeader getDatasetHeader() {
+    return analysis.getRespDatasetHeader();
   }
 
   /**
@@ -221,17 +230,21 @@ public class QueryExecution implements IQueryExecution {
     }
   }
 
-  private void initResultHandle() throws IOException {
+  private void initResultHandle() {
     if (this.resultHandle == null) {
-      this.resultHandle =
-          DataBlockService.getInstance()
-              .getDataBlockManager()
-              .createSourceHandle(
-                  context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
-                  context.getResultNodeContext().getVirtualResultNodeId().getId(),
-                  context.getResultNodeContext().getUpStreamEndpoint().getIp(),
-                  context.getResultNodeContext().getUpStreamEndpoint().getPort(),
-                  context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift());
+      try {
+        this.resultHandle =
+            DataBlockService.getInstance()
+                .getDataBlockManager()
+                .createSourceHandle(
+                    context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
+                    context.getResultNodeContext().getVirtualResultNodeId().getId(),
+                    context.getResultNodeContext().getUpStreamEndpoint().getIp(),
+                    IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort(),
+                    context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift());
+      } catch (IOException e) {
+        stateMachine.transitionToFailed();
+      }
     }
   }
 
@@ -242,4 +255,9 @@ public class QueryExecution implements IQueryExecution {
   public LogicalQueryPlan getLogicalPlan() {
     return logicalPlan;
   }
+
+  @Override
+  public boolean isQuery() {
+    return context.getQueryType() == QueryType.READ;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
index 4a1eef21fa..ea49d012e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
@@ -42,7 +42,7 @@ import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
 @NotThreadSafe
 public class SchemaDriver implements Driver {
 
-  private static final Logger logger = LoggerFactory.getLogger(DataDriver.class);
+  private static final Logger logger = LoggerFactory.getLogger(SchemaDriver.class);
 
   private final Operator root;
   private final ISinkHandle sinkHandle;
@@ -65,6 +65,7 @@ public class SchemaDriver implements Driver {
     try {
       boolean isFinished = driverBlockedFuture.get().isDone() && root != null && root.isFinished();
       if (isFinished) {
+        close();
         driverContext.finish();
       }
       return isFinished;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
index 2d158c4856..21afde5d29 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
@@ -20,19 +20,23 @@
 package org.apache.iotdb.db.mpp.execution.config;
 
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import jersey.repackaged.com.google.common.util.concurrent.SettableFuture;
+import org.jetbrains.annotations.NotNull;
 
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -41,12 +45,12 @@ import static com.google.common.base.Throwables.throwIfInstanceOf;
 
 public class ConfigExecution implements IQueryExecution {
 
-  private MPPQueryContext context;
-  private Statement statement;
-  private ExecutorService executor;
+  private final MPPQueryContext context;
+  private final Statement statement;
+  private final ExecutorService executor;
 
-  private QueryStateMachine stateMachine;
-  private SettableFuture<Boolean> result;
+  private final QueryStateMachine stateMachine;
+  private final SettableFuture<Boolean> result;
 
   public ConfigExecution(MPPQueryContext context, Statement statement, ExecutorService executor) {
     this.context = context;
@@ -71,7 +75,7 @@ public class ConfigExecution implements IQueryExecution {
             }
 
             @Override
-            public void onFailure(Throwable throwable) {
+            public void onFailure(@NotNull Throwable throwable) {
               fail(throwable);
             }
           },
@@ -93,6 +97,10 @@ public class ConfigExecution implements IQueryExecution {
   @Override
   public ExecutionResult getStatus() {
     try {
+      if (result.isCancelled()) {
+        return new ExecutionResult(
+            context.getQueryId(), RpcUtils.getStatus(TSStatusCode.QUERY_PROCESS_ERROR));
+      }
       Boolean success = result.get();
       TSStatusCode statusCode =
           success ? TSStatusCode.SUCCESS_STATUS : TSStatusCode.QUERY_PROCESS_ERROR;
@@ -105,6 +113,34 @@ public class ConfigExecution implements IQueryExecution {
     }
   }
 
+  @Override
+  public TsBlock getBatchResult() {
+    // TODO
+    return null;
+  }
+
+  @Override
+  public boolean hasNextResult() {
+    return false;
+  }
+
+  @Override
+  public int getOutputValueColumnCount() {
+    // TODO
+    return 0;
+  }
+
+  @Override
+  public DatasetHeader getDatasetHeader() {
+    // TODO
+    return null;
+  }
+
+  @Override
+  public boolean isQuery() {
+    return context.getQueryType() == QueryType.READ;
+  }
+
   // TODO: consider a more suitable implementation for it
   // Generate the corresponding IConfigTask by statement.
   // Each type of statement will has a ConfigTask
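
ConfigExecution completes a SettableFuture<Boolean> from a FutureCallback attached to the config task's future, and getStatus() now also maps cancellation to an error status. A minimal standalone sketch of that wiring with plain Guava types (the diff itself uses a repackaged SettableFuture; the immediate future below stands in for the real config task):

    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.MoreExecutors;
    import com.google.common.util.concurrent.SettableFuture;

    public class CallbackSketch {

      public static void main(String[] args) {
        SettableFuture<Boolean> result = SettableFuture.create();
        ListenableFuture<Boolean> taskFuture = Futures.immediateFuture(true); // stands in for the config task

        Futures.addCallback(
            taskFuture,
            new FutureCallback<Boolean>() {
              @Override
              public void onSuccess(Boolean success) {
                result.set(success); // succeed the execution
              }

              @Override
              public void onFailure(Throwable throwable) {
                result.setException(throwable); // surfaces as QUERY_PROCESS_ERROR in getStatus()
              }
            },
            MoreExecutors.directExecutor());

        System.out.println("config task accepted: " + Futures.getUnchecked(result));
      }
    }
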
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
index fefd526b54..4a5a94b4bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
@@ -20,21 +20,25 @@
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+import org.apache.iotdb.rpc.RpcTransportFactory;
 
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
 public class InternalServiceClientFactory {
+  private static final int TIMEOUT_MS = 10000;
+
   // TODO: (xingtanzjr) consider the best practice to maintain the clients
   public static InternalService.Client getInternalServiceClient(String endpoint, int port)
       throws TTransportException {
-    try (TTransport transport = new TSocket(endpoint, port)) {
-      transport.open();
-      TProtocol protocol = new TBinaryProtocol(transport);
-      return new InternalService.Client(protocol);
-    }
+    TTransport transport =
+        RpcTransportFactory.INSTANCE.getTransport(
+            // as there is a try-catch already, we do not need to use TSocket.wrap
+            endpoint, port, TIMEOUT_MS);
+    transport.open();
+    TProtocol protocol = new TBinaryProtocol(transport);
+    return new InternalService.Client(protocol);
   }
 }
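
The old code opened the transport inside try-with-resources, so the socket was closed again before the returned client could ever use it; the fix keeps the transport open and adds a connect timeout. A minimal sketch of the corrected lifecycle using a plain TSocket (the commit uses RpcTransportFactory instead):

    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    import org.apache.thrift.transport.TTransportException;

    public class ThriftClientSketch {

      // Opens a transport and builds a protocol for a generated Thrift client. The
      // transport must stay open for the client's lifetime, so it is NOT wrapped in
      // try-with-resources here; the caller closes it when done with the client.
      static TProtocol openProtocol(String host, int port, int timeoutMs)
          throws TTransportException {
        TTransport transport = new TSocket(host, port, timeoutMs);
        transport.open();
        return new TBinaryProtocol(transport);
        // A generated client is then constructed as: new InternalService.Client(protocol)
      }
    }
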
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index 3602bd43fa..fa81d65764 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -19,13 +19,13 @@
 
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.mpp.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
-
-import org.apache.thrift.TException;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -44,11 +44,13 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
   public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances) {
     return executor.submit(
         () -> {
+          TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
           try {
             for (FragmentInstance instance : instances) {
               InternalService.Client client =
                   InternalServiceClientFactory.getInternalServiceClient(
-                      instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
+                      instance.getHostEndpoint().getIp(),
+                      IoTDBDescriptor.getInstance().getConfig().getMppPort());
               // TODO: (xingtanzjr) consider how to handle the buffer here
               ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
               instance.serializeRequest(buffer);
@@ -60,13 +62,16 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
               TSendFragmentInstanceReq req =
                   new TSendFragmentInstanceReq(
                       new TFragmentInstance(buffer), groupId, instance.getType().toString());
-              client.sendFragmentInstance(req);
+              resp = client.sendFragmentInstance(req);
+              if (!resp.accepted) {
+                break;
+              }
             }
-          } catch (TException e) {
+          } catch (Exception e) {
             // TODO: (xingtanzjr) add more details
             return new FragInstanceDispatchResult(false);
           }
-          return new FragInstanceDispatchResult(true);
+          return new FragInstanceDispatchResult(resp.accepted);
         });
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index efaa56b89e..b2200b7690 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.buffer.DataBlockService;
 import org.apache.iotdb.db.mpp.buffer.IDataBlockManager;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.QueryId;
@@ -89,6 +90,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
     this.scheduler = new Scheduler();
     this.workerGroups = new ThreadGroup("ScheduleThreads");
     this.threads = new ArrayList<>();
+    this.blockManager = DataBlockService.getInstance().getDataBlockManager();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index bb7114c01f..d8fcd29210 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -71,9 +71,9 @@ public class Analyzer {
   private final MPPQueryContext context;
 
   // TODO need to use factory to decide standalone or cluster
-  private final IPartitionFetcher partitionFetcher = new FakePartitionFetcherImpl();
+  private final IPartitionFetcher partitionFetcher = ClusterPartitionFetcher.getInstance();
   // TODO need to use factory to decide standalone or cluster
-  private final ISchemaFetcher schemaFetcher = new FakeSchemaFetcherImpl();
+  private final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance();
 
   public Analyzer(MPPQueryContext context) {
     this.context = context;
@@ -177,14 +177,52 @@ public class Analyzer {
     @Override
     public Analysis visitInsert(InsertStatement insertStatement, MPPQueryContext context) {
       // TODO: do analyze for insert statement
-      Analysis analysis = new Analysis();
-      analysis.setStatement(insertStatement);
-      return analysis;
+      context.setQueryType(QueryType.WRITE);
+
+      long[] timeArray = insertStatement.getTimes();
+      PartialPath devicePath = insertStatement.getDevice();
+      String[] measurements = insertStatement.getMeasurementList();
+      if (timeArray.length == 1) {
+        // construct insert row statement
+        InsertRowStatement insertRowStatement = new InsertRowStatement();
+        insertRowStatement.setDevicePath(devicePath);
+        insertRowStatement.setTime(timeArray[0]);
+        insertRowStatement.setMeasurements(measurements);
+        insertRowStatement.setDataTypes(
+            new TSDataType[insertStatement.getMeasurementList().length]);
+        Object[] values = new Object[insertStatement.getMeasurementList().length];
+        System.arraycopy(insertStatement.getValuesList().get(0), 0, values, 0, values.length);
+        insertRowStatement.setValues(values);
+        insertRowStatement.setNeedInferType(true);
+        insertRowStatement.setAligned(insertStatement.isAligned());
+        return insertRowStatement.accept(this, context);
+      } else {
+        // construct insert rows statement
+        // construct insert statement
+        InsertRowsStatement insertRowsStatement = new InsertRowsStatement();
+        List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
+        for (int i = 0; i < timeArray.length; i++) {
+          InsertRowStatement statement = new InsertRowStatement();
+          statement.setDevicePath(devicePath);
+          statement.setMeasurements(measurements);
+          statement.setTime(timeArray[i]);
+          statement.setDataTypes(new TSDataType[insertStatement.getMeasurementList().length]);
+          Object[] values = new Object[insertStatement.getMeasurementList().length];
+          System.arraycopy(insertStatement.getValuesList().get(i), 0, values, 0, values.length);
+          statement.setValues(values);
+          statement.setAligned(insertStatement.isAligned());
+          statement.setNeedInferType(true);
+          insertRowStatementList.add(statement);
+        }
+        insertRowsStatement.setInsertRowStatementList(insertRowStatementList);
+        return insertRowsStatement.accept(this, context);
+      }
     }
 
     @Override
     public Analysis visitCreateTimeseries(
         CreateTimeSeriesStatement createTimeSeriesStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       if (createTimeSeriesStatement.getTags() != null
           && !createTimeSeriesStatement.getTags().isEmpty()
           && createTimeSeriesStatement.getAttributes() != null
@@ -201,7 +239,7 @@ public class Analyzer {
       analysis.setStatement(createTimeSeriesStatement);
 
       SchemaPartition schemaPartitionInfo =
-          partitionFetcher.getSchemaPartition(
+          partitionFetcher.getOrCreateSchemaPartition(
               new PathPatternTree(createTimeSeriesStatement.getPath()));
       analysis.setSchemaPartitionInfo(schemaPartitionInfo);
       return analysis;
@@ -211,6 +249,7 @@ public class Analyzer {
     public Analysis visitCreateAlignedTimeseries(
         CreateAlignedTimeSeriesStatement createAlignedTimeSeriesStatement,
         MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       List<String> measurements = createAlignedTimeSeriesStatement.getMeasurements();
       Set<String> measurementsSet = new HashSet<>(measurements);
       if (measurementsSet.size() < measurements.size()) {
@@ -234,6 +273,7 @@ public class Analyzer {
     @Override
     public Analysis visitAlterTimeseries(
         AlterTimeSeriesStatement alterTimeSeriesStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       Analysis analysis = new Analysis();
       analysis.setStatement(alterTimeSeriesStatement);
 
@@ -248,6 +288,7 @@ public class Analyzer {
     @Override
     public Analysis visitInsertTablet(
         InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       SchemaTree schemaTree =
           schemaFetcher.fetchSchemaWithAutoCreate(
               insertTabletStatement.getDevicePath(),
@@ -320,6 +361,7 @@ public class Analyzer {
 
     @Override
     public Analysis visitCreateUser(AuthorStatement authorStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       Analysis analysis = new Analysis();
       analysis.setStatement(authorStatement);
       return analysis;
@@ -327,6 +369,7 @@ public class Analyzer {
 
     @Override
     public Analysis visitCreateRole(AuthorStatement authorStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       Analysis analysis = new Analysis();
       analysis.setStatement(authorStatement);
       return analysis;
@@ -334,6 +377,7 @@ public class Analyzer {
 
     @Override
     public Analysis visitAlterUser(AuthorStatement authorStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       Analysis analysis = new Analysis();
       analysis.setStatement(authorStatement);
       return analysis;
@@ -341,6 +385,7 @@ public class Analyzer {
 
     @Override
     public Analysis visitGrantUser(AuthorStatement authorStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       Analysis analysis = new Analysis();
       analysis.setStatement(authorStatement);
       return analysis;
@@ -348,6 +393,7 @@ public class Analyzer {
 
     @Override
     public Analysis visitGrantRole(AuthorStatement authorStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       Analysis analysis = new Analysis();
       analysis.setStatement(authorStatement);
       return analysis;
@@ -355,6 +401,7 @@ public class Analyzer {
 
     @Override
     public Analysis visitGrantRoleToUser(AuthorStatement authorStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       Analysis analysis = new Analysis();
       analysis.setStatement(authorStatement);
       return analysis;
@@ -362,6 +409,7 @@ public class Analyzer {
 
     @Override
     public Analysis visitRevokeUser(AuthorStatement authorStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       Analysis analysis = new Analysis();
       analysis.setStatement(authorStatement);
       return analysis;
@@ -369,6 +417,7 @@ public class Analyzer {
 
     @Override
     public Analysis visitRevokeRole(AuthorStatement authorStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       Analysis analysis = new Analysis();
       analysis.setStatement(authorStatement);
       return analysis;
@@ -377,6 +426,7 @@ public class Analyzer {
     @Override
     public Analysis visitRevokeRoleFromUser(
         AuthorStatement authorStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       Analysis analysis = new Analysis();
       analysis.setStatement(authorStatement);
       return analysis;
@@ -384,6 +434,7 @@ public class Analyzer {
 
     @Override
     public Analysis visitDropUser(AuthorStatement authorStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       Analysis analysis = new Analysis();
       analysis.setStatement(authorStatement);
       return analysis;
@@ -391,6 +442,7 @@ public class Analyzer {
 
     @Override
     public Analysis visitDropRole(AuthorStatement authorStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       Analysis analysis = new Analysis();
       analysis.setStatement(authorStatement);
       return analysis;
@@ -460,6 +512,7 @@ public class Analyzer {
 
     @Override
     public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       // TODO remove duplicate
       SchemaTree schemaTree =
           schemaFetcher.fetchSchemaWithAutoCreate(
@@ -485,7 +538,8 @@ public class Analyzer {
       sgNameToQueryParamsMap.put(
           schemaTree.getBelongedStorageGroup(insertRowStatement.getDevicePath()),
           Collections.singletonList(dataPartitionQueryParam));
-      DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+      DataPartition dataPartition =
+          partitionFetcher.getOrCreateDataPartition(sgNameToQueryParamsMap);
 
       Analysis analysis = new Analysis();
       analysis.setSchemaTree(schemaTree);
@@ -498,6 +552,7 @@ public class Analyzer {
     @Override
     public Analysis visitInsertRows(
         InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       // TODO remove duplicate
       SchemaTree schemaTree =
           schemaFetcher.fetchSchemaListWithAutoCreate(
@@ -542,6 +597,7 @@ public class Analyzer {
     @Override
     public Analysis visitInsertMultiTablets(
         InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       // TODO remove duplicate
       SchemaTree schemaTree =
           schemaFetcher.fetchSchemaListWithAutoCreate(
@@ -580,6 +636,7 @@ public class Analyzer {
     @Override
     public Analysis visitInsertRowsOfOneDevice(
         InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       // TODO remove duplicate
       SchemaTree schemaTree =
           schemaFetcher.fetchSchemaWithAutoCreate(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
index 7c0f6c9470..f8a9d348be 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
@@ -164,7 +164,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     try {
       patternTree.serialize(baos);
       ByteBuffer serializedPatternTree = ByteBuffer.allocate(baos.size());
-      serializedPatternTree.put(baos.getBuf());
+      serializedPatternTree.put(baos.getBuf(), 0, baos.size());
       serializedPatternTree.flip();
       return new TSchemaPartitionReq(serializedPatternTree);
     } catch (IOException e) {
@@ -209,7 +209,10 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
       }
       schemaPartitionMap.put(storageGroupName, deviceToSchemaRegionMap);
     }
-    return new SchemaPartition(schemaPartitionMap);
+    return new SchemaPartition(
+        schemaPartitionMap,
+        IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+        IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
   }
 
   private DataPartition parseDataPartitionResp(TDataPartitionResp dataPartitionResp) {
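
The one-line fix above, put(baos.getBuf(), 0, baos.size()), matters because the stream's internal buffer is usually larger than the number of bytes actually written; copying the whole array would overflow a ByteBuffer sized by baos.size(). A minimal standard-library demonstration of the bounded put, with a plain byte[] standing in for the stream's internal buffer:

    import java.nio.ByteBuffer;

    public class BoundedPutSketch {
        public static void main(String[] args) {
            // Pretend this is the stream's internal buffer: capacity 8, but only 3 bytes written.
            byte[] internalBuf = new byte[8];
            internalBuf[0] = 1; internalBuf[1] = 2; internalBuf[2] = 3;
            int written = 3; // what baos.size() reports

            ByteBuffer target = ByteBuffer.allocate(written);
            // put(internalBuf) would try to copy all 8 bytes and throw BufferOverflowException;
            // put(internalBuf, 0, written) copies exactly the bytes that were actually written.
            target.put(internalBuf, 0, written);
            target.flip();
            System.out.println("remaining = " + target.remaining()); // 3
        }
    }
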
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
index 9e47c755eb..ca7f1f06cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
@@ -26,8 +26,10 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.execution.Coordinator;
+import org.apache.iotdb.db.mpp.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.SchemaFetchStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -69,11 +71,15 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
 
     QueryId queryId =
         new QueryId(String.valueOf(SessionManager.getInstance().requestQueryId(false)));
-    coordinator.execute(schemaFetchStatement, queryId, QueryType.READ, null, "");
-    TsBlock tsBlock = coordinator.getResultSet(queryId);
+    ExecutionResult executionResult = coordinator.execute(schemaFetchStatement, queryId, null, "");
+    // TODO: (xingtanzjr) throw exception
+    if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new RuntimeException("cannot fetch schema");
+    }
+    TsBlock tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
+    // TODO: (xingtanzjr) need to release this query's resource here
     SchemaTree result = new SchemaTree();
     result.setStorageGroups(storageGroups);
-
     Binary binary;
     SchemaTree fetchedSchemaTree;
     Column column = tsBlock.getColumn(0);
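
The fetcher now checks the ExecutionResult status before pulling blocks from the query execution, throwing early instead of reading from a failed query (the TODOs note that a dedicated exception and resource release are still pending). A tiny sketch of the same guard, with an int status code standing in for TSStatus/TSStatusCode:

    public class StatusGuardSketch {
        static final int SUCCESS = 200; // illustrative stand-in for TSStatusCode.SUCCESS_STATUS

        // Fail fast before touching the query's result set, mirroring the check added above.
        static String fetchSchema(int statusCode) {
            if (statusCode != SUCCESS) {
                throw new RuntimeException("cannot fetch schema, status = " + statusCode);
            }
            return "schema-tree";
        }

        public static void main(String[] args) {
            System.out.println(fetchSchema(SUCCESS));
        }
    }
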
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 17961e5420..909ab98080 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -215,17 +215,23 @@ public class DistributionPlanner {
       // and make the
       // new TimeJoinNode as the child of current TimeJoinNode
       // TODO: (xingtanzjr) optimize the procedure here to remove duplicated TimeJoinNode
+      final boolean[] addParent = {false};
       sourceGroup.forEach(
           (dataRegion, seriesScanNodes) -> {
             if (seriesScanNodes.size() == 1) {
               root.addChild(seriesScanNodes.get(0));
             } else {
-              // We clone a TimeJoinNode from root to make the params to be consistent.
-              // But we need to assign a new ID to it
-              TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone();
-              root.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
-              seriesScanNodes.forEach(parentOfGroup::addChild);
-              root.addChild(parentOfGroup);
+              if (!addParent[0]) {
+                seriesScanNodes.forEach(root::addChild);
+                addParent[0] = true;
+              } else {
+                // We clone a TimeJoinNode from root to make the params to be consistent.
+                // But we need to assign a new ID to it
+                TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone();
+                root.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+                seriesScanNodes.forEach(parentOfGroup::addChild);
+                root.addChild(parentOfGroup);
+              }
             }
           });
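
The addParent flag above is declared as a one-element boolean array because locals captured by the forEach lambda must be effectively final; the array reference stays final while its single cell is mutated. A standalone illustration of the idiom (java.util.concurrent.atomic.AtomicBoolean would serve equally well):

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class LambdaFlagSketch {
        public static void main(String[] args) {
            Map<String, List<Integer>> sourceGroup = new LinkedHashMap<>();
            sourceGroup.put("data-region-1", Arrays.asList(1, 2));
            sourceGroup.put("data-region-2", Arrays.asList(3, 4));

            // Locals captured by a lambda must be effectively final, so a one-element
            // array serves as a mutable flag the lambda can update.
            final boolean[] addParent = {false};
            sourceGroup.forEach(
                (region, nodes) -> {
                    if (!addParent[0]) {
                        System.out.println(region + ": children attach directly to the root join");
                        addParent[0] = true;
                    } else {
                        System.out.println(region + ": children go under a cloned intermediate join");
                    }
                });
        }
    }
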
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index 77508888a2..166a3077a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.sql.planner;
 
 import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
@@ -306,7 +307,7 @@ public class LocalExecutionPlanner {
                 localInstanceId.toThrift(),
                 node.getPlanNodeId().getId(),
                 source.getIp(),
-                source.getPort(),
+                IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort(),
                 remoteInstanceId.toThrift());
         return new ExchangeOperator(operatorContext, sourceHandle, node.getUpstreamPlanNodeId());
       } catch (IOException e) {
@@ -325,7 +326,7 @@ public class LocalExecutionPlanner {
             DATA_BLOCK_MANAGER.createSinkHandle(
                 localInstanceId.toThrift(),
                 target.getIp(),
-                target.getPort(),
+                IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort(),
                 targetInstanceId.toThrift(),
                 node.getDownStreamPlanNodeId().getId());
         context.setSinkHandle(sinkHandle);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index a3065c8c03..2063206c45 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan;
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
index 99b5d4fe35..a97ccddc16 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchemaScanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaFetchNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
@@ -79,7 +80,8 @@ public enum PlanNodeType {
   TIME_SERIES_SCHEMA_SCAN((short) 24),
   // TODO @xinzhongtianxia remove this
   SHOW_DEVICES((short) 25),
-  SCHEMA_FETCH((short) 26);
+  SCHEMA_FETCH((short) 26),
+  SCHEMA_MERGE((short) 27);
 
   private final short nodeType;
 
@@ -161,6 +163,8 @@ public enum PlanNodeType {
         return ShowDevicesNode.deserialize(buffer);
       case 26:
         return SchemaFetchNode.deserialize(buffer);
+      case 27:
+        return SchemaMergeNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
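
Adding SCHEMA_MERGE((short) 27) requires both the new enum constant and a matching case in the deserialize switch; otherwise a serialized SchemaMergeNode would hit the "Invalid node type" branch on the receiving side. A compact sketch of the same code-per-type pattern using only java.nio (codes 26/27 reused purely for illustration):

    import java.nio.ByteBuffer;

    public class NodeTypeSketch {
        // Minimal stand-in for the PlanNodeType pattern: each type owns a stable short code.
        enum NodeType {
            SCHEMA_FETCH((short) 26),
            SCHEMA_MERGE((short) 27);

            private final short code;
            NodeType(short code) { this.code = code; }

            void serialize(ByteBuffer buffer) { buffer.putShort(code); }

            static NodeType deserialize(ByteBuffer buffer) {
                short code = buffer.getShort();
                switch (code) {
                    case 26: return SCHEMA_FETCH;
                    case 27: return SCHEMA_MERGE; // the case added by this commit
                    default: throw new IllegalArgumentException("Invalid node type: " + code);
                }
            }
        }

        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(2);
            NodeType.SCHEMA_MERGE.serialize(buffer);
            buffer.flip();
            System.out.println(NodeType.deserialize(buffer)); // SCHEMA_MERGE
        }
    }
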
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
index 87888c9e1f..5100ba3f21 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 
+import com.google.common.collect.ImmutableList;
+
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
@@ -44,7 +46,7 @@ public class SchemaFetchNode extends SchemaScanNode {
 
   @Override
   public List<PlanNode> getChildren() {
-    return null;
+    return ImmutableList.of();
   }
 
   @Override
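
Returning ImmutableList.of() instead of null from getChildren() keeps leaf nodes safe to traverse with for-each. The sketch below shows why, using Collections.emptyList() so it needs no Guava dependency; Node and walk are illustrative names:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class LeafChildrenSketch {
        // Tiny stand-in node type; a leaf returns an empty list rather than null.
        static class Node {
            final String name;
            final List<Node> children;
            Node(String name, List<Node> children) { this.name = name; this.children = children; }
            List<Node> getChildren() { return children; }
        }

        // Generic pre-order walk: safe only because leaves never return null.
        static void walk(Node node) {
            System.out.println(node.name);
            for (Node child : node.getChildren()) {
                walk(child);
            }
        }

        public static void main(String[] args) {
            Node leaf = new Node("SchemaFetch", Collections.emptyList());
            Node root = new Node("SchemaMerge", Arrays.asList(leaf));
            walk(root); // a null children list would throw NullPointerException in the for-each
        }
    }
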
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
index 7ae31a523e..7e9357cb68 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
 
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ProcessNode;
 
@@ -64,10 +65,14 @@ public class SchemaMergeNode extends ProcessNode {
   }
 
   @Override
-  public void serialize(ByteBuffer byteBuffer) {}
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.SCHEMA_MERGE.serialize(byteBuffer);
+  }
 
-  @Override
-  protected void serializeAttributes(ByteBuffer byteBuffer) {}
+  public static SchemaMergeNode deserialize(ByteBuffer byteBuffer) {
+    PlanNodeId id = PlanNodeId.deserialize(byteBuffer);
+    return new SchemaMergeNode(id);
+  }
 
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index 36d9f35db6..c22136ad04 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -18,25 +18,31 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write;
 
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import com.google.common.collect.ImmutableList;
+
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-public class CreateTimeSeriesNode extends PlanNode {
+public class CreateTimeSeriesNode extends WritePlanNode {
   private PartialPath path;
   private TSDataType dataType;
   private TSEncoding encoding;
@@ -47,6 +53,8 @@ public class CreateTimeSeriesNode extends PlanNode {
   private Map<String, String> attributes = null;
   private long tagOffset = -1;
 
+  private RegionReplicaSet regionReplicaSet;
+
   public CreateTimeSeriesNode(
       PlanNodeId id,
       PartialPath path,
@@ -145,7 +153,7 @@ public class CreateTimeSeriesNode extends PlanNode {
 
   @Override
   public List<PlanNode> getChildren() {
-    return null;
+    return new ArrayList<>();
   }
 
   @Override
@@ -303,4 +311,21 @@ public class CreateTimeSeriesNode extends PlanNode {
         && ((tags == null && that.tags == null) || tags.equals(that.tags))
         && ((attributes == null && that.attributes == null) || attributes.equals(that.attributes));
   }
+
+  @Override
+  public RegionReplicaSet getRegionReplicaSet() {
+    return regionReplicaSet;
+  }
+
+  @Override
+  public List<WritePlanNode> splitByPartition(Analysis analysis) {
+    RegionReplicaSet regionReplicaSet =
+        analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(path.getDevice());
+    setRegionReplicaSet(regionReplicaSet);
+    return ImmutableList.of(this);
+  }
+
+  public void setRegionReplicaSet(RegionReplicaSet regionReplicaSet) {
+    this.regionReplicaSet = regionReplicaSet;
+  }
 }
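
CreateTimeSeriesNode is now a WritePlanNode: splitByPartition looks up the schema region for the series' device and returns the node itself as the only split. The sketch below reduces that to a Map lookup plus a singleton list; CreateSeriesNode, the String region and the map are illustrative stand-ins for the node, RegionReplicaSet and SchemaPartition:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SplitByPartitionSketch {
        // Hypothetical, much-simplified write node: one path, one region after splitting.
        static class CreateSeriesNode {
            final String device;
            String region; // stands in for RegionReplicaSet
            CreateSeriesNode(String device) { this.device = device; }

            // A single create-timeseries node touches exactly one schema region, so the
            // "split" is just a lookup plus a singleton list containing the node itself.
            List<CreateSeriesNode> splitByPartition(Map<String, String> deviceToRegion) {
                this.region = deviceToRegion.get(device);
                return Collections.singletonList(this);
            }
        }

        public static void main(String[] args) {
            Map<String, String> partition = new HashMap<>();
            partition.put("root.sg.d1", "schema-region-3");
            CreateSeriesNode node = new CreateSeriesNode("root.sg.d1");
            System.out.println(node.splitByPartition(partition).size() + " -> " + node.region);
        }
    }
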
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 5947fc3d21..d3f82fd328 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -59,6 +60,11 @@ public class ExchangeNode extends PlanNode {
     return ImmutableList.of(child);
   }
 
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitExchange(this, context);
+  }
+
   @Override
   public void addChild(PlanNode child) {
     this.child = child;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index 90ebc81ee3..fdda60a311 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -52,11 +52,11 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
   // The without policy is able to be push down to the TimeJoinOperator because we can know whether
   // a row contains
   // null or not.
-  private FilterNullPolicy filterNullPolicy;
+  private FilterNullPolicy filterNullPolicy = FilterNullPolicy.NO_FILTER;
 
   private List<PlanNode> children;
 
-  private final List<ColumnHeader> columnHeaders = new ArrayList<>();
+  private List<ColumnHeader> columnHeaders = new ArrayList<>();
 
   public TimeJoinNode(PlanNodeId id, OrderBy mergeOrder) {
     super(id);
@@ -77,7 +77,10 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
 
   @Override
   public PlanNode clone() {
-    return new TimeJoinNode(getPlanNodeId(), this.mergeOrder);
+    // TODO: (xingtanzjr)
+    TimeJoinNode cloneNode = new TimeJoinNode(getPlanNodeId(), this.mergeOrder);
+    cloneNode.columnHeaders = this.columnHeaders;
+    return cloneNode;
   }
 
   @Override
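
TimeJoinNode.clone() now carries columnHeaders over to the copy, so the intermediate join created per data region in DistributionPlanner reports the same output columns as the original root. A minimal illustration of a clone that keeps (rather than drops) a derived field; JoinNode and the field names are illustrative:

    import java.util.Arrays;
    import java.util.List;

    public class CloneHeadersSketch {
        // Simplified join-like node: cloning must keep the output header list.
        static class JoinNode {
            final String order;
            List<String> headers;
            JoinNode(String order) { this.order = order; }

            JoinNode cloneNode() {
                JoinNode copy = new JoinNode(order);
                copy.headers = headers; // before the fix the clone was created without headers
                return copy;
            }
        }

        public static void main(String[] args) {
            JoinNode root = new JoinNode("TIME_ASC");
            root.headers = Arrays.asList("root.sg.d1.s1", "root.sg.d2.s1");
            System.out.println(root.cloneNode().headers); // same headers as the original
        }
    }
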
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 0fe26258e5..a738c8a502 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import com.google.common.collect.ImmutableList;
@@ -67,6 +68,11 @@ public class FragmentSinkNode extends SinkNode {
     return sinkNode;
   }
 
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitFragmentSink(this, context);
+  }
+
   @Override
   public void addChild(PlanNode child) {
     this.child = child;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index 2b3481fd74..e999046ff8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -169,12 +169,25 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
   }
 
   @Override
-  public void serialize(ByteBuffer byteBuffer) {
-    byteBuffer.putShort((short) PlanNodeType.INSERT_ROW.ordinal());
-    getPlanNodeId().serialize(byteBuffer);
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.INSERT_ROW.serialize(byteBuffer);
     subSerialize(byteBuffer);
   }
 
+  public static InsertRowNode deserialize(ByteBuffer byteBuffer) {
+    // TODO: (xingtanzjr) remove placeholder
+    InsertRowNode insertNode = new InsertRowNode(new PlanNodeId("1"));
+    insertNode.setTime(byteBuffer.getLong());
+    try {
+      insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(byteBuffer)));
+    } catch (IllegalPathException e) {
+      throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
+    }
+    insertNode.deserializeMeasurementsAndValues(byteBuffer);
+    insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
+    return insertNode;
+  }
+
   void subSerialize(ByteBuffer buffer) {
     buffer.putLong(time);
     ReadWriteIOUtils.write(devicePath.getFullPath(), buffer);
@@ -314,19 +327,6 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
     this.time = time;
   }
 
-  public static InsertRowNode deserialize(ByteBuffer byteBuffer) {
-    InsertRowNode insertNode = new InsertRowNode(PlanNodeId.deserialize(byteBuffer));
-    insertNode.setTime(byteBuffer.getLong());
-    try {
-      insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(byteBuffer)));
-    } catch (IllegalPathException e) {
-      throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
-    }
-    insertNode.deserializeMeasurementsAndValues(byteBuffer);
-
-    return insertNode;
-  }
-
   void deserializeMeasurementsAndValues(ByteBuffer buffer) {
     int measurementSize = buffer.getInt();
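
With serialize() replaced by serializeAttributes(), InsertRowNode follows the same type-code-then-payload layout as the other plan nodes, and deserialize() must read fields back in exactly the order subSerialize wrote them. The sketch below shows that write/read symmetry on a trimmed-down row (type code, time, device path) using only java.nio; the field set and code value are illustrative, not the real wire format:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class RowSerdeSketch {
        static final short INSERT_ROW = 13; // illustrative type code

        static void serialize(ByteBuffer buffer, long time, String device) {
            buffer.putShort(INSERT_ROW); // type code first, as the plan node types do
            buffer.putLong(time);
            byte[] bytes = device.getBytes(StandardCharsets.UTF_8);
            buffer.putInt(bytes.length);
            buffer.put(bytes);
        }

        static String deserialize(ByteBuffer buffer) {
            // Consume the type code written above (in IoTDB the PlanNodeType dispatcher reads it
            // before handing the buffer to the node's own deserialize method).
            buffer.getShort();
            long time = buffer.getLong();
            byte[] bytes = new byte[buffer.getInt()];
            buffer.get(bytes);
            return time + "@" + new String(bytes, StandardCharsets.UTF_8);
        }

        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(64);
            serialize(buffer, 42L, "root.sg.d1");
            buffer.flip();
            System.out.println(deserialize(buffer)); // 42@root.sg.d1
        }
    }
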
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalService.java b/server/src/main/java/org/apache/iotdb/db/service/InternalService.java
index 9c751e7766..4414fb9110 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalService.java
@@ -19,38 +19,79 @@
 
 package org.apache.iotdb.db.service;
 
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.service.ThriftService;
+import org.apache.iotdb.commons.service.ThriftServiceThread;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.mpp.rpc.thrift.InternalService.Processor;
 
-public class InternalService extends ThriftService {
+public class InternalService extends ThriftService implements InternalServiceMBean {
 
   private InternalServiceImpl impl;
 
+  private InternalService() {}
+
   @Override
   public ServiceType getID() {
-    return null;
+    return ServiceType.INTERNAL_SERVICE;
   }
 
   @Override
   public ThriftService getImplementation() {
-    return null;
+    return InternalServiceHolder.INSTANCE;
   }
 
   @Override
   public void initTProcessor()
-      throws ClassNotFoundException, IllegalAccessException, InstantiationException {}
+      throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+    impl = new InternalServiceImpl();
+    initSyncedServiceImpl(null);
+    processor = new Processor<>(impl);
+  }
 
   @Override
   public void initThriftServiceThread()
-      throws IllegalAccessException, InstantiationException, ClassNotFoundException {}
+      throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+    try {
+      IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+      thriftServiceThread =
+          new ThriftServiceThread(
+              processor,
+              getID().getName(),
+              ThreadName.INTERNAL_SERVICE_CLIENT.getName(),
+              getBindIP(),
+              getBindPort(),
+              config.getRpcMaxConcurrentClientNum(),
+              config.getThriftServerAwaitTimeForStopService(),
+              new InternalServiceThriftHandler(),
+              // TODO: hard-coded compression strategy
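+              // (i.e. compression is currently always disabled for this service)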
+              false);
+    } catch (RPCServiceException e) {
+      throw new IllegalAccessException(e.getMessage());
+    }
+    thriftServiceThread.setName(ThreadName.INTERNAL_SERVICE_CLIENT.getName());
+  }
 
   @Override
   public String getBindIP() {
-    return null;
+    return IoTDBDescriptor.getInstance().getConfig().getRpcAddress();
   }
 
   @Override
   public int getBindPort() {
-    return 0;
+    return IoTDBDescriptor.getInstance().getConfig().getMppPort();
+  }
+
+  private static class InternalServiceHolder {
+    private static final InternalService INSTANCE = new InternalService();
+
+    private InternalServiceHolder() {}
+  }
+
+  public static InternalService getInstance() {
+    return InternalService.InternalServiceHolder.INSTANCE;
   }
 }
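
InternalService now uses the initialization-on-demand holder idiom for its singleton: the nested holder class is loaded, and the instance created, only when getInstance() is first called, and JVM class initialization provides thread safety without explicit locking. A standalone version of the idiom:

    public class HolderSingletonSketch {
        private HolderSingletonSketch() {}

        // Lazy and thread-safe without synchronization: the JVM initializes the holder class
        // (and therefore INSTANCE) only on the first getInstance() call.
        private static class Holder {
            private static final HolderSingletonSketch INSTANCE = new HolderSingletonSketch();
            private Holder() {}
        }

        public static HolderSingletonSketch getInstance() {
            return Holder.INSTANCE;
        }

        public static void main(String[] args) {
            System.out.println(getInstance() == getInstance()); // true
        }
    }
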
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
index 190655719f..34634d74cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
@@ -63,7 +63,7 @@ public class InternalServiceImpl implements InternalService.Iface {
             ConsensusImpl.getInstance()
                 .read(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
         FragmentInstanceInfo info = (FragmentInstanceInfo) readResp.getDataset();
-        return new TSendFragmentInstanceResp(info.getState().isFailed());
+        return new TSendFragmentInstanceResp(!info.getState().isFailed());
       case WRITE:
         TSendFragmentInstanceResp response = new TSendFragmentInstanceResp();
         ConsensusWriteResponse resp =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceMBean.java
similarity index 84%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
copy to server/src/main/java/org/apache/iotdb/db/service/InternalServiceMBean.java
index 3f9fa05ea8..6abb847e81 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceMBean.java
@@ -17,13 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.execution;
+package org.apache.iotdb.db.service;
 
-public interface IQueryExecution {
-
-  void start();
-
-  void stop();
-
-  ExecutionResult getStatus();
-}
+public interface InternalServiceMBean {}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalService.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceThriftHandler.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/service/InternalService.java
copy to server/src/main/java/org/apache/iotdb/db/service/InternalServiceThriftHandler.java
index 9c751e7766..bfaf11c583 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceThriftHandler.java
@@ -19,38 +19,26 @@
 
 package org.apache.iotdb.db.service;
 
-import org.apache.iotdb.commons.service.ServiceType;
-import org.apache.iotdb.commons.service.ThriftService;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
 
-public class InternalService extends ThriftService {
-
-  private InternalServiceImpl impl;
+public class InternalServiceThriftHandler implements TServerEventHandler {
 
   @Override
-  public ServiceType getID() {
-    return null;
-  }
+  public void preServe() {}
 
   @Override
-  public ThriftService getImplementation() {
+  public ServerContext createContext(TProtocol tProtocol, TProtocol tProtocol1) {
     return null;
   }
 
   @Override
-  public void initTProcessor()
-      throws ClassNotFoundException, IllegalAccessException, InstantiationException {}
-
-  @Override
-  public void initThriftServiceThread()
-      throws IllegalAccessException, InstantiationException, ClassNotFoundException {}
+  public void deleteContext(
+      ServerContext serverContext, TProtocol tProtocol, TProtocol tProtocol1) {}
 
   @Override
-  public String getBindIP() {
-    return null;
-  }
-
-  @Override
-  public int getBindPort() {
-    return 0;
-  }
+  public void processContext(
+      ServerContext serverContext, TTransport tTransport, TTransport tTransport1) {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index f8c8839444..d67b6048ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.rest.IoTDBRestServiceCheck;
 import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
 import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
@@ -38,6 +39,8 @@ import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.LocalConfigNode;
 import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
+import org.apache.iotdb.db.mpp.buffer.DataBlockService;
+import org.apache.iotdb.db.mpp.schedule.FragmentInstanceScheduler;
 import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
 import org.apache.iotdb.db.protocol.rest.RestService;
 import org.apache.iotdb.db.query.udf.service.TemporaryQueryDataFileService;
@@ -48,6 +51,7 @@ import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.service.basic.ServiceProvider;
 import org.apache.iotdb.db.service.basic.StandaloneServiceProvider;
 import org.apache.iotdb.db.service.metrics.MetricsService;
+import org.apache.iotdb.db.service.thrift.impl.DataNodeTSIServiceImpl;
 import org.apache.iotdb.db.sync.receiver.ReceiverService;
 import org.apache.iotdb.db.sync.sender.service.SenderService;
 import org.apache.iotdb.db.wal.WALManager;
@@ -135,8 +139,21 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(CacheHitRatioMonitor.getInstance());
     registerManager.register(CompactionTaskManager.getInstance());
     JMXService.registerMBean(getInstance(), mbeanName);
+
+    // in mpp mode we need to start some other services
+    if (IoTDBDescriptor.getInstance().getConfig().isMppMode()) {
+      registerManager.register(StorageEngineV2.getInstance());
+      registerManager.register(DataBlockService.getInstance());
+      registerManager.register(InternalService.getInstance());
+      registerManager.register(FragmentInstanceScheduler.getInstance());
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setRpcImplClassName(DataNodeTSIServiceImpl.class.getName());
+    } else {
+      registerManager.register(StorageEngine.getInstance());
+    }
+
     registerManager.register(WALManager.getInstance());
-    registerManager.register(StorageEngine.getInstance());
     registerManager.register(TemporaryQueryDataFileService.getInstance());
     registerManager.register(UDFClassLoaderManager.getInstance());
     registerManager.register(UDFRegistrationService.getInstance());
@@ -158,7 +175,9 @@ public class IoTDB implements IoTDBMBean {
 
     logger.info("IoTDB is set up, now may some sgs are not ready, please wait several seconds...");
 
-    while (!StorageEngine.getInstance().isAllSgReady()) {
+    while (IoTDBDescriptor.getInstance().getConfig().isMppMode()
+        ? !StorageEngineV2.getInstance().isAllSgReady()
+        : !StorageEngine.getInstance().isAllSgReady()) {
       try {
         Thread.sleep(1000);
       } catch (InterruptedException e) {
@@ -170,7 +189,11 @@ public class IoTDB implements IoTDBMBean {
 
     registerManager.register(SenderService.getInstance());
     registerManager.register(UpgradeSevice.getINSTANCE());
-    registerManager.register(SettleService.getINSTANCE());
+    // In MPP mode we temporarily skip SettleService because it still uses StorageEngine
+    // directly, while MPP mode relies on StorageEngineV2 instead of StorageEngine.
+    if (!IoTDBDescriptor.getInstance().getConfig().isMppMode()) {
+      registerManager.register(SettleService.getINSTANCE());
+    }
     registerManager.register(TriggerRegistrationService.getInstance());
     registerManager.register(ContinuousQueryService.getInstance());
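
Start-up in IoTDB.java now branches on isMppMode(): the MPP path registers StorageEngineV2, DataBlockService, InternalService and FragmentInstanceScheduler and swaps in DataNodeTSIServiceImpl as the RPC implementation class, while the legacy path keeps StorageEngine (and SettleService). A simplified sketch of mode-dependent registration; RegisterManager, Service and the flag here are illustrative stand-ins:

    import java.util.ArrayList;
    import java.util.List;

    public class ConditionalRegistrationSketch {
        // Hypothetical stand-ins for the register manager and services used above.
        interface Service { String name(); }

        static class RegisterManager {
            final List<Service> started = new ArrayList<>();
            void register(Service service) { started.add(service); }
        }

        public static void main(String[] args) {
            boolean mppMode = true; // corresponds to IoTDBDescriptor...isMppMode()
            RegisterManager manager = new RegisterManager();

            if (mppMode) {
                manager.register(() -> "StorageEngineV2");
                manager.register(() -> "DataBlockService");
                manager.register(() -> "InternalService");
                manager.register(() -> "FragmentInstanceScheduler");
            } else {
                manager.register(() -> "StorageEngine");
            }

            manager.started.forEach(s -> System.out.println("started " + s.name()));
        }
    }
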
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index dc0ee3987c..0080857171 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -27,14 +27,16 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.service.thrift.ProcessorWithMetrics;
 import org.apache.iotdb.db.service.thrift.handler.RPCServiceThriftHandler;
-import org.apache.iotdb.db.service.thrift.impl.TSServiceImpl;
+import org.apache.iotdb.db.service.thrift.impl.TSIEventHandler;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
 
+import java.lang.reflect.InvocationTargetException;
+
 /** A service to handle jdbc request from client. */
 public class RPCService extends ThriftService implements RPCServiceMBean {
 
-  private TSServiceImpl impl;
+  private TSIEventHandler impl;
 
   public static RPCService getInstance() {
     return RPCServiceHolder.INSTANCE;
@@ -47,10 +49,12 @@ public class RPCService extends ThriftService implements RPCServiceMBean {
 
   @Override
   public void initTProcessor()
-      throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+      throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+          NoSuchMethodException, InvocationTargetException {
     impl =
-        (TSServiceImpl)
+        (TSIEventHandler)
             Class.forName(IoTDBDescriptor.getInstance().getConfig().getRpcImplClassName())
+                .getDeclaredConstructor()
                 .newInstance();
     initSyncedServiceImpl(null);
     if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) {
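
Loading the RPC implementation by name now goes through getDeclaredConstructor().newInstance() rather than the deprecated Class.newInstance(), which is why initTProcessor additionally declares NoSuchMethodException and InvocationTargetException. A minimal reflective-construction sketch using only JDK classes; DefaultImpl is an illustrative placeholder for a class named in configuration:

    import java.lang.reflect.InvocationTargetException;

    public class ReflectiveImplSketch {
        // Any class with a public no-arg constructor can be plugged in by name.
        public static class DefaultImpl {
            @Override public String toString() { return "DefaultImpl"; }
        }

        static Object create(String className)
                throws ClassNotFoundException, NoSuchMethodException, InstantiationException,
                       IllegalAccessException, InvocationTargetException {
            // getDeclaredConstructor().newInstance() replaces the deprecated Class.newInstance()
            // and surfaces constructor failures as InvocationTargetException instead of
            // rethrowing them unchecked.
            return Class.forName(className).getDeclaredConstructor().newInstance();
        }

        public static void main(String[] args) throws Exception {
            System.out.println(create(ReflectiveImplSketch.DefaultImpl.class.getName()));
        }
    }
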
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
index 7bdf65fd7b..1501402b08 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
@@ -16,7 +16,7 @@
  */
 package org.apache.iotdb.db.service.thrift.handler;
 
-import org.apache.iotdb.db.service.thrift.impl.TSServiceImpl;
+import org.apache.iotdb.db.service.thrift.impl.TSIEventHandler;
 
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.server.ServerContext;
@@ -24,10 +24,10 @@ import org.apache.thrift.server.TServerEventHandler;
 import org.apache.thrift.transport.TTransport;
 
 public class RPCServiceThriftHandler implements TServerEventHandler {
-  private TSServiceImpl serviceImpl;
+  private TSIEventHandler eventHandler;
 
-  public RPCServiceThriftHandler(TSServiceImpl serviceImpl) {
-    this.serviceImpl = serviceImpl;
+  public RPCServiceThriftHandler(TSIEventHandler eventHandler) {
+    this.eventHandler = eventHandler;
   }
 
   @Override
@@ -39,7 +39,7 @@ public class RPCServiceThriftHandler implements TServerEventHandler {
   @Override
   public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
     // release query resources.
-    serviceImpl.handleClientExit();
+    eventHandler.handleClientExit();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
new file mode 100644
index 0000000000..f3cfdf5bf2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.thrift.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.execution.Coordinator;
+import org.apache.iotdb.db.mpp.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.execution.IQueryExecution;
+import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowsOfOneDeviceStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowsStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
+import org.apache.iotdb.db.service.metrics.MetricsService;
+import org.apache.iotdb.db.service.metrics.Operation;
+import org.apache.iotdb.db.utils.QueryDataSetUtils;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.*;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.iotdb.db.service.basic.ServiceProvider.AUDIT_LOGGER;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.CONFIG;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.CURRENT_RPC_VERSION;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_TIME_MANAGER;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.SLOW_SQL_LOGGER;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
+
+public class DataNodeTSIServiceImpl implements TSIEventHandler {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeTSIServiceImpl.class);
+
+  private static final Coordinator COORDINATOR = Coordinator.getInstance();
+
+  private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+  @Override
+  public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
+    IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
+    BasicOpenSessionResp openSessionResp =
+        SESSION_MANAGER.openSession(
+            req.username, req.password, req.zoneId, req.client_protocol, clientVersion);
+    TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage());
+    TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
+    return resp.setSessionId(openSessionResp.getSessionId());
+  }
+
+  private IoTDBConstant.ClientVersion parseClientVersion(TSOpenSessionReq req) {
+    Map<String, String> configuration = req.configuration;
+    if (configuration != null && configuration.containsKey("version")) {
+      return IoTDBConstant.ClientVersion.valueOf(configuration.get("version"));
+    }
+    return IoTDBConstant.ClientVersion.V_0_12;
+  }
+
+  @Override
+  public TSStatus closeSession(TSCloseSessionReq req) {
+    return new TSStatus(
+        !SESSION_MANAGER.closeSession(req.sessionId)
+            ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
+            : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+  }
+
+  @Override
+  public TSStatus cancelOperation(TSCancelOperationReq req) {
+    // TODO implement
+    return RpcUtils.getStatus(TSStatusCode.QUERY_NOT_ALLOWED, "Cancellation is not implemented");
+  }
+
+  @Override
+  public TSStatus closeOperation(TSCloseOperationReq req) {
+    return SESSION_MANAGER.closeOperation(
+        req.sessionId, req.queryId, req.statementId, req.isSetStatementId(), req.isSetQueryId());
+  }
+
+  @Override
+  public TSGetTimeZoneResp getTimeZone(long sessionId) {
+    try {
+      ZoneId zoneId = SESSION_MANAGER.getZoneId(sessionId);
+      return new TSGetTimeZoneResp(
+          RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
+          zoneId != null ? zoneId.toString() : "Unknown time zone");
+    } catch (Exception e) {
+      return new TSGetTimeZoneResp(
+          onNPEOrUnexpectedException(
+              e, OperationType.GET_TIME_ZONE, TSStatusCode.GENERATE_TIME_ZONE_ERROR),
+          "Unknown time zone");
+    }
+  }
+
+  @Override
+  public TSStatus setTimeZone(TSSetTimeZoneReq req) {
+    try {
+      SESSION_MANAGER.setTimezone(req.sessionId, req.timeZone);
+      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.SET_TIME_ZONE, TSStatusCode.SET_TIME_ZONE_ERROR);
+    }
+  }
+
+  @Override
+  public ServerProperties getProperties() {
+    ServerProperties properties = new ServerProperties();
+    properties.setVersion(IoTDBConstant.VERSION);
+    LOGGER.info("IoTDB server version: {}", IoTDBConstant.VERSION);
+    properties.setSupportedTimeAggregationOperations(new ArrayList<>());
+    properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MAX_TIME);
+    properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MIN_TIME);
+    properties.setTimestampPrecision(
+        IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision());
+    properties.setMaxConcurrentClientNum(
+        IoTDBDescriptor.getInstance().getConfig().getRpcMaxConcurrentClientNum());
+    properties.setWatermarkSecretKey(
+        IoTDBDescriptor.getInstance().getConfig().getWatermarkSecretKey());
+    properties.setWatermarkBitString(
+        IoTDBDescriptor.getInstance().getConfig().getWatermarkBitString());
+    properties.setWatermarkParamMarkRate(
+        IoTDBDescriptor.getInstance().getConfig().getWatermarkParamMarkRate());
+    properties.setWatermarkParamMaxRightBit(
+        IoTDBDescriptor.getInstance().getConfig().getWatermarkParamMaxRightBit());
+    properties.setIsReadOnly(IoTDBDescriptor.getInstance().getConfig().isReadOnly());
+    properties.setThriftMaxFrameSize(
+        IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize());
+    return properties;
+  }
+
+  @Override
+  public TSStatus setStorageGroup(long sessionId, String storageGroup) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus createAlignedTimeseries(TSCreateAlignedTimeseriesReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus deleteTimeseries(long sessionId, List<String> path) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus deleteStorageGroups(long sessionId, List<String> storageGroup) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
+    String statement = req.getStatement();
+    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.currentTimeMillis();
+    Statement s =
+        StatementGenerator.createStatement(
+            statement, SESSION_MANAGER.getZoneId(req.getSessionId()));
+
+    QUERY_FREQUENCY_RECORDER.incrementAndGet();
+    AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, statement);
+
+    try {
+      long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+      QueryId id = new QueryId(String.valueOf(queryId));
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.execute(s, id, SESSION_MANAGER.getSessionInfo(req.sessionId), statement);
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new RuntimeException("");
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(id);
+
+      TSExecuteStatementResp resp;
+      if (queryExecution.isQuery()) {
+        resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+        resp.setStatus(result.status);
+        resp.setQueryDataSet(
+            QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize));
+      } else {
+        resp = RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      return resp;
+    } catch (Exception e) {
+      // TODO call the coordinator to release query resource
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
+    } finally {
+      addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+      long costTime = System.currentTimeMillis() - startTime;
+      if (costTime >= CONFIG.getSlowQueryThreshold()) {
+        SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
+      }
+    }
+  }
+
+  @Override
+  public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) throws TException {
+    return executeStatement(req);
+  }
+
+  @Override
+  public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req)
+      throws TException {
+    return executeStatement(req);
+  }
+
+  @Override
+  public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
+      }
+
+      TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
+      TSQueryDataSet result =
+          QueryDataSetUtils.convertTsBlockByFetchSize(
+              COORDINATOR.getQueryExecution(new QueryId(String.valueOf(req.queryId))),
+              req.fetchSize);
+      boolean hasResultSet = result.bufferForTime().limit() != 0;
+
+      resp.setHasResultSet(hasResultSet);
+      resp.setQueryDataSet(result);
+      resp.setIsAlign(true);
+
+      QUERY_TIME_MANAGER.unRegisterQuery(req.queryId, false);
+      return resp;
+
+    } catch (Exception e) {
+      return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS));
+    }
+  }
+
+  @Override
+  public TSStatus insertRecords(TSInsertRecordsReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      if (AUDIT_LOGGER.isDebugEnabled()) {
+        AUDIT_LOGGER.debug(
+            "Session {} insertRecords, first device {}, first time {}",
+            SESSION_MANAGER.getCurrSessionId(),
+            req.prefixPaths.get(0),
+            req.getTimestamps().get(0));
+      }
+
+      // Step 1: TODO(INSERT) transfer from TSInsertRecordsReq to Statement
+      InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
+  @Override
+  public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      if (AUDIT_LOGGER.isDebugEnabled()) {
+        AUDIT_LOGGER.debug(
+            "Session {} insertRecords, device {}, first time {}",
+            SESSION_MANAGER.getCurrSessionId(),
+            req.prefixPath,
+            req.getTimestamps().get(0));
+      }
+
+      // Step 1: TODO(INSERT) transfer from TSInsertRecordsOfOneDeviceReq to Statement
+      InsertRowsOfOneDeviceStatement statement =
+          (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
+  @Override
+  public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      if (AUDIT_LOGGER.isDebugEnabled()) {
+        AUDIT_LOGGER.debug(
+            "Session {} insertRecords, device {}, first time {}",
+            SESSION_MANAGER.getCurrSessionId(),
+            req.prefixPath,
+            req.getTimestamps().get(0));
+      }
+
+      // Step 1: TODO(INSERT) transfer from TSInsertStringRecordsOfOneDeviceReq to Statement
+      InsertRowsOfOneDeviceStatement statement =
+          (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
+  @Override
+  public TSStatus insertRecord(TSInsertRecordReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      AUDIT_LOGGER.debug(
+          "Session {} insertRecord, device {}, time {}",
+          SESSION_MANAGER.getCurrSessionId(),
+          req.getPrefixPath(),
+          req.getTimestamp());
+
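+      // Step 1: transfer from TSInsertRecordReq to Statement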
+      InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
+  @Override
+  public TSStatus insertTablets(TSInsertTabletsReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
+      InsertMultiTabletsStatement statement =
+          (InsertMultiTabletsStatement) StatementGenerator.createStatement(req);
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
+  @Override
+  public TSStatus insertTablet(TSInsertTabletReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      // Step 1: TODO(INSERT) transfer from TSInsertTabletReq to Statement
+      InsertTabletStatement statement =
+          (InsertTabletStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
+  @Override
+  public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      if (AUDIT_LOGGER.isDebugEnabled()) {
+        AUDIT_LOGGER.debug(
+            "Session {} insertRecords, first device {}, first time {}",
+            SESSION_MANAGER.getCurrSessionId(),
+            req.prefixPaths.get(0),
+            req.getTimestamps().get(0));
+      }
+
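+      // Step 1: transfer from TSInsertStringRecordsReq to Statement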
+      InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
+
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
+  @Override
+  public TSStatus testInsertTablet(TSInsertTabletReq req) {
+    LOGGER.debug("Test insert batch request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  @Override
+  public TSStatus testInsertTablets(TSInsertTabletsReq req) {
+    LOGGER.debug("Test insert batch request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  @Override
+  public TSStatus testInsertRecord(TSInsertRecordReq req) {
+    LOGGER.debug("Test insert row request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  @Override
+  public TSStatus testInsertStringRecord(TSInsertStringRecordReq req) {
+    LOGGER.debug("Test insert string record request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  @Override
+  public TSStatus testInsertRecords(TSInsertRecordsReq req) {
+    LOGGER.debug("Test insert row in batch request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  @Override
+  public TSStatus testInsertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
+    LOGGER.debug("Test insert rows in batch request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  @Override
+  public TSStatus testInsertStringRecords(TSInsertStringRecordsReq req) {
+    LOGGER.debug("Test insert string records request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  @Override
+  public TSStatus deleteData(TSDeleteDataReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long requestStatementId(long sessionId) {
+    return SESSION_MANAGER.requestStatementId(sessionId);
+  }
+
+  @Override
+  public TSStatus createSchemaTemplate(TSCreateSchemaTemplateReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus appendSchemaTemplate(TSAppendSchemaTemplateReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus pruneSchemaTemplate(TSPruneSchemaTemplateReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus setSchemaTemplate(TSSetSchemaTemplateReq req) throws TException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus unsetSchemaTemplate(TSUnsetSchemaTemplateReq req) throws TException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus dropSchemaTemplate(TSDropSchemaTemplateReq req) throws TException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      AUDIT_LOGGER.debug(
+          "Session {} insertRecord, device {}, time {}",
+          SESSION_MANAGER.getCurrSessionId(),
+          req.getPrefixPath(),
+          req.getTimestamp());
+
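+      // Step 1: transfer from TSInsertStringRecordReq to Statement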
+      InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
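+  /** Build a successful response carrying the column metadata from the dataset header. */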
+  private TSExecuteStatementResp createResponse(DatasetHeader header, long queryId) {
+    TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+    resp.setColumnNameIndexMap(header.getColumnNameIndexMap());
+    // TODO deal with the sg name here
+    resp.setSgColumns(new ArrayList<>());
+    resp.setColumns(header.getRespColumns());
+    resp.setDataTypeList(header.getRespDataTypeList());
+    resp.setAliasColumns(header.getRespAliasColumns());
+    resp.setIgnoreTimeStamp(header.isIgnoreTimestamp());
+    resp.setQueryId(queryId);
+    return resp;
+  }
+
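+  /** Status returned when the session fails the login check. */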
+  private TSStatus getNotLoggedInStatus() {
+    return RpcUtils.getStatus(
+        TSStatusCode.NOT_LOGIN_ERROR,
+        "Log in failed. Either you are not authorized or the session has timed out.");
+  }
+
+  /** Record the latency and count of an operation in the metrics. */
+  private void addOperationLatency(Operation operation, long startTime) {
+    if (CONFIG.isEnablePerformanceStat()) {
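+      // Record a latency histogram and an operation counter, both tagged with the operation name.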
+      MetricsService.getInstance()
+          .getMetricManager()
+          .histogram(
+              System.currentTimeMillis() - startTime,
+              "operation_histogram",
+              MetricLevel.IMPORTANT,
+              "name",
+              operation.getName());
+      MetricsService.getInstance()
+          .getMetricManager()
+          .count(1, "operation_count", MetricLevel.IMPORTANT, "name", operation.getName());
+    }
+  }
+
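+  /** Close the current session when the client connection exits. */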
+  @Override
+  public void handleClientExit() {
+    Long sessionId = SESSION_MANAGER.getCurrSessionId();
+    if (sessionId != null) {
+      TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
+      closeSession(req);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSIEventHandler.java
similarity index 80%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
copy to server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSIEventHandler.java
index 3f9fa05ea8..0a4887ffac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSIEventHandler.java
@@ -16,14 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.db.service.thrift.impl;
 
-package org.apache.iotdb.db.mpp.execution;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
 
-public interface IQueryExecution {
-
-  void start();
-
-  void stop();
-
-  ExecutionResult getStatus();
+public interface TSIEventHandler extends TSIService.Iface {
+  void handleClientExit();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 6291b14875..5fab670693 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -37,16 +37,17 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.template.TemplateQueryType;
-import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.Coordinator;
-import org.apache.iotdb.db.mpp.execution.ExecutionResult;
-import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
-import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
-import org.apache.iotdb.db.mpp.sql.statement.crud.*;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
+import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
 import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
@@ -79,47 +80,7 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.rpc.RedirectException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.ServerProperties;
-import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
-import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
-import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
-import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateResp;
-import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
-import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
-import org.apache.iotdb.service.rpc.thrift.TSTracingInfo;
-import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.*;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -159,13 +120,11 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNonQueryException;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
 
 /** Thrift RPC implementation at server side. */
-public class TSServiceImpl implements TSIService.Iface {
+public class TSServiceImpl implements TSIEventHandler {
 
-  private static final Coordinator coordinator = Coordinator.getInstance();
+  private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
 
-  public static SessionManager SESSION_MANAGER = SessionManager.getInstance();
-
-  protected class QueryTask implements Callable<TSExecuteStatementResp> {
+  private class QueryTask implements Callable<TSExecuteStatementResp> {
 
     private PhysicalPlan plan;
     private final long queryStartTime;
@@ -241,7 +200,7 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
-  protected class FetchResultsTask implements Callable<TSFetchResultsResp> {
+  private class FetchResultsTask implements Callable<TSFetchResultsResp> {
 
     private final long sessionId;
     private final long queryId;
@@ -1117,6 +1076,7 @@ public class TSServiceImpl implements TSIService.Iface {
             .setQueryId(SESSION_MANAGER.requestQueryId(false));
   }
 
+  @Override
   public void handleClientExit() {
     Long sessionId = SESSION_MANAGER.getCurrSessionId();
     if (sessionId != null) {
@@ -1228,46 +1188,6 @@ public class TSServiceImpl implements TSIService.Iface {
         allCheckSuccess, tsStatus, insertRowsPlan.getResults(), req.prefixPaths.size());
   }
 
-  public TSStatus insertRecordsV2(TSInsertRecordsReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug(
-            "Session {} insertRecords, first device {}, first time {}",
-            SESSION_MANAGER.getCurrSessionId(),
-            req.prefixPaths.get(0),
-            req.getTimestamps().get(0));
-      }
-
-      // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
-      InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
-
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   private TSStatus judgeFinalTsStatus(
       boolean allCheckSuccess,
       TSStatus executeTsStatus,
@@ -1335,47 +1255,6 @@ public class TSServiceImpl implements TSIService.Iface {
     return resp;
   }
 
-  public TSStatus insertRecordsOfOneDeviceV2(TSInsertRecordsOfOneDeviceReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug(
-            "Session {} insertRecords, device {}, first time {}",
-            SESSION_MANAGER.getCurrSessionId(),
-            req.prefixPath,
-            req.getTimestamps().get(0));
-      }
-
-      // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
-      InsertRowsOfOneDeviceStatement statement =
-          (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
-
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   @Override
   public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
     if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
@@ -1434,47 +1313,6 @@ public class TSServiceImpl implements TSIService.Iface {
         allCheckSuccess, tsStatus, insertRowsPlan.getResults(), req.timestamps.size());
   }
 
-  public TSStatus insertStringRecordsOfOneDeviceV2(TSInsertStringRecordsOfOneDeviceReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug(
-            "Session {} insertRecords, device {}, first time {}",
-            SESSION_MANAGER.getCurrSessionId(),
-            req.prefixPath,
-            req.getTimestamps().get(0));
-      }
-
-      // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
-      InsertRowsOfOneDeviceStatement statement =
-          (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
-
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   @Override
   public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
     if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
@@ -1528,44 +1366,6 @@ public class TSServiceImpl implements TSIService.Iface {
         allCheckSuccess, tsStatus, insertRowsPlan.getResults(), req.prefixPaths.size());
   }
 
-  public TSStatus insertStringRecordsV2(TSInsertStringRecordsReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug(
-            "Session {} insertRecords, first device {}, first time {}",
-            SESSION_MANAGER.getCurrSessionId(),
-            req.prefixPaths.get(0),
-            req.getTimestamps().get(0));
-      }
-
-      InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
-
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   private void addMeasurementAndValue(
       InsertRowPlan insertRowPlan, List<String> measurements, List<String> values) {
     List<String> newMeasurements = new ArrayList<>(measurements.size());
@@ -1656,43 +1456,6 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
-  public TSStatus insertRecordV2(TSInsertRecordReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      AUDIT_LOGGER.debug(
-          "Session {} insertRecord, device {}, time {}",
-          SESSION_MANAGER.getCurrSessionId(),
-          req.getPrefixPath(),
-          req.getTimestamp());
-
-      InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
-
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   @Override
   public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
     try {
@@ -1724,43 +1487,6 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
-  public TSStatus insertStringRecordV2(TSInsertStringRecordReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      AUDIT_LOGGER.debug(
-          "Session {} insertRecord, device {}, time {}",
-          SESSION_MANAGER.getCurrSessionId(),
-          req.getPrefixPath(),
-          req.getTimestamp());
-
-      InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
-
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   @Override
   public TSStatus deleteData(TSDeleteDataReq req) {
     try {
@@ -1819,39 +1545,6 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
-  public TSStatus insertTabletV2(TSInsertTabletReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      // Step 1: TODO(INSERT) transfer from TSInsertTabletReq to Statement
-      InsertTabletStatement statement =
-          (InsertTabletStatement) StatementGenerator.createStatement(req);
-
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   @Override
   public TSStatus insertTablets(TSInsertTabletsReq req) {
     long t1 = System.currentTimeMillis();
@@ -1874,38 +1567,6 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
-  public TSStatus insertTabletsV2(TSInsertTabletsReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
-      InsertMultiTabletsStatement statement =
-          (InsertMultiTabletsStatement) StatementGenerator.createStatement(req);
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   private InsertTabletPlan constructInsertTabletPlan(TSInsertTabletsReq req, int i)
       throws IllegalPathException {
     InsertTabletPlan insertTabletPlan =
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 29efa1a4c7..506f180049 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.utils;
 
-import org.apache.iotdb.db.mpp.execution.QueryExecution;
+import org.apache.iotdb.db.mpp.execution.IQueryExecution;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -177,7 +177,7 @@ public class QueryDataSetUtils {
   }
 
   public static TSQueryDataSet convertTsBlockByFetchSize(
-      QueryExecution queryExecution, int fetchSize) throws IOException {
+      IQueryExecution queryExecution, int fetchSize) throws IOException {
     int columnNum = queryExecution.getOutputValueColumnCount();
     TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
     // one time column and each value column has an actual value buffer and a bitmap value to
@@ -194,6 +194,9 @@ public class QueryDataSetUtils {
     int[] valueOccupation = new int[columnNum];
     while (rowCount < fetchSize && queryExecution.hasNextResult()) {
       TsBlock tsBlock = queryExecution.getBatchResult();
+      if (tsBlock == null) {
+        break;
+      }
       int currentCount = tsBlock.getPositionCount();
       // serialize time column
       for (int i = 0; i < currentCount; i++) {
@@ -220,7 +223,7 @@ public class QueryDataSetUtils {
                 dataOutputStream.writeInt(column.getInt(i));
                 valueOccupation[k] += 4;
               }
-              if (i % 8 == 0) {
+              if (i != 0 && i % 8 == 0) {
                 dataBitmapOutputStream.writeByte(bitmap);
                 // we should clear the bitmap every 8 points
                 bitmap = 0;
@@ -236,7 +239,7 @@ public class QueryDataSetUtils {
                 dataOutputStream.writeLong(column.getLong(i));
                 valueOccupation[k] += 8;
               }
-              if (i % 8 == 0) {
+              if (i != 0 && i % 8 == 0) {
                 dataBitmapOutputStream.writeByte(bitmap);
                 // we should clear the bitmap every 8 points
                 bitmap = 0;
@@ -252,7 +255,7 @@ public class QueryDataSetUtils {
                 dataOutputStream.writeFloat(column.getFloat(i));
                 valueOccupation[k] += 4;
               }
-              if (i % 8 == 0) {
+              if (i != 0 && i % 8 == 0) {
                 dataBitmapOutputStream.writeByte(bitmap);
                 // we should clear the bitmap every 8 points
                 bitmap = 0;
@@ -268,7 +271,7 @@ public class QueryDataSetUtils {
                 dataOutputStream.writeDouble(column.getDouble(i));
                 valueOccupation[k] += 8;
               }
-              if (i % 8 == 0) {
+              if (i != 0 && i % 8 == 0) {
                 dataBitmapOutputStream.writeByte(bitmap);
                 // we should clear the bitmap every 8 points
                 bitmap = 0;
@@ -284,7 +287,7 @@ public class QueryDataSetUtils {
                 dataOutputStream.writeBoolean(column.getBoolean(i));
                 valueOccupation[k] += 1;
               }
-              if (i % 8 == 0) {
+              if (i != 0 && i % 8 == 0) {
                 dataBitmapOutputStream.writeByte(bitmap);
                 // we should clear the bitmap every 8 points
                 bitmap = 0;
@@ -302,7 +305,7 @@ public class QueryDataSetUtils {
                 dataOutputStream.write(binary.getValues());
                 valueOccupation[k] = valueOccupation[k] + 4 + binary.getLength();
               }
-              if (i % 8 == 0) {
+              if (i != 0 && i % 8 == 0) {
                 dataBitmapOutputStream.writeByte(bitmap);
                 // we should clear the bitmap every 8 points
                 bitmap = 0;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index e428970fb6..cbd56cc790 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -212,8 +212,7 @@ public class DistributionPlannerTest {
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context =
-        new MPPQueryContext("", queryId, null, QueryType.READ, new Endpoint());
+    MPPQueryContext context = new MPPQueryContext("", queryId, null, new Endpoint());
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
     PlanNode rootAfterRewrite = planner.rewriteSource();
@@ -250,8 +249,7 @@ public class DistributionPlannerTest {
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context =
-        new MPPQueryContext("", queryId, null, QueryType.READ, new Endpoint());
+    MPPQueryContext context = new MPPQueryContext("", queryId, null, new Endpoint());
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
     DistributedQueryPlan plan = planner.planFragments();
@@ -276,8 +274,8 @@ public class DistributionPlannerTest {
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context =
-        new MPPQueryContext("", queryId, null, QueryType.WRITE, new Endpoint());
+    MPPQueryContext context = new MPPQueryContext("", queryId, null, new Endpoint());
+    context.setQueryType(QueryType.WRITE);
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, insertRowNode));
     DistributedQueryPlan plan = planner.planFragments();
@@ -318,8 +316,8 @@ public class DistributionPlannerTest {
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context =
-        new MPPQueryContext("", queryId, null, QueryType.WRITE, new Endpoint());
+    MPPQueryContext context = new MPPQueryContext("", queryId, null, new Endpoint());
+    context.setQueryType(QueryType.WRITE);
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, node));
     DistributedQueryPlan plan = planner.planFragments();
@@ -405,7 +403,10 @@ public class DistributionPlannerTest {
     analysis.setDataPartitionInfo(dataPartition);
 
     // construct schema partition
-    SchemaPartition schemaPartition = new SchemaPartition();
+    SchemaPartition schemaPartition =
+        new SchemaPartition(
+            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
     Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartitionMap = new HashMap<>();
 
     RegionReplicaSet schemaRegion1 =
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index 0931e9faf4..ca4d50f542 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.mpp.execution.QueryExecution;
-import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
@@ -49,8 +48,7 @@ public class QueryPlannerTest {
     QueryExecution queryExecution =
         new QueryExecution(
             stmt,
-            new MPPQueryContext(
-                querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ, new Endpoint()),
+            new MPPQueryContext(querySql, new QueryId("query1"), new SessionInfo(), new Endpoint()),
             IoTDBThreadPoolFactory.newSingleThreadExecutor("test_query"),
             IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"));
     queryExecution.doLogicalPlan();