You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/13 14:45:35 UTC

[iotdb] branch master updated: Make some modifications according to talking for recent PRs of SchemaRegion (#5498)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ade542133 Make some modifications according to talking for recent PRs of SchemaRegion (#5498)
8ade542133 is described below

commit 8ade54213392786ce8032bb731efd709daeb4fcf
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Wed Apr 13 22:45:30 2022 +0800

    Make some modifications according to talking for recent PRs of SchemaRegion (#5498)
---
 .../server/ConfigNodeRPCServerProcessorTest.java   | 63 ++++++-----------
 .../iotdb/commons/consensus/ConsensusGroupId.java  |  5 +-
 .../iotdb/commons/partition/RegionReplicaSet.java  |  3 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 16 +----
 .../iotdb/db/consensus/ConsensusManager.java       | 72 --------------------
 .../consensus/statemachine/BaseStateMachine.java   | 10 +--
 .../iotdb/db/metadata/Executor/SchemaVisitor.java  | 43 +++++++++++-
 .../db/mpp/sql/planner/plan/FragmentInstance.java  | 25 +++----
 .../db/mpp/sql/planner/plan/PlanFragment.java      |  5 +-
 .../db/mpp/sql/planner/plan/node/PlanNode.java     |  6 --
 .../db/mpp/sql/planner/plan/node/PlanNodeType.java |  3 +-
 .../db/mpp/sql/planner/plan/node/PlanVisitor.java  | 11 ++-
 .../node/metedata/read/DevicesSchemaScanNode.java  | 10 ++-
 .../metedata/read/TimeSeriesSchemaScanNode.java    | 10 ++-
 .../write/CreateAlignedTimeSeriesNode.java         | 21 ++----
 .../node/metedata/write/CreateTimeSeriesNode.java  | 24 ++-----
 .../planner/plan/node/process/DeviceMergeNode.java |  3 +-
 .../planner/plan/node/process/ExchangeNode.java    |  3 +-
 .../plan/node/source/SeriesAggregateScanNode.java  |  6 +-
 .../planner/plan/node/source/SeriesScanNode.java   |  7 +-
 .../iotdb/db/service/InternalServiceImpl.java      | 78 ++++++----------------
 .../thrift/impl/DataNodeManagementServiceImpl.java |  5 --
 .../db/mpp/sql/plan/FragmentInstanceSerdeTest.java |  5 +-
 .../iotdb/db/service/InternalServiceImplTest.java  | 50 ++++++++++----
 server/src/test/resources/iotdb-engine.properties  |  3 +-
 25 files changed, 185 insertions(+), 302 deletions(-)

diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
index 7ed1fe2a2c..0aa164ccd0 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
@@ -268,13 +268,10 @@ public class ConfigNodeRPCServerProcessorTest {
               (tSeriesPartitionSlot, tRegionReplicaSet) -> {
                 Assert.assertEquals(3, tRegionReplicaSet.getEndpointSize());
                 ConsensusGroupId regionId = null;
-                try {
-                  regionId =
-                      ConsensusGroupId.Factory.create(
-                          ByteBuffer.wrap(tRegionReplicaSet.getRegionId()));
-                } catch (IOException ignore) {
-                  // Ignore
-                }
+                regionId =
+                    ConsensusGroupId.Factory.create(
+                        ByteBuffer.wrap(tRegionReplicaSet.getRegionId()));
+
                 Assert.assertTrue(regionId instanceof SchemaRegionId);
               });
     }
@@ -298,13 +295,11 @@ public class ConfigNodeRPCServerProcessorTest {
               (tSeriesPartitionSlot, tRegionReplicaSet) -> {
                 Assert.assertEquals(3, tRegionReplicaSet.getEndpointSize());
                 ConsensusGroupId regionId = null;
-                try {
-                  regionId =
-                      ConsensusGroupId.Factory.create(
-                          ByteBuffer.wrap(tRegionReplicaSet.getRegionId()));
-                } catch (IOException ignore) {
-                  // Ignore
-                }
+
+                regionId =
+                    ConsensusGroupId.Factory.create(
+                        ByteBuffer.wrap(tRegionReplicaSet.getRegionId()));
+
                 Assert.assertTrue(regionId instanceof SchemaRegionId);
               });
     }
@@ -327,13 +322,8 @@ public class ConfigNodeRPCServerProcessorTest {
             (tSeriesPartitionSlot, tRegionReplicaSet) -> {
               Assert.assertEquals(3, tRegionReplicaSet.getEndpointSize());
               ConsensusGroupId regionId = null;
-              try {
-                regionId =
-                    ConsensusGroupId.Factory.create(
-                        ByteBuffer.wrap(tRegionReplicaSet.getRegionId()));
-              } catch (IOException ignore) {
-                // Ignore
-              }
+              regionId =
+                  ConsensusGroupId.Factory.create(ByteBuffer.wrap(tRegionReplicaSet.getRegionId()));
               Assert.assertTrue(regionId instanceof SchemaRegionId);
             });
     // Check "root.sg1"
@@ -345,13 +335,8 @@ public class ConfigNodeRPCServerProcessorTest {
             (tSeriesPartitionSlot, tRegionReplicaSet) -> {
               Assert.assertEquals(3, tRegionReplicaSet.getEndpointSize());
               ConsensusGroupId regionId = null;
-              try {
-                regionId =
-                    ConsensusGroupId.Factory.create(
-                        ByteBuffer.wrap(tRegionReplicaSet.getRegionId()));
-              } catch (IOException ignore) {
-                // Ignore
-              }
+              regionId =
+                  ConsensusGroupId.Factory.create(ByteBuffer.wrap(tRegionReplicaSet.getRegionId()));
               Assert.assertTrue(regionId instanceof SchemaRegionId);
             });
   }
@@ -413,19 +398,15 @@ public class ConfigNodeRPCServerProcessorTest {
                   .size());
           // Is DataRegion
           ConsensusGroupId regionId = null;
-          try {
-            regionId =
-                ConsensusGroupId.Factory.create(
-                    ByteBuffer.wrap(
-                        dataPartitionMap
-                            .get(storageGroup)
-                            .get(seriesPartitionSlot)
-                            .get(timePartitionSlot)
-                            .get(0)
-                            .getRegionId()));
-          } catch (IOException ignore) {
-            // Ignore
-          }
+          regionId =
+              ConsensusGroupId.Factory.create(
+                  ByteBuffer.wrap(
+                      dataPartitionMap
+                          .get(storageGroup)
+                          .get(seriesPartitionSlot)
+                          .get(timePartitionSlot)
+                          .get(0)
+                          .getRegionId()));
           Assert.assertTrue(regionId instanceof DataRegionId);
           // Including three RegionReplica
           Assert.assertEquals(
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
index 13e38df6c0..ba38960173 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.commons.consensus;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public interface ConsensusGroupId {
@@ -39,10 +38,10 @@ public interface ConsensusGroupId {
   GroupType getType();
 
   class Factory {
-    public static ConsensusGroupId create(ByteBuffer buffer) throws IOException {
+    public static ConsensusGroupId create(ByteBuffer buffer) {
       int index = buffer.get();
       if (index >= GroupType.values().length) {
-        throw new IOException("unrecognized id type " + index);
+        throw new IllegalArgumentException("invalid ConsensusGroup type. Ordinal is: " + index);
       }
       GroupType type = GroupType.values()[index];
       ConsensusGroupId groupId = createEmpty(type);
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
index 2217f8832c..d98695824d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.commons.partition;
 import org.apache.iotdb.commons.cluster.DataNodeLocation;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -70,7 +69,7 @@ public class RegionReplicaSet {
         });
   }
 
-  public static RegionReplicaSet deserializeImpl(ByteBuffer buffer) throws IOException {
+  public static RegionReplicaSet deserializeImpl(ByteBuffer buffer) {
     ConsensusGroupId consensusGroupId = ConsensusGroupId.Factory.create(buffer);
 
     int size = buffer.getInt();
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a7603576a1..5a87a42625 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -45,8 +45,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -844,10 +842,10 @@ public class IoTDBConfig {
   /**
    * Ip and port of config nodes. each one is a {internalIp | domain name}:{meta port} string tuple.
    */
-  private List<String> configNodeUrls;
+  private List<String> configNodeUrls = new ArrayList<>();
 
   /** Internal ip for data node */
-  private String internalIp;
+  private String internalIp = "127.0.0.1";
 
   /** Internal port for coordinator */
   private int internalPort = 9003;
@@ -888,16 +886,6 @@ public class IoTDBConfig {
   /** Thread keep alive time in ms of data block manager. */
   private int dataBlockManagerKeepAliveTimeInMs = 1000;
 
-  public IoTDBConfig() {
-    try {
-      internalIp = InetAddress.getLocalHost().getHostAddress();
-    } catch (UnknownHostException e) {
-      logger.error(e.getMessage());
-      internalIp = "127.0.0.1";
-    }
-    configNodeUrls = new ArrayList<>();
-  }
-
   public float getUdfMemoryBudgetInMB() {
     return udfMemoryBudgetInMB;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusManager.java b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusManager.java
deleted file mode 100644
index 1a036908ff..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusManager.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.consensus;
-
-import org.apache.iotdb.commons.cluster.DataNodeLocation;
-import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.commons.partition.RegionReplicaSet;
-import org.apache.iotdb.consensus.IConsensus;
-import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/** DataNode Consensus layer manager */
-public class ConsensusManager {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ConsensusManager.class);
-
-  private IConsensus consensusImpl;
-
-  public ConsensusManager() throws IOException {
-    consensusImpl = ConsensusImpl.getInstance();
-    consensusImpl.start();
-  }
-
-  public void addConsensusGroup(RegionReplicaSet regionReplicaSet) {
-    ConsensusGroupId consensusGroupId = regionReplicaSet.getConsensusGroupId();
-    List<Peer> peerList = new ArrayList<>();
-    for (DataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeList()) {
-      peerList.add(new Peer(consensusGroupId, dataNodeLocation.getEndPoint()));
-    }
-    consensusImpl.addConsensusGroup(consensusGroupId, peerList);
-  }
-
-  /** Transmit FragmentInstance to datanode.consensus.statemachine */
-  public ConsensusWriteResponse write(ConsensusGroupId consensusGroupId, IConsensusRequest plan) {
-    return consensusImpl.write(consensusGroupId, plan);
-  }
-
-  /** Transmit FragmentInstance to datanode.consensus.statemachine */
-  public ConsensusReadResponse read(ConsensusGroupId consensusGroupId, FragmentInstance plan) {
-    return consensusImpl.read(consensusGroupId, plan);
-  }
-
-  public void close() throws IOException {
-    consensusImpl.stop();
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index a7ef03b32e..873d652013 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -24,15 +24,12 @@ import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.statemachine.IStateMachine;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 public abstract class BaseStateMachine implements IStateMachine {
 
   private static final Logger logger = LoggerFactory.getLogger(BaseStateMachine.class);
@@ -41,7 +38,7 @@ public abstract class BaseStateMachine implements IStateMachine {
   public TSStatus write(IConsensusRequest request) {
     try {
       return write(getFragmentInstance(request));
-    } catch (IllegalArgumentException | IllegalPathException | IOException e) {
+    } catch (IllegalArgumentException e) {
       logger.error(e.getMessage());
       return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
     }
@@ -53,7 +50,7 @@ public abstract class BaseStateMachine implements IStateMachine {
   public DataSet read(IConsensusRequest request) {
     try {
       return read(getFragmentInstance(request));
-    } catch (IllegalArgumentException | IllegalPathException | IOException e) {
+    } catch (IllegalArgumentException e) {
       logger.error(e.getMessage());
       return null;
     }
@@ -61,8 +58,7 @@ public abstract class BaseStateMachine implements IStateMachine {
 
   protected abstract DataSet read(FragmentInstance fragmentInstance);
 
-  private FragmentInstance getFragmentInstance(IConsensusRequest request)
-      throws IllegalPathException, IOException {
+  private FragmentInstance getFragmentInstance(IConsensusRequest request) {
     FragmentInstance instance;
     if (request instanceof ByteBufferConsensusRequest) {
       instance =
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java b/server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java
index 08252364ce..5ed7cbf395 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java
@@ -25,10 +25,14 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +44,8 @@ public class SchemaVisitor extends PlanVisitor<TSStatus, ISchemaRegion> {
   @Override
   public TSStatus visitCreateTimeSeries(CreateTimeSeriesNode node, ISchemaRegion schemaRegion) {
     try {
-      schemaRegion.createTimeseries((CreateTimeSeriesPlan) node.transferToPhysicalPlan(), -1);
+      PhysicalPlan plan = node.accept(new PhysicalPlanTransformer(), new TransformerContext());
+      schemaRegion.createTimeseries((CreateTimeSeriesPlan) plan, -1);
     } catch (MetadataException e) {
       logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
       return RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage());
@@ -52,4 +57,40 @@ public class SchemaVisitor extends PlanVisitor<TSStatus, ISchemaRegion> {
   public TSStatus visitPlan(PlanNode node, ISchemaRegion context) {
     return null;
   }
+
+  private static class PhysicalPlanTransformer
+      extends PlanVisitor<PhysicalPlan, TransformerContext> {
+    @Override
+    public PhysicalPlan visitPlan(PlanNode node, TransformerContext context) {
+      throw new NotImplementedException();
+    }
+
+    public PhysicalPlan visitCreateTimeSeries(
+        CreateTimeSeriesNode node, TransformerContext context) {
+      return new CreateTimeSeriesPlan(
+          node.getPath(),
+          node.getDataType(),
+          node.getEncoding(),
+          node.getCompressor(),
+          node.getProps(),
+          node.getTags(),
+          node.getAttributes(),
+          node.getAlias());
+    }
+
+    public PhysicalPlan visitCreateAlignedTimeSeries(
+        CreateAlignedTimeSeriesNode node, TransformerContext context) {
+      return new CreateAlignedTimeSeriesPlan(
+          node.getDevicePath(),
+          node.getMeasurements(),
+          node.getDataTypes(),
+          node.getEncodings(),
+          node.getCompressors(),
+          node.getAliasList(),
+          node.getTagsList(),
+          node.getAttributesList());
+    }
+  }
+
+  private static class TransformerContext {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index a9b259c693..3e55a5d32c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.mpp.sql.planner.plan;
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
@@ -32,7 +31,6 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
@@ -115,23 +113,20 @@ public class FragmentInstance implements IConsensusRequest {
   public String toString() {
     StringBuilder ret = new StringBuilder();
     ret.append(String.format("FragmentInstance-%s:", getId()));
-    if (getHostEndpoint() == null) {
-      ret.append(String.format("host endpoint has not set."));
-    } else {
-      ret.append(String.format("host endpoint: %s.", getHostEndpoint().toString()));
-    }
-    if (getRegionReplicaSet() == null) {
-      ret.append(String.format("Region Replica set has not set.\n"));
-    } else {
-      ret.append(String.format("Region Replica set: %s.\n", getRegionReplicaSet().toString()));
-    }
+    ret.append(
+        String.format("Host: %s", getHostEndpoint() == null ? "Not set" : getHostEndpoint()));
+    ret.append(
+        String.format(
+            "Region: %s",
+            getRegionReplicaSet() == null
+                ? "Not set"
+                : getRegionReplicaSet().getConsensusGroupId()));
     ret.append("---- Plan Node Tree ----\n");
     ret.append(PlanNodeUtil.nodeToString(getFragment().getRoot()));
     return ret.toString();
   }
 
-  public static FragmentInstance deserializeFrom(ByteBuffer buffer)
-      throws IllegalPathException, IOException {
+  public static FragmentInstance deserializeFrom(ByteBuffer buffer) {
     FragmentInstanceId id = FragmentInstanceId.deserialize(buffer);
     PlanFragment planFragment = PlanFragment.deserialize(buffer);
     boolean hasTimeFilter = ReadWriteIOUtils.readBool(buffer);
@@ -148,7 +143,6 @@ public class FragmentInstance implements IConsensusRequest {
 
   @Override
   public void serializeRequest(ByteBuffer buffer) {
-    buffer.mark();
     id.serialize(buffer);
     fragment.serialize(buffer);
     ReadWriteIOUtils.write(timeFilter != null, buffer);
@@ -157,7 +151,6 @@ public class FragmentInstance implements IConsensusRequest {
     }
     ReadWriteIOUtils.write(type.ordinal(), buffer);
     regionReplicaSet.serializeImpl(buffer);
-
     hostEndpoint.serializeImpl(buffer);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index 0f00703c86..38365ec4af 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -101,12 +100,12 @@ public class PlanFragment {
     root.serialize(byteBuffer);
   }
 
-  public static PlanFragment deserialize(ByteBuffer byteBuffer) throws IllegalPathException {
+  public static PlanFragment deserialize(ByteBuffer byteBuffer) {
     return new PlanFragment(PlanFragmentId.deserialize(byteBuffer), deserializeHelper(byteBuffer));
   }
 
   // deserialize the plan node recursively
-  public static PlanNode deserializeHelper(ByteBuffer byteBuffer) throws IllegalPathException {
+  public static PlanNode deserializeHelper(ByteBuffer byteBuffer) {
     PlanNode root = PlanNodeType.deserialize(byteBuffer);
     int childrenCount = byteBuffer.getInt();
     for (int i = 0; i < childrenCount; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
index 5df9022b84..d99c3361bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.apache.commons.lang.Validate;
@@ -115,9 +114,4 @@ public abstract class PlanNode {
   public int hashCode() {
     return Objects.hash(id);
   }
-
-  // TODO (yifuzhou) will remote later
-  public PhysicalPlan transferToPhysicalPlan() {
-    return null;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
index 85d9f78d2f..e5cd7ff8be 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchemaScanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
@@ -87,7 +86,7 @@ public enum PlanNodeType {
     buffer.putShort(nodeType);
   }
 
-  public static PlanNode deserialize(ByteBuffer buffer) throws IllegalPathException {
+  public static PlanNode deserialize(ByteBuffer buffer) {
     short nodeType = buffer.getShort();
     switch (nodeType) {
       case 0:
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
index 953db9f514..e5a5dfe8c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchema
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaScanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
@@ -106,12 +107,16 @@ public abstract class PlanVisitor<R, C> {
     return visitPlan(node, context);
   }
 
+  public R visitCreateAlignedTimeSeries(CreateAlignedTimeSeriesNode node, C context) {
+    return visitPlan(node, context);
+  }
+
   public R visitTimeSeriesMetaScan(TimeSeriesSchemaScanNode node, C context) {
-    return visitMetaScan(node, context);
+    return visitPlan(node, context);
   }
 
   public R visitDevicesMetaScan(DevicesSchemaScanNode node, C context) {
-    return visitMetaScan(node, context);
+    return visitPlan(node, context);
   }
 
   public R visitFragmentSink(FragmentSinkNode node, C context) {
@@ -119,6 +124,6 @@ public abstract class PlanVisitor<R, C> {
   }
 
   public R visitCreateTimeSeries(CreateTimeSeriesNode node, C context) {
-    return null;
+    return visitPlan(node, context);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
index da381349ff..43be8ea836 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
@@ -89,12 +89,16 @@ public class DevicesSchemaScanNode extends SchemaScanNode {
     ReadWriteIOUtils.write(hasSgCol, byteBuffer);
   }
 
-  public static DevicesSchemaScanNode deserialize(ByteBuffer byteBuffer)
-      throws IllegalPathException {
+  public static DevicesSchemaScanNode deserialize(ByteBuffer byteBuffer) {
     String id = ReadWriteIOUtils.readString(byteBuffer);
     PlanNodeId planNodeId = new PlanNodeId(id);
     String fullPath = ReadWriteIOUtils.readString(byteBuffer);
-    PartialPath path = new PartialPath(fullPath);
+    PartialPath path = null;
+    try {
+      path = new PartialPath(fullPath);
+    } catch (IllegalPathException e) {
+      throw new IllegalArgumentException("Cannot deserialize DevicesSchemaScanNode", e);
+    }
     int limit = ReadWriteIOUtils.readInt(byteBuffer);
     int offset = ReadWriteIOUtils.readInt(byteBuffer);
     boolean isPrefixPath = ReadWriteIOUtils.readBool(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
index c7feee1a82..1e2cc49c25 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
@@ -79,12 +79,16 @@ public class TimeSeriesSchemaScanNode extends SchemaScanNode {
     ReadWriteIOUtils.write(isPrefixPath, byteBuffer);
   }
 
-  public static TimeSeriesSchemaScanNode deserialize(ByteBuffer byteBuffer)
-      throws IllegalPathException {
+  public static TimeSeriesSchemaScanNode deserialize(ByteBuffer byteBuffer) {
     String id = ReadWriteIOUtils.readString(byteBuffer);
     PlanNodeId planNodeId = new PlanNodeId(id);
     String fullPath = ReadWriteIOUtils.readString(byteBuffer);
-    PartialPath path = new PartialPath(fullPath);
+    PartialPath path = null;
+    try {
+      path = new PartialPath(fullPath);
+    } catch (IllegalPathException e) {
+      throw new IllegalArgumentException("Cannot deserialize TimeSeriesSchemaScanNode", e);
+    }
     String key = ReadWriteIOUtils.readString(byteBuffer);
     String value = ReadWriteIOUtils.readString(byteBuffer);
     int limit = ReadWriteIOUtils.readInt(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
index 89605a8704..b6d0a56143 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
@@ -24,8 +24,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -160,6 +159,11 @@ public class CreateAlignedTimeSeriesNode extends PlanNode {
     return NO_CHILD_ALLOWED;
   }
 
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C schemaRegion) {
+    return visitor.visitCreateAlignedTimeSeries(this, schemaRegion);
+  }
+
   @Override
   public void serialize(ByteBuffer byteBuffer) {
     byteBuffer.putShort((short) PlanNodeType.CREATE_ALIGNED_TIME_SERIES.ordinal());
@@ -354,19 +358,6 @@ public class CreateAlignedTimeSeriesNode extends PlanNode {
         attributesList);
   }
 
-  @Override
-  public PhysicalPlan transferToPhysicalPlan() {
-    return new CreateAlignedTimeSeriesPlan(
-        getDevicePath(),
-        getMeasurements(),
-        getDataTypes(),
-        getEncodings(),
-        getCompressors(),
-        getAliasList(),
-        getTagsList(),
-        getAttributesList());
-  }
-
   //  @Override
   //  public void executeOn(SchemaRegion schemaRegion) throws MetadataException {
   //    schemaRegion.createAlignedTimeSeries((CreateAlignedTimeSeriesPlan)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index 45b7856878..36d9f35db6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -163,21 +161,7 @@ public class CreateTimeSeriesNode extends PlanNode {
     return NO_CHILD_ALLOWED;
   }
 
-  @Override
-  public PhysicalPlan transferToPhysicalPlan() {
-    return new CreateTimeSeriesPlan(
-        getPath(),
-        getDataType(),
-        getEncoding(),
-        getCompressor(),
-        getProps(),
-        getTags(),
-        getAttributes(),
-        getAlias());
-  }
-
-  public static CreateTimeSeriesNode deserialize(ByteBuffer byteBuffer)
-      throws IllegalPathException {
+  public static CreateTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
     String id;
     PartialPath path = null;
     TSDataType dataType;
@@ -193,7 +177,11 @@ public class CreateTimeSeriesNode extends PlanNode {
     int length = byteBuffer.getInt();
     byte[] bytes = new byte[length];
     byteBuffer.get(bytes);
-    path = new PartialPath(new String(bytes));
+    try {
+      path = new PartialPath(new String(bytes));
+    } catch (IllegalPathException e) {
+      throw new IllegalArgumentException("Cannot deserialize CreateTimeSeriesNode", e);
+    }
     dataType = TSDataType.values()[byteBuffer.get()];
     encoding = TSEncoding.values()[byteBuffer.get()];
     compressor = CompressionType.values()[byteBuffer.get()];
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
index ec40d5df52..d427b85809 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
@@ -156,7 +155,7 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
     }
   }
 
-  public static DeviceMergeNode deserialize(ByteBuffer byteBuffer) throws IllegalPathException {
+  public static DeviceMergeNode deserialize(ByteBuffer byteBuffer) {
     int orderByIndex = ReadWriteIOUtils.readInt(byteBuffer);
     OrderBy orderBy = OrderBy.values()[orderByIndex];
     FilterNullComponent filterNullComponent = FilterNullComponent.deserialize(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index ec0cb655d6..5947fc3d21 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.cluster.Endpoint;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -87,7 +86,7 @@ public class ExchangeNode extends PlanNode {
     this.upstreamPlanNodeId = nodeId;
   }
 
-  public static ExchangeNode deserialize(ByteBuffer byteBuffer) throws IllegalPathException {
+  public static ExchangeNode deserialize(ByteBuffer byteBuffer) {
     FragmentSinkNode fragmentSinkNode =
         (FragmentSinkNode) PlanFragment.deserializeHelper(byteBuffer);
     Endpoint endPoint =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index a3002ebeb9..87c2ff1d05 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -196,11 +196,7 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
 
     // TODO serialize groupByTimeParameter
     RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
-    try {
-      regionReplicaSet.deserializeImpl(byteBuffer);
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
+    RegionReplicaSet.deserializeImpl(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     SeriesAggregateScanNode seriesAggregateScanNode =
         new SeriesAggregateScanNode(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index 3511a4c9c3..9700720466 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import com.google.common.collect.ImmutableList;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -211,11 +210,7 @@ public class SeriesScanNode extends SourceNode implements IOutputPlanNode {
     int limit = ReadWriteIOUtils.readInt(byteBuffer);
     int offset = ReadWriteIOUtils.readInt(byteBuffer);
     RegionReplicaSet dataRegionReplicaSet = new RegionReplicaSet();
-    try {
-      dataRegionReplicaSet.deserializeImpl(byteBuffer);
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
+    RegionReplicaSet.deserializeImpl(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     SeriesScanNode seriesScanNode = new SeriesScanNode(planNodeId, partialPath);
     seriesScanNode.allSensors = allSensors;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
index 959e2879a3..190655719f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
@@ -19,18 +19,16 @@
 
 package org.apache.iotdb.db.service;
 
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.GroupType;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.db.consensus.ConsensusImpl;
-import org.apache.iotdb.db.consensus.ConsensusManager;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceInfo;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
-import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.mpp.rpc.thrift.SchemaFetchRequest;
 import org.apache.iotdb.mpp.rpc.thrift.SchemaFetchResponse;
@@ -43,68 +41,38 @@ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
 import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
 
 public class InternalServiceImpl implements InternalService.Iface {
-  private static final Logger LOGGER = LoggerFactory.getLogger(InternalServiceImpl.class);
-
-  private final ConsensusManager consensusManager;
 
-  public InternalServiceImpl() throws IOException {
+  public InternalServiceImpl() {
     super();
-    consensusManager = new ConsensusManager();
   }
 
   @Override
   public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req) {
-    TSendFragmentInstanceResp response = new TSendFragmentInstanceResp();
-    FragmentInstance fragmentInstance = null;
-    try {
-      fragmentInstance = FragmentInstance.deserializeFrom(req.fragmentInstance.body);
-    } catch (IOException | IllegalPathException e) {
-      LOGGER.error(e.getMessage());
-      response.setAccepted(false);
-      response.setMessage(e.getMessage());
-      return response;
-    }
-
-    ByteBufferConsensusRequest request = new ByteBufferConsensusRequest(req.fragmentInstance.body);
-    QueryType type = fragmentInstance.getType();
-    ConsensusGroupId groupId = fragmentInstance.getRegionReplicaSet().getConsensusGroupId();
-
-    if (fragmentInstance.getRegionReplicaSet() == null
-        || fragmentInstance.getRegionReplicaSet().isEmpty()) {
-      String msg = "Unknown regions to write, since getRegionReplicaSet is empty.";
-      LOGGER.error(msg);
-      response.setAccepted(false);
-      response.setMessage(msg);
-      return response;
-    }
-    consensusManager.addConsensusGroup(fragmentInstance.getRegionReplicaSet());
-
+    QueryType type = QueryType.valueOf(req.queryType);
+    ConsensusGroupId groupId =
+        ConsensusGroupId.Factory.create(
+            req.consensusGroupId.id, GroupType.valueOf(req.consensusGroupId.type));
     switch (type) {
       case READ:
-        ConsensusReadResponse readResp = ConsensusImpl.getInstance().read(groupId, request);
+        ConsensusReadResponse readResp =
+            ConsensusImpl.getInstance()
+                .read(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
         FragmentInstanceInfo info = (FragmentInstanceInfo) readResp.getDataset();
         return new TSendFragmentInstanceResp(info.getState().isFailed());
       case WRITE:
-        TSStatus status =
-            consensusManager
-                .write(
-                    fragmentInstance.getRegionReplicaSet().getConsensusGroupId(), fragmentInstance)
-                .getStatus();
+        TSendFragmentInstanceResp response = new TSendFragmentInstanceResp();
+        ConsensusWriteResponse resp =
+            ConsensusImpl.getInstance()
+                .write(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
         // TODO need consider more status
-        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode()) {
-          response.setAccepted(true);
-        } else {
-          response.setAccepted(false);
-        }
-        response.setMessage(status.message);
+        response.setAccepted(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode());
+        response.setMessage(resp.getStatus().message);
         return response;
     }
     return null;
@@ -120,25 +88,21 @@ public class InternalServiceImpl implements InternalService.Iface {
 
   @Override
   public TCancelResp cancelQuery(TCancelQueryReq req) throws TException {
-    return null;
+    throw new NotImplementedException();
   }
 
   @Override
   public TCancelResp cancelPlanFragment(TCancelPlanFragmentReq req) throws TException {
-    return null;
+    throw new NotImplementedException();
   }
 
   @Override
   public TCancelResp cancelFragmentInstance(TCancelFragmentInstanceReq req) throws TException {
-    return null;
+    throw new NotImplementedException();
   }
 
   @Override
   public SchemaFetchResponse fetchSchema(SchemaFetchRequest req) throws TException {
     throw new UnsupportedOperationException();
   }
-
-  public void close() throws IOException {
-    consensusManager.close();
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java
index fdd0a863ad..4976369443 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java
@@ -47,7 +47,6 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -94,10 +93,6 @@ public class DataNodeManagementServiceImpl implements ManagementIService.Iface {
       tsStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       tsStatus.setMessage(
           String.format("Create Schema Region failed because of %s", e2.getMessage()));
-    } catch (IOException e3) {
-      LOGGER.error("Can't deserialize regionId", e3);
-      tsStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-      tsStatus.setMessage(String.format("Can't deserialize regionId %s", e3));
     }
     return tsStatus;
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
index ad1f71af7f..6871ff6560 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
@@ -45,7 +45,6 @@ import org.apache.iotdb.tsfile.read.filter.operator.Gt;
 
 import org.junit.Test;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
@@ -54,7 +53,7 @@ import static org.junit.Assert.assertEquals;
 public class FragmentInstanceSerdeTest {
 
   @Test
-  public void TestSerializeAndDeserializeForTree1() throws IllegalPathException, IOException {
+  public void TestSerializeAndDeserializeForTree1() throws IllegalPathException {
     FragmentInstance fragmentInstance =
         new FragmentInstance(
             new PlanFragment(new PlanFragmentId("test", -1), constructPlanNodeTree()),
@@ -74,7 +73,7 @@ public class FragmentInstanceSerdeTest {
   }
 
   @Test
-  public void TestSerializeAndDeserializeWithNullFilter() throws IllegalPathException, IOException {
+  public void TestSerializeAndDeserializeWithNullFilter() throws IllegalPathException {
     FragmentInstance fragmentInstance =
         new FragmentInstance(
             new PlanFragment(new PlanFragmentId("test2", 1), constructPlanNodeTree()),
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index 4475eea8fa..7ed08d65b6 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -23,8 +23,10 @@ import org.apache.iotdb.commons.cluster.DataNodeLocation;
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.consensus.ConsensusImpl;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.LocalConfigNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -35,6 +37,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.mpp.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
@@ -44,7 +47,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
 
 import org.apache.ratis.util.FileUtils;
-import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -64,20 +66,27 @@ public class InternalServiceImplTest {
   @Before
   public void setUp() throws Exception {
     IoTDB.configManager.init();
-    internalServiceImpl = new InternalServiceImpl();
     configNode = LocalConfigNode.getInstance();
+    configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln"));
+    ConsensusImpl.getInstance().start();
+    RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
+    ConsensusImpl.getInstance()
+        .addConsensusGroup(regionReplicaSet.getConsensusGroupId(), genPeerList(regionReplicaSet));
+    internalServiceImpl = new InternalServiceImpl();
   }
 
   @After
   public void tearDown() throws Exception {
     IoTDB.configManager.clear();
+    RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
+    ConsensusImpl.getInstance().removeConsensusGroup(regionReplicaSet.getConsensusGroupId());
+    ConsensusImpl.getInstance().stop();
     EnvironmentUtils.cleanEnv();
-    internalServiceImpl.close();
     FileUtils.deleteFully(new File("data" + File.separator + "consensus"));
   }
 
   @Test
-  public void createTimeseriesTest() throws MetadataException, TException {
+  public void createTimeseriesTest() throws MetadataException {
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln"));
     CreateTimeSeriesNode createTimeSeriesNode =
         new CreateTimeSeriesNode(
@@ -105,14 +114,7 @@ public class InternalServiceImplTest {
             },
             "meter1");
 
-    List<DataNodeLocation> dataNodeList = new ArrayList<>();
-    dataNodeList.add(
-        new DataNodeLocation(
-            conf.getConsensusPort(), new Endpoint(conf.getInternalIp(), conf.getConsensusPort())));
-
-    // construct fragmentInstance
-    SchemaRegionId schemaRegionId = new SchemaRegionId(0);
-    RegionReplicaSet regionReplicaSet = new RegionReplicaSet(schemaRegionId, dataNodeList);
+    RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
     PlanFragment planFragment = new PlanFragment(new PlanFragmentId("2", 3), createTimeSeriesNode);
     FragmentInstance fragmentInstance =
         new FragmentInstance(planFragment, 4, new GroupByFilter(1, 2, 3, 4), QueryType.WRITE);
@@ -129,10 +131,34 @@ public class InternalServiceImplTest {
     TFragmentInstance tFragmentInstance = new TFragmentInstance();
     tFragmentInstance.setBody(byteBuffer);
     request.setFragmentInstance(tFragmentInstance);
+    request.setConsensusGroupId(
+        new TConsensusGroupId(
+            regionReplicaSet.getConsensusGroupId().getId(),
+            regionReplicaSet.getConsensusGroupId().getType().toString()));
+    request.setQueryType(QueryType.WRITE.toString());
 
     // Use consensus layer to execute request
     TSendFragmentInstanceResp response = internalServiceImpl.sendFragmentInstance(request);
 
     Assert.assertTrue(response.accepted);
   }
+
+  private RegionReplicaSet genRegionReplicaSet() {
+    List<DataNodeLocation> dataNodeList = new ArrayList<>();
+    dataNodeList.add(
+        new DataNodeLocation(
+            conf.getConsensusPort(), new Endpoint(conf.getInternalIp(), conf.getConsensusPort())));
+
+    // construct fragmentInstance
+    SchemaRegionId schemaRegionId = new SchemaRegionId(0);
+    return new RegionReplicaSet(schemaRegionId, dataNodeList);
+  }
+
+  private List<Peer> genPeerList(RegionReplicaSet regionReplicaSet) {
+    List<Peer> peerList = new ArrayList<>();
+    for (DataNodeLocation node : regionReplicaSet.getDataNodeList()) {
+      peerList.add(new Peer(regionReplicaSet.getConsensusGroupId(), node.getEndPoint()));
+    }
+    return peerList;
+  }
 }
diff --git a/server/src/test/resources/iotdb-engine.properties b/server/src/test/resources/iotdb-engine.properties
index 559698f908..4074a55472 100644
--- a/server/src/test/resources/iotdb-engine.properties
+++ b/server/src/test/resources/iotdb-engine.properties
@@ -26,4 +26,5 @@ trigger_root_dir=target/ext/trigger
 tracing_dir=target/data/tracing
 minimum_schema_file_segment_in_bytes=0
 page_cache_in_schema_file=10
-sync_dir=target/sync
\ No newline at end of file
+internal_ip=0.0.0.0
+sync_dir=target/sync