You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/12 11:18:54 UTC

[iotdb] branch master updated: [IOTDB-2803] Implement create timeseries metadata operation through consensus layer in MPP framework (#5403)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 53089cee1f [IOTDB-2803] Implement create timeseries metadata operation through consensus layer in MPP framework (#5403)
53089cee1f is described below

commit 53089cee1fcd98558c3709d39a68ca87ca96248e
Author: Yifu Zhou <ef...@outlook.com>
AuthorDate: Tue Apr 12 19:18:48 2022 +0800

    [IOTDB-2803] Implement create timeseries metadata operation through consensus layer in MPP framework (#5403)
    
    As our talk before, the PlanNode which is used to execute WRITE operation need to implement a method splitByPartition(). We can merge this PR first and do the work in next PR
---
 confignode/pom.xml                                 |   5 +
 .../consensus/response/DataPartitionDataSet.java   |   3 +-
 .../consensus/response/SchemaPartitionDataSet.java |   2 +-
 .../iotdb/confignode/manager/RegionManager.java    |   2 +-
 .../persistence/RegionInfoPersistence.java         |  13 +-
 .../physical/SerializeDeserializeUT.java           |   6 +-
 .../iotdb/consensus/ratis/RequestMessage.java      |   1 +
 .../iotdb/commons/cluster/DataNodeLocation.java    |   6 +-
 .../org/apache/iotdb/commons/cluster/Endpoint.java |   6 +-
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |   1 +
 .../iotdb/commons/partition/RegionReplicaSet.java  |  41 +++---
 .../apache/iotdb/commons/utils/CommonUtils.java    |   4 +-
 server/pom.xml                                     |   5 +
 .../iotdb/db/consensus/ConsensusManager.java       |  72 +++++++++++
 .../consensus/statemachine/BaseStateMachine.java   |  12 +-
 .../statemachine/SchemaRegionStateMachine.java     |  13 +-
 .../iotdb/db/metadata/Executor/SchemaVisitor.java  |  55 ++++++++
 .../apache/iotdb/db/mpp/common/PlanFragmentId.java |  10 +-
 .../org/apache/iotdb/db/mpp/common/QueryId.java    |   8 +-
 .../scheduler/SimpleFragInstanceDispatcher.java    |   4 +-
 .../db/mpp/sql/planner/plan/FragmentInstance.java  |  53 ++++----
 .../db/mpp/sql/planner/plan/PlanFragment.java      |   5 +-
 .../plan/SimpleFragmentParallelPlanner.java        |   2 +-
 .../db/mpp/sql/planner/plan/node/PlanNode.java     |   6 +
 .../db/mpp/sql/planner/plan/node/PlanNodeType.java |   3 +-
 .../db/mpp/sql/planner/plan/node/PlanNodeUtil.java |   3 +
 .../db/mpp/sql/planner/plan/node/PlanVisitor.java  |   5 +
 .../node/metedata/write/AlterTimeSeriesNode.java   |  37 ++++++
 .../write/CreateAlignedTimeSeriesNode.java         |  22 ++++
 .../node/metedata/write/CreateTimeSeriesNode.java  | 137 +++++++++++++++++++-
 .../planner/plan/node/process/DeviceMergeNode.java |   3 +-
 .../planner/plan/node/process/ExchangeNode.java    |   3 +-
 .../iotdb/db/service/InternalServiceImpl.java      |  72 ++++++++---
 .../db/mpp/sql/plan/FragmentInstanceSerdeTest.java |   9 +-
 .../sql/plan/node/PlanNodeDeserializeHelper.java   |   3 +-
 .../metadata/read/ShowDevicesNodeSerdeTest.java    |   3 +-
 .../plan/node/sink/FragmentSinkNodeSerdeTest.java  |   3 +-
 .../iotdb/db/service/InternalServiceImplTest.java  | 138 +++++++++++++++++++++
 38 files changed, 659 insertions(+), 117 deletions(-)

diff --git a/confignode/pom.xml b/confignode/pom.xml
index 24fecb0a8e..ff00b99f39 100644
--- a/confignode/pom.xml
+++ b/confignode/pom.xml
@@ -75,6 +75,11 @@
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.codahale.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>3.0.2</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
index 4de9443c5e..91397f7ade 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
@@ -99,7 +99,8 @@ public class DataPartitionDataSet implements DataSet {
                                   TRegionReplicaSet tRegionReplicaSet = new TRegionReplicaSet();
 
                                   // Set TRegionReplicaSet's RegionId
-                                  tRegionReplicaSet.setRegionId(regionReplicaSet.getId().getId());
+                                  tRegionReplicaSet.setRegionId(
+                                      regionReplicaSet.getConsensusGroupId().getId());
 
                                   // Set TRegionReplicaSet's GroupType
                                   tRegionReplicaSet.setGroupType("DataRegion");
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
index 5c3e95ea3a..bef6f975b1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
@@ -72,7 +72,7 @@ public class SchemaPartitionDataSet implements DataSet {
               seriesPartitionSlotRegionReplicaSetMap.forEach(
                   ((seriesPartitionSlot, regionReplicaSet) -> {
                     TRegionReplicaSet regionMessage = new TRegionReplicaSet();
-                    regionMessage.setRegionId(regionReplicaSet.getId().getId());
+                    regionMessage.setRegionId(regionReplicaSet.getConsensusGroupId().getId());
                     List<EndPoint> endPointList = new ArrayList<>();
                     regionReplicaSet
                         .getDataNodeList()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
index f45591cffb..86e6f31774 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
@@ -120,7 +120,7 @@ public class RegionManager {
         case DataRegion:
           consensusGroupId = new DataRegionId(regionInfoPersistence.generateNextRegionGroupId());
       }
-      regionReplicaSet.setId(consensusGroupId);
+      regionReplicaSet.setConsensusGroupId(consensusGroupId);
       regionReplicaSet.setDataNodeList(onlineDataNodes.subList(0, regionReplicaCount));
       plan.addRegion(regionReplicaSet);
     }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
index e7f429b566..620814a7e3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
@@ -106,12 +106,13 @@ public class RegionInfoPersistence {
       StorageGroupSchema schema = storageGroupsMap.get(plan.getStorageGroup());
 
       for (RegionReplicaSet regionReplicaSet : plan.getRegionReplicaSets()) {
-        nextRegionGroupId = Math.max(nextRegionGroupId, regionReplicaSet.getId().getId());
-        regionMap.put(regionReplicaSet.getId(), regionReplicaSet);
-        if (regionReplicaSet.getId() instanceof DataRegionId) {
-          schema.addDataRegionGroup(regionReplicaSet.getId());
-        } else if (regionReplicaSet.getId() instanceof SchemaRegionId) {
-          schema.addSchemaRegionGroup(regionReplicaSet.getId());
+        nextRegionGroupId =
+            Math.max(nextRegionGroupId, regionReplicaSet.getConsensusGroupId().getId());
+        regionMap.put(regionReplicaSet.getConsensusGroupId(), regionReplicaSet);
+        if (regionReplicaSet.getConsensusGroupId() instanceof DataRegionId) {
+          schema.addDataRegionGroup(regionReplicaSet.getConsensusGroupId());
+        } else if (regionReplicaSet.getConsensusGroupId() instanceof SchemaRegionId) {
+          schema.addSchemaRegionGroup(regionReplicaSet.getConsensusGroupId());
         }
       }
 
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/physical/SerializeDeserializeUT.java b/confignode/src/test/java/org/apache/iotdb/confignode/physical/SerializeDeserializeUT.java
index 37262c7f14..0e33e3eaf0 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/physical/SerializeDeserializeUT.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/physical/SerializeDeserializeUT.java
@@ -94,12 +94,12 @@ public class SerializeDeserializeUT {
     CreateRegionsPlan plan0 = new CreateRegionsPlan();
     plan0.setStorageGroup("sg");
     RegionReplicaSet dataRegionSet = new RegionReplicaSet();
-    dataRegionSet.setId(new DataRegionId(0));
+    dataRegionSet.setConsensusGroupId(new DataRegionId(0));
     dataRegionSet.setDataNodeList(
         Collections.singletonList(new DataNodeLocation(0, new Endpoint("0.0.0.0", 6667))));
     plan0.addRegion(dataRegionSet);
     RegionReplicaSet schemaRegionSet = new RegionReplicaSet();
-    schemaRegionSet.setId(new SchemaRegionId(1));
+    schemaRegionSet.setConsensusGroupId(new SchemaRegionId(1));
     schemaRegionSet.setDataNodeList(
         Collections.singletonList(new DataNodeLocation(0, new Endpoint("0.0.0.0", 6667))));
     plan0.addRegion(schemaRegionSet);
@@ -125,7 +125,7 @@ public class SerializeDeserializeUT {
     SeriesPartitionSlot seriesPartitionSlot = new SeriesPartitionSlot(10);
     TimePartitionSlot timePartitionSlot = new TimePartitionSlot(100);
     RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
-    regionReplicaSet.setId(new DataRegionId(0));
+    regionReplicaSet.setConsensusGroupId(new DataRegionId(0));
     regionReplicaSet.setDataNodeList(
         Collections.singletonList(new DataNodeLocation(0, new Endpoint("0.0.0.0", 6667))));
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
index feb42b5f25..050358fdb2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
@@ -59,6 +59,7 @@ public class RequestMessage implements Message {
             // TODO Pooling
             ByteBuffer byteBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
             actualRequest.serializeRequest(byteBuffer);
+            byteBuffer.flip();
             serializedContent = ByteString.copyFrom(byteBuffer);
             byteBuffer.flip();
           }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DataNodeLocation.java b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DataNodeLocation.java
index e81af759b3..6c3d762a2c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DataNodeLocation.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DataNodeLocation.java
@@ -50,10 +50,8 @@ public class DataNodeLocation {
     endPoint.serializeImpl(buffer);
   }
 
-  public void deserializeImpl(ByteBuffer buffer) {
-    dataNodeId = buffer.getInt();
-    endPoint = new Endpoint();
-    endPoint.deserializeImpl(buffer);
+  public static DataNodeLocation deserializeImpl(ByteBuffer buffer) {
+    return new DataNodeLocation(buffer.getInt(), Endpoint.deserializeImpl(buffer));
   }
 
   @Override
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/Endpoint.java b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/Endpoint.java
index 25d7b84d0f..b5c15cc85f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/Endpoint.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/Endpoint.java
@@ -61,13 +61,11 @@ public class Endpoint {
     buffer.putInt(port);
   }
 
-  public void deserializeImpl(ByteBuffer buffer) {
+  public static Endpoint deserializeImpl(ByteBuffer buffer) {
     int length = buffer.getInt();
     byte[] bytes = new byte[length];
     buffer.get(bytes, 0, length);
-    ip = new String(bytes, 0, length);
-
-    port = buffer.getInt();
+    return new Endpoint(new String(bytes, 0, length), buffer.getInt());
   }
 
   @Override
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index de04e6e0de..b7f1fe4e58 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -143,6 +143,7 @@ public class IoTDBConstant {
   public static final String UNSEQUENCE_FLODER_NAME = "unsequence";
   public static final String FILE_NAME_SEPARATOR = "-";
   public static final String UPGRADE_FOLDER_NAME = "upgrade";
+  public static final String CONSENSUS_FOLDER_NAME = "consensus";
 
   // system folder name
   public static final String SYSTEM_FOLDER_NAME = "system";
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
index 115c4406c4..dac39d2d7c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
@@ -28,13 +28,13 @@ import java.util.List;
 import java.util.Objects;
 
 public class RegionReplicaSet {
-  private ConsensusGroupId id;
+  private ConsensusGroupId consensusGroupId;
   private List<DataNodeLocation> dataNodeList;
 
   public RegionReplicaSet() {}
 
-  public RegionReplicaSet(ConsensusGroupId id, List<DataNodeLocation> dataNodeList) {
-    this.id = id;
+  public RegionReplicaSet(ConsensusGroupId consensusGroupId, List<DataNodeLocation> dataNodeList) {
+    this.consensusGroupId = consensusGroupId;
     this.dataNodeList = dataNodeList;
   }
 
@@ -46,20 +46,22 @@ public class RegionReplicaSet {
     this.dataNodeList = dataNodeList;
   }
 
-  public ConsensusGroupId getId() {
-    return id;
+  public ConsensusGroupId getConsensusGroupId() {
+    return consensusGroupId;
   }
 
-  public void setId(ConsensusGroupId id) {
-    this.id = id;
+  public void setConsensusGroupId(ConsensusGroupId consensusGroupId) {
+    this.consensusGroupId = consensusGroupId;
   }
 
   public String toString() {
-    return String.format("RegionReplicaSet[%s-%s]: %s", id.getType(), id, dataNodeList);
+    return String.format(
+        "RegionReplicaSet[%s-%d]: %s",
+        consensusGroupId.getType(), consensusGroupId.getId(), dataNodeList);
   }
 
   public void serializeImpl(ByteBuffer buffer) {
-    id.serializeImpl(buffer);
+    consensusGroupId.serializeImpl(buffer);
     buffer.putInt(dataNodeList.size());
     dataNodeList.forEach(
         dataNode -> {
@@ -67,18 +69,16 @@ public class RegionReplicaSet {
         });
   }
 
-  public void deserializeImpl(ByteBuffer buffer) throws IOException {
-    id = ConsensusGroupId.Factory.create(buffer);
+  public static RegionReplicaSet deserializeImpl(ByteBuffer buffer) throws IOException {
+    ConsensusGroupId consensusGroupId = ConsensusGroupId.Factory.create(buffer);
 
     int size = buffer.getInt();
     // We should always make dataNodeList as a new Object when deserialization
-    dataNodeList = new ArrayList<>();
-
+    List<DataNodeLocation> dataNodeList = new ArrayList<>();
     for (int i = 0; i < size; i++) {
-      DataNodeLocation dataNode = new DataNodeLocation();
-      dataNode.deserializeImpl(buffer);
-      dataNodeList.add(dataNode);
+      dataNodeList.add(DataNodeLocation.deserializeImpl(buffer));
     }
+    return new RegionReplicaSet(consensusGroupId, dataNodeList);
   }
 
   @Override
@@ -90,11 +90,16 @@ public class RegionReplicaSet {
       return false;
     }
     RegionReplicaSet that = (RegionReplicaSet) o;
-    return Objects.equals(id, that.id) && Objects.equals(dataNodeList, that.dataNodeList);
+    return Objects.equals(consensusGroupId, that.consensusGroupId)
+        && Objects.equals(dataNodeList, that.dataNodeList);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(id, dataNodeList);
+    return Objects.hash(consensusGroupId, dataNodeList);
+  }
+
+  public boolean isEmpty() {
+    return dataNodeList == null || dataNodeList.isEmpty();
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java
index e9e170232c..b16d7692c8 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java
@@ -32,16 +32,16 @@ public class CommonUtils {
   private static final Logger logger = LoggerFactory.getLogger(CommonUtils.class);
 
   public static Endpoint parseNodeUrl(String nodeUrl) throws BadNodeUrlException {
-    Endpoint result = new Endpoint();
     String[] split = nodeUrl.split(":");
     if (split.length != 2) {
       logger.warn("Bad node url: {}", nodeUrl);
       throw new BadNodeUrlException(String.format("Bad node url: %s", nodeUrl));
     }
     String ip = split[0];
+    Endpoint result;
     try {
       int port = Integer.parseInt(split[1]);
-      result.setIp(ip).setPort(port);
+      result = new Endpoint(ip, port);
     } catch (NumberFormatException e) {
       logger.warn("Bad node url: {}", nodeUrl);
       throw new BadNodeUrlException(String.format("Bad node url: %s", nodeUrl));
diff --git a/server/pom.xml b/server/pom.xml
index 55ffa494c9..6d3dfa22d3 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -262,6 +262,11 @@
             <version>2.6</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>com.codahale.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>3.0.2</version>
+        </dependency>
         <dependency>
             <groupId>org.rocksdb</groupId>
             <artifactId>rocksdbjni</artifactId>
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusManager.java b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusManager.java
new file mode 100644
index 0000000000..1a036908ff
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusManager.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.consensus;
+
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** DataNode Consensus layer manager */
+public class ConsensusManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ConsensusManager.class);
+
+  private IConsensus consensusImpl;
+
+  public ConsensusManager() throws IOException {
+    consensusImpl = ConsensusImpl.getInstance();
+    consensusImpl.start();
+  }
+
+  public void addConsensusGroup(RegionReplicaSet regionReplicaSet) {
+    ConsensusGroupId consensusGroupId = regionReplicaSet.getConsensusGroupId();
+    List<Peer> peerList = new ArrayList<>();
+    for (DataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeList()) {
+      peerList.add(new Peer(consensusGroupId, dataNodeLocation.getEndPoint()));
+    }
+    consensusImpl.addConsensusGroup(consensusGroupId, peerList);
+  }
+
+  /** Transmit FragmentInstance to datanode.consensus.statemachine */
+  public ConsensusWriteResponse write(ConsensusGroupId consensusGroupId, IConsensusRequest plan) {
+    return consensusImpl.write(consensusGroupId, plan);
+  }
+
+  /** Transmit FragmentInstance to datanode.consensus.statemachine */
+  public ConsensusReadResponse read(ConsensusGroupId consensusGroupId, FragmentInstance plan) {
+    return consensusImpl.read(consensusGroupId, plan);
+  }
+
+  public void close() throws IOException {
+    consensusImpl.stop();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 07f04dc248..a7ef03b32e 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -24,12 +24,15 @@ import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 public abstract class BaseStateMachine implements IStateMachine {
 
   private static final Logger logger = LoggerFactory.getLogger(BaseStateMachine.class);
@@ -38,7 +41,8 @@ public abstract class BaseStateMachine implements IStateMachine {
   public TSStatus write(IConsensusRequest request) {
     try {
       return write(getFragmentInstance(request));
-    } catch (IllegalArgumentException e) {
+    } catch (IllegalArgumentException | IllegalPathException | IOException e) {
+      logger.error(e.getMessage());
       return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
     }
   }
@@ -49,14 +53,16 @@ public abstract class BaseStateMachine implements IStateMachine {
   public DataSet read(IConsensusRequest request) {
     try {
       return read(getFragmentInstance(request));
-    } catch (IllegalArgumentException e) {
+    } catch (IllegalArgumentException | IllegalPathException | IOException e) {
+      logger.error(e.getMessage());
       return null;
     }
   }
 
   protected abstract DataSet read(FragmentInstance fragmentInstance);
 
-  private FragmentInstance getFragmentInstance(IConsensusRequest request) {
+  private FragmentInstance getFragmentInstance(IConsensusRequest request)
+      throws IllegalPathException, IOException {
     FragmentInstance instance;
     if (request instanceof ByteBufferConsensusRequest) {
       instance =
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index fcb73ee4ea..b67ffe8e92 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -21,9 +21,10 @@ package org.apache.iotdb.db.consensus.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.db.metadata.Executor.SchemaVisitor;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,10 +33,10 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
 
   private static final Logger logger = LoggerFactory.getLogger(SchemaRegionStateMachine.class);
 
-  private final ISchemaRegion region;
+  private final ISchemaRegion schemaRegion;
 
-  public SchemaRegionStateMachine(ISchemaRegion region) {
-    this.region = region;
+  public SchemaRegionStateMachine(ISchemaRegion schemaRegion) {
+    this.schemaRegion = schemaRegion;
   }
 
   @Override
@@ -47,7 +48,9 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
   @Override
   protected TSStatus write(FragmentInstance fragmentInstance) {
     logger.info("Execute write plan in SchemaRegionStateMachine");
-    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    PlanNode planNode = fragmentInstance.getFragment().getRoot();
+    TSStatus status = planNode.accept(new SchemaVisitor(), schemaRegion);
+    return status;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java b/server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java
new file mode 100644
index 0000000000..08252364ce
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.Executor;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Schema write PlanNode visitor */
+public class SchemaVisitor extends PlanVisitor<TSStatus, ISchemaRegion> {
+  private static final Logger logger = LoggerFactory.getLogger(SchemaVisitor.class);
+
+  @Override
+  public TSStatus visitCreateTimeSeries(CreateTimeSeriesNode node, ISchemaRegion schemaRegion) {
+    try {
+      schemaRegion.createTimeseries((CreateTimeSeriesPlan) node.transferToPhysicalPlan(), -1);
+    } catch (MetadataException e) {
+      logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+      return RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage());
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
+  }
+
+  @Override
+  public TSStatus visitPlan(PlanNode node, ISchemaRegion context) {
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
index e4bdd7d306..1d3cc2296d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
@@ -70,16 +70,16 @@ public class PlanFragmentId {
     return String.format("%s.%d", queryId, id);
   }
 
+  public void serialize(ByteBuffer byteBuffer) {
+    queryId.serialize(byteBuffer);
+    byteBuffer.putInt(id);
+  }
+
   public static PlanFragmentId deserialize(ByteBuffer byteBuffer) {
     return new PlanFragmentId(
         QueryId.deserialize(byteBuffer), ReadWriteIOUtils.readInt(byteBuffer));
   }
 
-  public void serialize(ByteBuffer byteBuffer) {
-    queryId.serialize(byteBuffer);
-    ReadWriteIOUtils.write(id, byteBuffer);
-  }
-
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
index 5bfeb25e26..4d24e5cfc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
@@ -128,11 +128,11 @@ public class QueryId {
     return id;
   }
 
-  public static QueryId deserialize(ByteBuffer byteBuffer) {
-    return new QueryId(ReadWriteIOUtils.readString(byteBuffer));
+  public void serialize(ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(this.id, byteBuffer);
   }
 
-  public void serialize(ByteBuffer byteBuffer) {
-    ReadWriteIOUtils.write(id, byteBuffer);
+  public static QueryId deserialize(ByteBuffer byteBuffer) {
+    return new QueryId(ReadWriteIOUtils.readString(byteBuffer));
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index 0dde122c74..3602bd43fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -55,8 +55,8 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
               buffer.flip();
               TConsensusGroupId groupId =
                   new TConsensusGroupId(
-                      instance.getDataRegionId().getId().getId(),
-                      instance.getDataRegionId().getId().getType().toString());
+                      instance.getRegionReplicaSet().getConsensusGroupId().getId(),
+                      instance.getRegionReplicaSet().getConsensusGroupId().getType().toString());
               TSendFragmentInstanceReq req =
                   new TSendFragmentInstanceReq(
                       new TFragmentInstance(buffer), groupId, instance.getType().toString());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 03fd9f3e0e..a9b259c693 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan;
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
@@ -40,8 +41,10 @@ public class FragmentInstance implements IConsensusRequest {
   private final QueryType type;
   // The reference of PlanFragment which this instance is generated from
   private final PlanFragment fragment;
-  // The DataRegion where the FragmentInstance should run
-  private RegionReplicaSet dataRegion;
+
+  // The Region where the FragmentInstance should run
+  private RegionReplicaSet regionReplicaSet;
+
   private Endpoint hostEndpoint;
 
   private Filter timeFilter;
@@ -60,12 +63,12 @@ public class FragmentInstance implements IConsensusRequest {
     return new FragmentInstanceId(id, String.valueOf(index));
   }
 
-  public RegionReplicaSet getDataRegionId() {
-    return dataRegion;
+  public RegionReplicaSet getRegionReplicaSet() {
+    return regionReplicaSet;
   }
 
-  public void setDataRegionId(RegionReplicaSet dataRegion) {
-    this.dataRegion = dataRegion;
+  public void setRegionReplicaSet(RegionReplicaSet regionReplicaSet) {
+    this.regionReplicaSet = regionReplicaSet;
   }
 
   public Endpoint getHostEndpoint() {
@@ -111,16 +114,24 @@ public class FragmentInstance implements IConsensusRequest {
 
   public String toString() {
     StringBuilder ret = new StringBuilder();
-    ret.append(
-        String.format(
-            "FragmentInstance-%s:[Host: %s/%s]\n",
-            getId(), getHostEndpoint().getIp(), getDataRegionId().getId()));
+    ret.append(String.format("FragmentInstance-%s:", getId()));
+    if (getHostEndpoint() == null) {
+      ret.append(String.format("host endpoint has not set."));
+    } else {
+      ret.append(String.format("host endpoint: %s.", getHostEndpoint().toString()));
+    }
+    if (getRegionReplicaSet() == null) {
+      ret.append(String.format("Region Replica set has not set.\n"));
+    } else {
+      ret.append(String.format("Region Replica set: %s.\n", getRegionReplicaSet().toString()));
+    }
     ret.append("---- Plan Node Tree ----\n");
     ret.append(PlanNodeUtil.nodeToString(getFragment().getRoot()));
     return ret.toString();
   }
 
-  public static FragmentInstance deserializeFrom(ByteBuffer buffer) {
+  public static FragmentInstance deserializeFrom(ByteBuffer buffer)
+      throws IllegalPathException, IOException {
     FragmentInstanceId id = FragmentInstanceId.deserialize(buffer);
     PlanFragment planFragment = PlanFragment.deserialize(buffer);
     boolean hasTimeFilter = ReadWriteIOUtils.readBool(buffer);
@@ -129,22 +140,15 @@ public class FragmentInstance implements IConsensusRequest {
     FragmentInstance fragmentInstance =
         new FragmentInstance(
             planFragment, Integer.parseInt(id.getInstanceId()), timeFilter, queryType);
-    RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
-    try {
-      regionReplicaSet.deserializeImpl(buffer);
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-    Endpoint endpoint = new Endpoint();
-    endpoint.deserializeImpl(buffer);
-    fragmentInstance.dataRegion = regionReplicaSet;
-    fragmentInstance.hostEndpoint = endpoint;
+    fragmentInstance.regionReplicaSet = RegionReplicaSet.deserializeImpl(buffer);
+    fragmentInstance.hostEndpoint = Endpoint.deserializeImpl(buffer);
 
     return fragmentInstance;
   }
 
   @Override
   public void serializeRequest(ByteBuffer buffer) {
+    buffer.mark();
     id.serialize(buffer);
     fragment.serialize(buffer);
     ReadWriteIOUtils.write(timeFilter != null, buffer);
@@ -152,7 +156,8 @@ public class FragmentInstance implements IConsensusRequest {
       timeFilter.serialize(buffer);
     }
     ReadWriteIOUtils.write(type.ordinal(), buffer);
-    dataRegion.serializeImpl(buffer);
+    regionReplicaSet.serializeImpl(buffer);
+
     hostEndpoint.serializeImpl(buffer);
   }
 
@@ -164,13 +169,13 @@ public class FragmentInstance implements IConsensusRequest {
     return Objects.equals(id, instance.id)
         && type == instance.type
         && Objects.equals(fragment, instance.fragment)
-        && Objects.equals(dataRegion, instance.dataRegion)
+        && Objects.equals(regionReplicaSet, instance.regionReplicaSet)
         && Objects.equals(hostEndpoint, instance.hostEndpoint)
         && Objects.equals(timeFilter, instance.timeFilter);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(id, type, fragment, dataRegion, hostEndpoint, timeFilter);
+    return Objects.hash(id, type, fragment, regionReplicaSet, hostEndpoint, timeFilter);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index 80f16b7465..550ad1e0f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -100,12 +101,12 @@ public class PlanFragment {
     root.serialize(byteBuffer);
   }
 
-  public static PlanFragment deserialize(ByteBuffer byteBuffer) {
+  public static PlanFragment deserialize(ByteBuffer byteBuffer) throws IllegalPathException {
     return new PlanFragment(PlanFragmentId.deserialize(byteBuffer), deserializeHelper(byteBuffer));
   }
 
   // deserialize the plan node recursively
-  public static PlanNode deserializeHelper(ByteBuffer byteBuffer) {
+  public static PlanNode deserializeHelper(ByteBuffer byteBuffer) throws IllegalPathException {
     PlanNode root = PlanNodeType.deserialize(byteBuffer);
     int childrenCount = byteBuffer.getInt();
     for (int i = 0; i < childrenCount; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index c2014c55d2..2818661995 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -100,7 +100,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     // We need to store all the replica host in case of the scenario that the instance need to be
     // redirected
     // to another host when scheduling
-    fragmentInstance.setDataRegionId(dataRegion);
+    fragmentInstance.setRegionReplicaSet(dataRegion);
 
     // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
     // instance
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
index 97075588c9..3a17a08a75 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.apache.commons.lang.Validate;
@@ -113,4 +114,9 @@ public abstract class PlanNode {
   public int hashCode() {
     return Objects.hash(id);
   }
+
+  // TODO (yifuzhou) will remote later
+  public PhysicalPlan transferToPhysicalPlan() {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
index 1d6bed6251..d9de5f02c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AuthorNode;
@@ -81,7 +82,7 @@ public enum PlanNodeType {
     buffer.putShort(nodeType);
   }
 
-  public static PlanNode deserialize(ByteBuffer buffer) {
+  public static PlanNode deserialize(ByteBuffer buffer) throws IllegalPathException {
     short nodeType = buffer.getShort();
     switch (nodeType) {
       case 0:
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
index 58c11cd563..2ff9c1f1e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
@@ -72,6 +72,9 @@ public class PlanNodeUtil {
     }
     result.append(root.toString());
     result.append(System.lineSeparator());
+    if (root.getChildren() == null) {
+      return;
+    }
     for (int i = 0; i < root.getChildren().size(); i++) {
       PlanNode child = root.getChildren().get(i);
       PrintContext childCtx = ctx.clone();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
index c035a0129e..76eecb8215 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
@@ -96,4 +97,8 @@ public abstract class PlanVisitor<R, C> {
   public R visitFragmentSink(FragmentSinkNode node, C context) {
     return visitPlan(node, context);
   }
+
+  public R visitCreateTimeSeries(CreateTimeSeriesNode node, C context) {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
index 935a4bb116..b1fda2fb3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
@@ -238,6 +238,43 @@ public class AlterTimeSeriesNode extends PlanNode {
         new PlanNodeId(id), path, alterType, alterMap, alias, tagsMap, attributesMap);
   }
 
+  //  @Override
+  //  public void executeOn(SchemaRegion schemaRegion) throws MetadataException,
+  // QueryProcessException {
+  //    try {
+  //      switch (getAlterType()) {
+  //        case RENAME:
+  //          String beforeName = alterMap.keySet().iterator().next();
+  //          String currentName = alterMap.get(beforeName);
+  //          schemaRegion.renameTagOrAttributeKey(beforeName, currentName, path);
+  //          break;
+  //        case SET:
+  //          schemaRegion.setTagsOrAttributesValue(alterMap, path);
+  //          break;
+  //        case DROP:
+  //          schemaRegion.dropTagsOrAttributes(alterMap.keySet(), path);
+  //          break;
+  //        case ADD_TAGS:
+  //          schemaRegion.addTags(alterMap, path);
+  //          break;
+  //        case ADD_ATTRIBUTES:
+  //          schemaRegion.addAttributes(alterMap, path);
+  //          break;
+  //        case UPSERT:
+  //          schemaRegion.upsertTagsAndAttributes(getAlias(), getTagsMap(), getAttributesMap(),
+  // path);
+  //          break;
+  //      }
+  //    } catch (MetadataException e) {
+  //      throw new QueryProcessException(e);
+  //    } catch (IOException e) {
+  //      throw new QueryProcessException(
+  //          String.format(
+  //              "Something went wrong while read/write the [%s]'s tag/attribute info.",
+  //              path.getFullPath()));
+  //    }
+  //  }
+
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     throw new NotImplementedException(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
index f9cd7829e9..89605a8704 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -351,4 +353,24 @@ public class CreateAlignedTimeSeriesNode extends PlanNode {
         tagsList,
         attributesList);
   }
+
+  @Override
+  public PhysicalPlan transferToPhysicalPlan() {
+    return new CreateAlignedTimeSeriesPlan(
+        getDevicePath(),
+        getMeasurements(),
+        getDataTypes(),
+        getEncodings(),
+        getCompressors(),
+        getAliasList(),
+        getTagsList(),
+        getAttributesList());
+  }
+
+  //  @Override
+  //  public void executeOn(SchemaRegion schemaRegion) throws MetadataException {
+  //    schemaRegion.createAlignedTimeSeries((CreateAlignedTimeSeriesPlan)
+  // transferToPhysicalPlan());
+  //  }
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index 6aed73d62d..45b7856878 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -18,15 +18,22 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write;
 
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -157,16 +164,136 @@ public class CreateTimeSeriesNode extends PlanNode {
   }
 
   @Override
-  protected void serializeAttributes(ByteBuffer byteBuffer) {
-    throw new NotImplementedException(
-        "serializeAttributes of CreateTimeSeriesNode is not implemented");
+  public PhysicalPlan transferToPhysicalPlan() {
+    return new CreateTimeSeriesPlan(
+        getPath(),
+        getDataType(),
+        getEncoding(),
+        getCompressor(),
+        getProps(),
+        getTags(),
+        getAttributes(),
+        getAlias());
   }
 
-  public static CreateTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
-    return null;
+  public static CreateTimeSeriesNode deserialize(ByteBuffer byteBuffer)
+      throws IllegalPathException {
+    String id;
+    PartialPath path = null;
+    TSDataType dataType;
+    TSEncoding encoding;
+    CompressionType compressor;
+    long tagOffset;
+    String alias = null;
+    Map<String, String> props = null;
+    Map<String, String> tags = null;
+    Map<String, String> attributes = null;
+
+    id = ReadWriteIOUtils.readString(byteBuffer);
+    int length = byteBuffer.getInt();
+    byte[] bytes = new byte[length];
+    byteBuffer.get(bytes);
+    path = new PartialPath(new String(bytes));
+    dataType = TSDataType.values()[byteBuffer.get()];
+    encoding = TSEncoding.values()[byteBuffer.get()];
+    compressor = CompressionType.values()[byteBuffer.get()];
+    tagOffset = byteBuffer.getLong();
+
+    // alias
+    if (byteBuffer.get() == 1) {
+      alias = ReadWriteIOUtils.readString(byteBuffer);
+    }
+
+    byte label = byteBuffer.get();
+    // props
+    if (label == 0) {
+      props = new HashMap<>();
+    } else if (label == 1) {
+      props = ReadWriteIOUtils.readMap(byteBuffer);
+    }
+
+    // tags
+    label = byteBuffer.get();
+    if (label == 0) {
+      tags = new HashMap<>();
+    } else if (label == 1) {
+      tags = ReadWriteIOUtils.readMap(byteBuffer);
+    }
+
+    // attributes
+    label = byteBuffer.get();
+    if (label == 0) {
+      attributes = new HashMap<>();
+    } else if (label == 1) {
+      attributes = ReadWriteIOUtils.readMap(byteBuffer);
+    }
+
+    return new CreateTimeSeriesNode(
+        new PlanNodeId(id), path, dataType, encoding, compressor, props, tags, attributes, alias);
   }
 
   @Override
+  public void serialize(ByteBuffer byteBuffer) {
+    byteBuffer.putShort((short) PlanNodeType.CREATE_TIME_SERIES.ordinal());
+    ReadWriteIOUtils.write(this.getPlanNodeId().getId(), byteBuffer);
+    byte[] bytes = path.getFullPath().getBytes();
+    byteBuffer.putInt(bytes.length);
+    byteBuffer.put(bytes);
+    byteBuffer.put((byte) dataType.ordinal());
+    byteBuffer.put((byte) encoding.ordinal());
+    byteBuffer.put((byte) compressor.ordinal());
+    byteBuffer.putLong(tagOffset);
+
+    // alias
+    if (alias != null) {
+      byteBuffer.put((byte) 1);
+      ReadWriteIOUtils.write(alias, byteBuffer);
+    } else {
+      byteBuffer.put((byte) 0);
+    }
+
+    // props
+    if (props == null) {
+      byteBuffer.put((byte) -1);
+    } else if (props.isEmpty()) {
+      byteBuffer.put((byte) 0);
+    } else {
+      byteBuffer.put((byte) 1);
+      ReadWriteIOUtils.write(props, byteBuffer);
+    }
+
+    // tags
+    if (tags == null) {
+      byteBuffer.put((byte) -1);
+    } else if (tags.isEmpty()) {
+      byteBuffer.put((byte) 0);
+    } else {
+      byteBuffer.put((byte) 1);
+      ReadWriteIOUtils.write(tags, byteBuffer);
+    }
+
+    // attributes
+    if (attributes == null) {
+      byteBuffer.put((byte) -1);
+    } else if (attributes.isEmpty()) {
+      byteBuffer.put((byte) 0);
+    } else {
+      byteBuffer.put((byte) 1);
+      ReadWriteIOUtils.write(attributes, byteBuffer);
+    }
+
+    // no children node, need to set 0
+    byteBuffer.putInt(0);
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {}
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C schemaRegion) {
+    return visitor.visitCreateTimeSeries(this, schemaRegion);
+  }
+
   public boolean equals(Object o) {
     if (this == o) {
       return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
index d427b85809..ec40d5df52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
@@ -155,7 +156,7 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
     }
   }
 
-  public static DeviceMergeNode deserialize(ByteBuffer byteBuffer) {
+  public static DeviceMergeNode deserialize(ByteBuffer byteBuffer) throws IllegalPathException {
     int orderByIndex = ReadWriteIOUtils.readInt(byteBuffer);
     OrderBy orderBy = OrderBy.values()[orderByIndex];
     FilterNullComponent filterNullComponent = FilterNullComponent.deserialize(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index fc408b5623..41965c2273 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -86,7 +87,7 @@ public class ExchangeNode extends PlanNode {
     this.upstreamPlanNodeId = nodeId;
   }
 
-  public static ExchangeNode deserialize(ByteBuffer byteBuffer) {
+  public static ExchangeNode deserialize(ByteBuffer byteBuffer) throws IllegalPathException {
     FragmentSinkNode fragmentSinkNode =
         (FragmentSinkNode) PlanFragment.deserializeHelper(byteBuffer);
     Endpoint endPoint =
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
index a0e1d01aab..959e2879a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
@@ -19,16 +19,18 @@
 
 package org.apache.iotdb.db.service;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.commons.consensus.GroupType;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.consensus.ConsensusManager;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceInfo;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.mpp.rpc.thrift.SchemaFetchRequest;
 import org.apache.iotdb.mpp.rpc.thrift.SchemaFetchResponse;
@@ -43,39 +45,73 @@ import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
 
 public class InternalServiceImpl implements InternalService.Iface {
+  private static final Logger LOGGER = LoggerFactory.getLogger(InternalServiceImpl.class);
+
+  private final ConsensusManager consensusManager;
 
-  public InternalServiceImpl() {
+  public InternalServiceImpl() throws IOException {
     super();
+    consensusManager = new ConsensusManager();
   }
 
   @Override
-  public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req)
-      throws TException {
+  public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req) {
+    TSendFragmentInstanceResp response = new TSendFragmentInstanceResp();
+    FragmentInstance fragmentInstance = null;
+    try {
+      fragmentInstance = FragmentInstance.deserializeFrom(req.fragmentInstance.body);
+    } catch (IOException | IllegalPathException e) {
+      LOGGER.error(e.getMessage());
+      response.setAccepted(false);
+      response.setMessage(e.getMessage());
+      return response;
+    }
+
     ByteBufferConsensusRequest request = new ByteBufferConsensusRequest(req.fragmentInstance.body);
-    QueryType type = QueryType.valueOf(req.queryType);
-    ConsensusGroupId groupId =
-        ConsensusGroupId.Factory.create(
-            req.consensusGroupId.id, GroupType.valueOf(req.consensusGroupId.type));
+    QueryType type = fragmentInstance.getType();
+    ConsensusGroupId groupId = fragmentInstance.getRegionReplicaSet().getConsensusGroupId();
+
+    if (fragmentInstance.getRegionReplicaSet() == null
+        || fragmentInstance.getRegionReplicaSet().isEmpty()) {
+      String msg = "Unknown regions to write, since getRegionReplicaSet is empty.";
+      LOGGER.error(msg);
+      response.setAccepted(false);
+      response.setMessage(msg);
+      return response;
+    }
+    consensusManager.addConsensusGroup(fragmentInstance.getRegionReplicaSet());
+
     switch (type) {
       case READ:
         ConsensusReadResponse readResp = ConsensusImpl.getInstance().read(groupId, request);
         FragmentInstanceInfo info = (FragmentInstanceInfo) readResp.getDataset();
         return new TSendFragmentInstanceResp(info.getState().isFailed());
       case WRITE:
-        ConsensusWriteResponse writeResp = ConsensusImpl.getInstance().write(groupId, request);
-        // TODO: (xingtanzjr) need to distinguish more conditions for response status.
-        boolean accepted =
-            writeResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
-        return new TSendFragmentInstanceResp(accepted);
+        TSStatus status =
+            consensusManager
+                .write(
+                    fragmentInstance.getRegionReplicaSet().getConsensusGroupId(), fragmentInstance)
+                .getStatus();
+        // TODO need consider more status
+        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode()) {
+          response.setAccepted(true);
+        } else {
+          response.setAccepted(false);
+        }
+        response.setMessage(status.message);
+        return response;
     }
     return null;
   }
 
   @Override
-  public TFragmentInstanceStateResp fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req)
-      throws TException {
+  public TFragmentInstanceStateResp fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req) {
     FragmentInstanceInfo info =
         FragmentInstanceManager.getInstance()
             .getInstanceInfo(FragmentInstanceId.fromThrift(req.fragmentInstanceId));
@@ -101,4 +137,8 @@ public class InternalServiceImpl implements InternalService.Iface {
   public SchemaFetchResponse fetchSchema(SchemaFetchRequest req) throws TException {
     throw new UnsupportedOperationException();
   }
+
+  public void close() throws IOException {
+    consensusManager.close();
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
index dea1e9b031..ce29fee307 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.tsfile.read.filter.operator.Gt;
 
 import org.junit.Test;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
@@ -53,7 +54,7 @@ import static org.junit.Assert.assertEquals;
 public class FragmentInstanceSerdeTest {
 
   @Test
-  public void TestSerializeAndDeserializeForTree1() throws IllegalPathException {
+  public void TestSerializeAndDeserializeForTree1() throws IllegalPathException, IOException {
     FragmentInstance fragmentInstance =
         new FragmentInstance(
             new PlanFragment(new PlanFragmentId("test", -1), constructPlanNodeTree()),
@@ -62,7 +63,7 @@ public class FragmentInstanceSerdeTest {
             QueryType.READ);
     RegionReplicaSet regionReplicaSet =
         new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
-    fragmentInstance.setDataRegionId(regionReplicaSet);
+    fragmentInstance.setRegionReplicaSet(regionReplicaSet);
     fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
 
     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
@@ -73,7 +74,7 @@ public class FragmentInstanceSerdeTest {
   }
 
   @Test
-  public void TestSerializeAndDeserializeWithNullFilter() throws IllegalPathException {
+  public void TestSerializeAndDeserializeWithNullFilter() throws IllegalPathException, IOException {
     FragmentInstance fragmentInstance =
         new FragmentInstance(
             new PlanFragment(new PlanFragmentId("test2", 1), constructPlanNodeTree()),
@@ -82,7 +83,7 @@ public class FragmentInstanceSerdeTest {
             QueryType.READ);
     RegionReplicaSet regionReplicaSet =
         new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
-    fragmentInstance.setDataRegionId(regionReplicaSet);
+    fragmentInstance.setRegionReplicaSet(regionReplicaSet);
     fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.2", 6667));
 
     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/PlanNodeDeserializeHelper.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/PlanNodeDeserializeHelper.java
index 55894de485..05b62153fb 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/PlanNodeDeserializeHelper.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/PlanNodeDeserializeHelper.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.sql.plan.node;
 
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 
@@ -25,7 +26,7 @@ import java.nio.ByteBuffer;
 
 public class PlanNodeDeserializeHelper {
 
-  public static PlanNode deserialize(ByteBuffer byteBuffer) {
+  public static PlanNode deserialize(ByteBuffer byteBuffer) throws IllegalPathException {
     PlanNode root = PlanNodeType.deserialize(byteBuffer);
     int childrenCount = byteBuffer.getInt();
     for (int i = 0; i < childrenCount; i++) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/ShowDevicesNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/ShowDevicesNodeSerdeTest.java
index 6139bb33df..80ca97c47f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/ShowDevicesNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/metadata/read/ShowDevicesNodeSerdeTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.sql.plan.node.metadata.read;
 
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
@@ -31,7 +32,7 @@ import static org.junit.Assert.assertEquals;
 public class ShowDevicesNodeSerdeTest {
 
   @Test
-  public void TestSerializeAndDeserialize() {
+  public void TestSerializeAndDeserialize() throws IllegalPathException {
     ShowDevicesNode showDevicesNode = new ShowDevicesNode(new PlanNodeId("TestShowDevicesNode"));
     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
     showDevicesNode.serialize(byteBuffer);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/sink/FragmentSinkNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/sink/FragmentSinkNodeSerdeTest.java
index 0e5618373b..2aa2610bde 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/sink/FragmentSinkNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/sink/FragmentSinkNodeSerdeTest.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.sql.plan.node.sink;
 
 import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
@@ -35,7 +36,7 @@ import static org.junit.Assert.assertEquals;
 public class FragmentSinkNodeSerdeTest {
 
   @Test
-  public void TestSerializeAndDeserialize() {
+  public void TestSerializeAndDeserialize() throws IllegalPathException {
     FragmentSinkNode fragmentSinkNode =
         new FragmentSinkNode(new PlanNodeId("TestFragmentSinkNode"));
     fragmentSinkNode.addChild(new ShowDevicesNode(new PlanNodeId("TestShowDevicesNode")));
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
new file mode 100644
index 0000000000..c08553c1d9
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service;
+
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.LocalConfigManager;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
+
+import org.apache.ratis.util.FileUtils;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+public class InternalServiceImplTest {
+  private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+  InternalServiceImpl internalServiceImpl;
+  LocalConfigManager configManager;
+
+  @Before
+  public void setUp() throws Exception {
+    IoTDB.configManager.init();
+    internalServiceImpl = new InternalServiceImpl();
+    configManager = LocalConfigManager.getInstance();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    IoTDB.configManager.clear();
+    EnvironmentUtils.cleanEnv();
+    internalServiceImpl.close();
+    FileUtils.deleteFully(new File("data" + File.separator + "consensus"));
+  }
+
+  @Test
+  public void createTimeseriesTest() throws MetadataException, TException {
+    configManager.getBelongedSchemaRegionWithAutoCreate(new PartialPath("root.ln"));
+    CreateTimeSeriesNode createTimeSeriesNode =
+        new CreateTimeSeriesNode(
+            new PlanNodeId("0"),
+            new PartialPath("root.ln.wf01.wt01.status"),
+            TSDataType.BOOLEAN,
+            TSEncoding.PLAIN,
+            CompressionType.SNAPPY,
+            new HashMap<String, String>() {
+              {
+                put("MAX_POINT_NUMBER", "3");
+              }
+            },
+            new HashMap<String, String>() {
+              {
+                put("tag1", "v1");
+                put("tag2", "v2");
+              }
+            },
+            new HashMap<String, String>() {
+              {
+                put("attr1", "a1");
+                put("attr2", "a2");
+              }
+            },
+            "meter1");
+
+    List<DataNodeLocation> dataNodeList = new ArrayList<>();
+    dataNodeList.add(
+        new DataNodeLocation(
+            conf.getConsensusPort(), new Endpoint(conf.getInternalIp(), conf.getConsensusPort())));
+
+    // construct fragmentInstance
+    SchemaRegionId schemaRegionId = new SchemaRegionId(0);
+    RegionReplicaSet regionReplicaSet = new RegionReplicaSet(schemaRegionId, dataNodeList);
+    PlanFragment planFragment = new PlanFragment(new PlanFragmentId("2", 3), createTimeSeriesNode);
+    FragmentInstance fragmentInstance =
+        new FragmentInstance(planFragment, 4, new GroupByFilter(1, 2, 3, 4), QueryType.WRITE);
+    fragmentInstance.setRegionReplicaSet(regionReplicaSet);
+    fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
+
+    // serialize fragmentInstance
+    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+    fragmentInstance.serializeRequest(byteBuffer);
+    byteBuffer.flip();
+
+    // put serialized fragmentInstance to TSendFragmentInstanceReq
+    TSendFragmentInstanceReq request = new TSendFragmentInstanceReq();
+    TFragmentInstance tFragmentInstance = new TFragmentInstance();
+    tFragmentInstance.setBody(byteBuffer);
+    request.setFragmentInstance(tFragmentInstance);
+
+    // Use consensus layer to execute request
+    TSendFragmentInstanceResp response = internalServiceImpl.sendFragmentInstance(request);
+
+    Assert.assertTrue(response.accepted);
+  }
+}