You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/11 07:58:04 UTC

[iotdb] branch master updated: [IOTDB-2687] Base partition policy of data (#5464)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 703a64d2da [IOTDB-2687] Base partition policy of data (#5464)
703a64d2da is described below

commit 703a64d2da7823e755b0b26b2d4cbfb26ad850b6
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Mon Apr 11 15:57:58 2022 +0800

    [IOTDB-2687] Base partition policy of data (#5464)
---
 confignode/src/assembly/confignode.xml             |   9 +-
 .../resources/conf/iotdb-confignode.properties     |  20 +-
 .../iotdb/confignode/conf/ConfigNodeConf.java      |  25 +-
 .../iotdb/confignode/conf/ConfigNodeConfCheck.java |  29 +-
 .../confignode/conf/ConfigNodeDescriptor.java      |   8 +-
 ...aSet.java => DataNodeConfigurationDataSet.java} |  32 +-
 .../consensus/response/DataNodesInfoDataSet.java   |  12 +-
 .../consensus/response/DataPartitionDataSet.java   | 104 +++++-
 .../consensus/response/SchemaPartitionDataSet.java |  92 ++---
 .../response/StorageGroupSchemaDataSet.java        |  11 +-
 .../iotdb/confignode/manager/ConfigManager.java    |  87 +++--
 .../iotdb/confignode/manager/ConsensusManager.java |  33 +-
 .../iotdb/confignode/manager/DataNodeManager.java  |  87 ++---
 .../apache/iotdb/confignode/manager/Manager.java   |  88 ++---
 .../iotdb/confignode/manager/PartitionManager.java | 193 +++++++----
 .../iotdb/confignode/manager/RegionManager.java    | 126 +++----
 .../iotdb/confignode/partition/DataRegionInfo.java |  66 ----
 .../confignode/partition/SchemaRegionInfo.java     |  59 ----
 .../confignode/partition/StorageGroupSchema.java   |  77 +++--
 .../persistence/DataNodeInfoPersistence.java       | 122 ++++---
 .../persistence/PartitionInfoPersistence.java      | 145 +++++---
 .../persistence/RegionInfoPersistence.java         | 210 ++++++------
 .../iotdb/confignode/physical/PhysicalPlan.java    |  32 +-
 .../confignode/physical/PhysicalPlanType.java      |  12 +-
 .../physical/crud/CreateDataPartitionPlan.java     | 128 +++++++
 .../physical/crud/CreateRegionsPlan.java           |  96 ++++++
 .../physical/crud/CreateSchemaPartitionPlan.java   |  27 +-
 .../crud/GetOrCreateDataPartitionPlan.java         | 139 ++++++++
 .../GetOrCreateSchemaPartitionPlan.java}           |  50 +--
 .../confignode/physical/sys/DataPartitionPlan.java |  78 -----
 .../physical/sys/QueryDataNodeInfoPlan.java        |  14 +
 .../physical/sys/RegisterDataNodePlan.java         |  22 +-
 .../physical/sys/SetStorageGroupPlan.java          |  43 +--
 .../confignode/service/executor/PlanExecutor.java  |  27 +-
 .../server/ConfigNodeRPCServerProcessor.java       | 246 +++++++-------
 .../confignode/consensus/RatisConsensusDemo.java   |  27 +-
 .../manager/ConfigManagerManualTest.java           |  29 +-
 .../hash/DeviceGroupHashExecutorManualTest.java    |   2 +-
 .../physical/SerializeDeserializeUT.java           | 168 +++++++++
 .../server/ConfigNodeRPCServerProcessorTest.java   | 377 +++++++++++++++------
 .../{partition => cluster}/DataNodeLocation.java   |  51 +--
 .../iotdb/commons/partition/DataPartition.java     | 128 ++++++-
 .../iotdb/commons/partition/RegionReplicaSet.java  |   1 +
 .../iotdb/commons/partition/SchemaPartition.java   |  49 ++-
 .../commons/partition/SeriesPartitionSlot.java     |  35 +-
 .../iotdb/commons/partition/TimePartitionSlot.java |  33 ++
 .../executor/SeriesPartitionExecutor.java}         |  16 +-
 .../executor}/hash/APHashExecutor.java             |  11 +-
 .../executor}/hash/BKDRHashExecutor.java           |  11 +-
 .../executor}/hash/JSHashExecutor.java             |  11 +-
 .../executor}/hash/SDBMHashExecutor.java           |  11 +-
 .../mpp/sql/analyze/FakePartitionFetcherImpl.java  |   1 +
 .../java/org/apache/iotdb/db/service/DataNode.java |  10 +-
 .../db/mpp/sql/plan/DistributionPlannerTest.java   |   2 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   7 +-
 .../src/main/thrift/confignode.thrift              | 137 ++++----
 thrift/src/main/thrift/common.thrift               |  13 +-
 thrift/src/main/thrift/management.thrift           |   4 +-
 58 files changed, 2348 insertions(+), 1335 deletions(-)

diff --git a/confignode/src/assembly/confignode.xml b/confignode/src/assembly/confignode.xml
index 55707485db..5982b6cdd8 100644
--- a/confignode/src/assembly/confignode.xml
+++ b/confignode/src/assembly/confignode.xml
@@ -33,8 +33,13 @@
     </dependencySets>
     <fileSets>
         <fileSet>
-            <directory>src/assembly/resources</directory>
-            <outputDirectory>${file.separator}</outputDirectory>
+            <directory>src/assembly/resources/conf</directory>
+            <outputDirectory>conf</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>src/assembly/resources/sbin</directory>
+            <outputDirectory>sbin</outputDirectory>
+            <fileMode>0755</fileMode>
         </fileSet>
     </fileSets>
 </assembly>
diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 3aaf61eddc..63703ec172 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -78,23 +78,23 @@ config_node_rpc_port=22277
 # data_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
 
 ####################
-### DeviceGroup Configuration
+### SeriesPartitionSlot Configuration
 ####################
 
-# Number of DeviceGroups per StorageGroup
+# Number of SeriesPartitionSlots per StorageGroup
 # Datatype: int
-# device_group_count=10000
+# series_partition_slot_num=10000
 
-# DeviceGroup hash algorithm
+# SeriesPartitionSlot executor class
 # These hashing algorithms are currently supported:
-# 1. org.apache.iotdb.commons.hash.BKDRHashExecutor(Default)
-# 2. org.apache.iotdb.commons.hash.APHashExecutor
-# 3. org.apache.iotdb.commons.hash.JSHashExecutor
-# 4. org.apache.iotdb.commons.hash.SDBMHashExecutor
-# Also, if you want to implement your own hash algorithm, you can inherit the DeviceGroupHashExecutor class and
+# 1. BKDRHashExecutor(Default)
+# 2. APHashExecutor
+# 3. JSHashExecutor
+# 4. SDBMHashExecutor
+# Also, if you want to implement your own SeriesPartition executor, you can extend the SeriesPartitionExecutor class and
 # modify this parameter to correspond to your Java class
 # Datatype: String
-# device_group_hash_executor_class=org.apache.iotdb.commons.hash.BKDRHashExecutor
+# series_partition_executor_class=org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor
 
 ####################
 ### Directory Configuration
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index 0f863f8841..09c1a05649 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -46,11 +46,12 @@ public class ConfigNodeConf {
   private Endpoint[] configNodeGroupAddressList =
       Collections.singletonList(new Endpoint("0.0.0.0", 22278)).toArray(new Endpoint[0]);
 
-  /** Number of DeviceGroups per StorageGroup */
-  private int deviceGroupCount = 10000;
+  /** Number of SeriesPartitionSlots per StorageGroup */
+  private int seriesPartitionSlotNum = 10000;
 
-  /** DeviceGroup hash executor class */
-  private String deviceGroupHashExecutorClass = "org.apache.iotdb.commons.hash.BKDRHashExecutor";
+  /** SeriesPartitionSlot executor class */
+  private String seriesPartitionExecutorClass =
+      "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
 
   /** Max concurrent client number */
   private int rpcMaxConcurrentClientNum = 65535;
@@ -115,20 +116,20 @@ public class ConfigNodeConf {
     return dir;
   }
 
-  public int getDeviceGroupCount() {
-    return deviceGroupCount;
+  public int getSeriesPartitionSlotNum() {
+    return seriesPartitionSlotNum;
   }
 
-  public void setDeviceGroupCount(int deviceGroupCount) {
-    this.deviceGroupCount = deviceGroupCount;
+  public void setSeriesPartitionSlotNum(int seriesPartitionSlotNum) {
+    this.seriesPartitionSlotNum = seriesPartitionSlotNum;
   }
 
-  public String getDeviceGroupHashExecutorClass() {
-    return deviceGroupHashExecutorClass;
+  public String getSeriesPartitionExecutorClass() {
+    return seriesPartitionExecutorClass;
   }
 
-  public void setDeviceGroupHashExecutorClass(String deviceGroupHashExecutorClass) {
-    this.deviceGroupHashExecutorClass = deviceGroupHashExecutorClass;
+  public void setSeriesPartitionExecutorClass(String seriesPartitionExecutorClass) {
+    this.seriesPartitionExecutorClass = seriesPartitionExecutorClass;
   }
 
   public int getRpcMaxConcurrentClientNum() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfCheck.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfCheck.java
index 2a8c4957d6..895850c936 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfCheck.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfCheck.java
@@ -103,9 +103,10 @@ public class ConfigNodeConfCheck {
    * Therefore, store them in iotdb-confignode-special.properties at the first startup
    */
   private void writeSpecialProperties(File specialPropertiesFile) {
-    specialProperties.setProperty("device_group_count", String.valueOf(conf.getDeviceGroupCount()));
     specialProperties.setProperty(
-        "device_group_hash_executor_class", conf.getDeviceGroupHashExecutorClass());
+        "series_partition_slot_num", String.valueOf(conf.getSeriesPartitionSlotNum()));
+    specialProperties.setProperty(
+        "series_partition_executor_class", conf.getSeriesPartitionExecutorClass());
     try {
       specialProperties.store(new FileOutputStream(specialPropertiesFile), "");
     } catch (IOException e) {
@@ -116,26 +117,26 @@ public class ConfigNodeConfCheck {
 
   /** Ensure that special parameters are consistent with each startup except the first one */
   private void checkSpecialProperties() throws ConfigurationException {
-    int specialDeviceGroupCount =
+    int specialSeriesPartitionSlotNum =
         Integer.parseInt(
             specialProperties.getProperty(
-                "device_group_count", String.valueOf(conf.getDeviceGroupCount())));
-    if (specialDeviceGroupCount != conf.getDeviceGroupCount()) {
+                "series_partition_slot_num", String.valueOf(conf.getSeriesPartitionSlotNum())));
+    if (specialSeriesPartitionSlotNum != conf.getSeriesPartitionSlotNum()) {
       throw new ConfigurationException(
-          "device_group_count",
-          String.valueOf(conf.getDeviceGroupCount()),
-          String.valueOf(specialDeviceGroupCount));
+          "series_partition_slot_num",
+          String.valueOf(conf.getSeriesPartitionSlotNum()),
+          String.valueOf(specialSeriesPartitionSlotNum));
     }
 
-    String specialDeviceGroupHashExecutorClass =
+    String specialSeriesPartitionSlotExecutorClass =
         specialProperties.getProperty(
-            "device_group_hash_executor_class", conf.getDeviceGroupHashExecutorClass());
+            "series_partition_executor_class", conf.getSeriesPartitionExecutorClass());
     if (!Objects.equals(
-        specialDeviceGroupHashExecutorClass, conf.getDeviceGroupHashExecutorClass())) {
+        specialSeriesPartitionSlotExecutorClass, conf.getSeriesPartitionExecutorClass())) {
       throw new ConfigurationException(
-          "device_group_hash_executor_class",
-          conf.getDeviceGroupHashExecutorClass(),
-          specialDeviceGroupHashExecutorClass);
+          "series_partition_executor_class",
+          conf.getSeriesPartitionExecutorClass(),
+          specialSeriesPartitionSlotExecutorClass);
     }
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 4f2590192d..29123554d7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -103,14 +103,14 @@ public class ConfigNodeDescriptor {
       Properties properties = new Properties();
       properties.load(inputStream);
 
-      conf.setDeviceGroupCount(
+      conf.setSeriesPartitionSlotNum(
           Integer.parseInt(
               properties.getProperty(
-                  "device_group_count", String.valueOf(conf.getDeviceGroupCount()))));
+                  "series_partition_slot_num", String.valueOf(conf.getSeriesPartitionSlotNum()))));
 
-      conf.setDeviceGroupHashExecutorClass(
+      conf.setSeriesPartitionExecutorClass(
           properties.getProperty(
-              "device_group_hash_executor_class", conf.getDeviceGroupHashExecutorClass()));
+              "series_partition_executor_class", conf.getSeriesPartitionExecutorClass()));
 
       conf.setRpcAddress(properties.getProperty("config_node_rpc_address", conf.getRpcAddress()));
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationDataSet.java
similarity index 52%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationDataSet.java
index 934f05016d..9e5d67f7ea 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationDataSet.java
@@ -18,26 +18,36 @@
  */
 package org.apache.iotdb.confignode.consensus.response;
 
-import org.apache.iotdb.confignode.partition.StorageGroupSchema;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
 import org.apache.iotdb.consensus.common.DataSet;
 
-import java.util.List;
+public class DataNodeConfigurationDataSet implements DataSet {
 
-public class StorageGroupSchemaDataSet implements DataSet {
+  private TSStatus status;
+  private int dataNodeId;
+  private TGlobalConfig globalConfig;
 
-  private List<StorageGroupSchema> schemaList;
+  public DataNodeConfigurationDataSet() {
+    // Empty constructor
+  }
 
-  public StorageGroupSchemaDataSet() {}
+  public void setStatus(TSStatus status) {
+    this.status = status;
+  }
 
-  public StorageGroupSchemaDataSet(List<StorageGroupSchema> schemaList) {
-    this.schemaList = schemaList;
+  public void setDataNodeId(int dataNodeId) {
+    this.dataNodeId = dataNodeId;
   }
 
-  public List<StorageGroupSchema> getSchemaList() {
-    return schemaList;
+  public void setGlobalConfig(TGlobalConfig globalConfig) {
+    this.globalConfig = globalConfig;
   }
 
-  public void setSchemaList(List<StorageGroupSchema> schemaList) {
-    this.schemaList = schemaList;
+  public void convertToRpcDataNodeRegisterResp(TDataNodeRegisterResp resp) {
+    resp.setStatus(status);
+    resp.setDataNodeID(dataNodeId);
+    resp.setGlobalConfig(globalConfig);
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodesInfoDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodesInfoDataSet.java
index 62d498f09e..11e3d2ce71 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodesInfoDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodesInfoDataSet.java
@@ -18,19 +18,29 @@
  */
 package org.apache.iotdb.confignode.consensus.response;
 
-import org.apache.iotdb.commons.partition.DataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
 import org.apache.iotdb.consensus.common.DataSet;
 
 import java.util.List;
 
 public class DataNodesInfoDataSet implements DataSet {
 
+  private TSStatus status;
   private List<DataNodeLocation> dataNodeList;
 
   public DataNodesInfoDataSet() {
     // empty constructor
   }
 
+  public void setStatus(TSStatus status) {
+    this.status = status;
+  }
+
+  public TSStatus getStatus() {
+    return status;
+  }
+
   public void setDataNodeList(List<DataNodeLocation> dataNodeList) {
     this.dataNodeList = dataNodeList;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
index a02f58a53f..222995501c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
@@ -19,17 +19,111 @@
 
 package org.apache.iotdb.confignode.consensus.response;
 
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
 import org.apache.iotdb.consensus.common.DataSet;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 public class DataPartitionDataSet implements DataSet {
-  private DataPartition dataPartitionInfo;
 
-  public DataPartition getDataPartitionInfo() {
-    return dataPartitionInfo;
+  private TSStatus status;
+
+  private DataPartition dataPartition;
+
+  public TSStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(TSStatus status) {
+    this.status = status;
+  }
+
+  public DataPartition getDataPartition() {
+    return dataPartition;
+  }
+
+  public void setDataPartition(DataPartition dataPartition) {
+    this.dataPartition = dataPartition;
   }
 
-  public void setDataPartitionInfos(DataPartition dataPartitionInfo) {
-    this.dataPartitionInfo = dataPartitionInfo;
+  /**
+   * Convert DataPartitionDataSet to TDataPartitionResp
+   *
+   * @param resp TDataPartitionResp
+   */
+  public void convertToRpcDataPartitionResp(TDataPartitionResp resp) {
+    Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
+        dataPartitionMap = new HashMap<>();
+
+    dataPartition
+        .getDataPartitionMap()
+        .forEach(
+            ((storageGroup, seriesPartitionSlotTimePartitionSlotRegionReplicaSetListMap) -> {
+              // Extract StorageGroupName
+              dataPartitionMap.putIfAbsent(storageGroup, new HashMap<>());
+
+              seriesPartitionSlotTimePartitionSlotRegionReplicaSetListMap.forEach(
+                  ((seriesPartitionSlot, timePartitionSlotReplicaSetListMap) -> {
+                    // Extract TSeriesPartitionSlot
+                    TSeriesPartitionSlot tSeriesPartitionSlot =
+                        new TSeriesPartitionSlot(seriesPartitionSlot.getSlotId());
+                    dataPartitionMap
+                        .get(storageGroup)
+                        .putIfAbsent(tSeriesPartitionSlot, new HashMap<>());
+
+                    // Extract Map<TimePartitionSlot, List<RegionReplicaSet>>
+                    timePartitionSlotReplicaSetListMap.forEach(
+                        ((timePartitionSlot, regionReplicaSets) -> {
+                          // Extract TTimePartitionSlot
+                          TTimePartitionSlot tTimePartitionSlot =
+                              new TTimePartitionSlot(timePartitionSlot.getStartTime());
+                          dataPartitionMap
+                              .get(storageGroup)
+                              .get(tSeriesPartitionSlot)
+                              .putIfAbsent(tTimePartitionSlot, new ArrayList<>());
+
+                          // Extract TRegionReplicaSets
+                          regionReplicaSets.forEach(
+                              regionReplicaSet -> {
+                                TRegionReplicaSet tRegionReplicaSet = new TRegionReplicaSet();
+
+                                // Set TRegionReplicaSet's RegionId
+                                tRegionReplicaSet.setRegionId(regionReplicaSet.getId().getId());
+
+                                // Set TRegionReplicaSet's GroupType
+                                tRegionReplicaSet.setGroupType("DataRegion");
+
+                                // Set TRegionReplicaSet's EndPoints
+                                List<EndPoint> endPointList = new ArrayList<>();
+                                regionReplicaSet
+                                    .getDataNodeList()
+                                    .forEach(
+                                        dataNodeLocation ->
+                                            endPointList.add(
+                                                new EndPoint(
+                                                    dataNodeLocation.getEndPoint().getIp(),
+                                                    dataNodeLocation.getEndPoint().getPort())));
+                                tRegionReplicaSet.setEndpoint(endPointList);
+
+                                dataPartitionMap
+                                    .get(storageGroup)
+                                    .get(tSeriesPartitionSlot)
+                                    .get(tTimePartitionSlot)
+                                    .add(tRegionReplicaSet);
+                              });
+                        }));
+                  }));
+            }));
+
+    resp.setDataPartitionMap(dataPartitionMap);
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
index 407e61a5cf..5c3e95ea3a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
@@ -20,8 +20,10 @@
 package org.apache.iotdb.confignode.consensus.response;
 
 import org.apache.iotdb.common.rpc.thrift.EndPoint;
-import org.apache.iotdb.common.rpc.thrift.RegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
 import org.apache.iotdb.consensus.common.DataSet;
 
 import java.util.ArrayList;
@@ -29,54 +31,64 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+/** TODO: Reconstruct this class after PatternTree is moved to node-commons */
 public class SchemaPartitionDataSet implements DataSet {
-  private SchemaPartition schemaPartitionInfo;
 
-  public SchemaPartition getSchemaPartitionInfo() {
-    return schemaPartitionInfo;
+  private TSStatus status;
+
+  private SchemaPartition schemaPartition;
+
+  public SchemaPartitionDataSet() {
+    // empty constructor
   }
 
-  public void setSchemaPartitionInfo(SchemaPartition schemaPartitionInfos) {
-    this.schemaPartitionInfo = schemaPartitionInfos;
+  public TSStatus getStatus() {
+    return status;
   }
 
-  public static org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo
-      convertRpcSchemaPartition(SchemaPartition schemaPartitionInfo) {
-    org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo rpcSchemaPartitionInfo =
-        new org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo();
+  public void setStatus(TSStatus status) {
+    this.status = status;
+  }
 
-    Map<String, Map<Integer, RegionReplicaSet>> schemaRegionReplicaSets = new HashMap<>();
+  public SchemaPartition getSchemaPartition() {
+    return schemaPartition;
+  }
 
-    schemaPartitionInfo.getSchemaPartition().entrySet().stream()
+  public void setSchemaPartition(SchemaPartition schemaPartition) {
+    this.schemaPartition = schemaPartition;
+  }
+
+  public void convertToRpcSchemaPartitionResp(TSchemaPartitionResp resp) {
+    Map<String, Map<Integer, TRegionReplicaSet>> schemaRegionMap = new HashMap<>();
+
+    schemaPartition
+        .getSchemaPartitionMap()
         .forEach(
-            entity -> {
-              schemaRegionReplicaSets.putIfAbsent(entity.getKey(), new HashMap<>());
-              entity
-                  .getValue()
-                  .entrySet()
-                  .forEach(
-                      replica -> {
-                        RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
-                        regionReplicaSet.setRegionId(replica.getValue().getId().getId());
-                        List<EndPoint> endPoints = new ArrayList<>();
-                        replica
-                            .getValue()
-                            .getDataNodeList()
-                            .forEach(
-                                dataNode -> {
-                                  EndPoint endPoint =
-                                      new EndPoint(
-                                          dataNode.getEndPoint().getIp(),
-                                          dataNode.getEndPoint().getPort());
-                                  endPoints.add(endPoint);
-                                });
-                        regionReplicaSet.setEndpoint(endPoints);
-                        schemaRegionReplicaSets
-                            .get(entity.getKey())
-                            .put(replica.getKey().getDeviceGroupId(), regionReplicaSet);
-                      });
+            (storageGroup, seriesPartitionSlotRegionReplicaSetMap) -> {
+              // Extract StorageGroupName
+              schemaRegionMap.putIfAbsent(storageGroup, new HashMap<>());
+
+              // Extract Map<SeriesPartitionSlot, RegionReplicaSet>
+              seriesPartitionSlotRegionReplicaSetMap.forEach(
+                  ((seriesPartitionSlot, regionReplicaSet) -> {
+                    TRegionReplicaSet regionMessage = new TRegionReplicaSet();
+                    regionMessage.setRegionId(regionReplicaSet.getId().getId());
+                    List<EndPoint> endPointList = new ArrayList<>();
+                    regionReplicaSet
+                        .getDataNodeList()
+                        .forEach(
+                            dataNodeLocation ->
+                                endPointList.add(
+                                    new EndPoint(
+                                        dataNodeLocation.getEndPoint().getIp(),
+                                        dataNodeLocation.getEndPoint().getPort())));
+                    regionMessage.setEndpoint(endPointList);
+                    schemaRegionMap
+                        .get(storageGroup)
+                        .put(seriesPartitionSlot.getSlotId(), regionMessage);
+                  }));
             });
-    rpcSchemaPartitionInfo.setSchemaRegionDataNodesMap(schemaRegionReplicaSets);
-    return rpcSchemaPartitionInfo;
+
+    // resp.setSchemaRegionMap(schemaRegionMap);
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
index 934f05016d..700d4acfb5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.confignode.consensus.response;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.partition.StorageGroupSchema;
 import org.apache.iotdb.consensus.common.DataSet;
 
@@ -25,12 +26,18 @@ import java.util.List;
 
 public class StorageGroupSchemaDataSet implements DataSet {
 
+  private TSStatus status;
+
   private List<StorageGroupSchema> schemaList;
 
   public StorageGroupSchemaDataSet() {}
 
-  public StorageGroupSchemaDataSet(List<StorageGroupSchema> schemaList) {
-    this.schemaList = schemaList;
+  public TSStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(TSStatus status) {
+    this.status = status;
   }
 
   public List<StorageGroupSchema> getSchemaList() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index a4fc5cac15..8457275a88 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -20,17 +20,17 @@
 package org.apache.iotdb.confignode.manager;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.conf.ConfigNodeConf;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationDataSet;
 import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
+import org.apache.iotdb.confignode.consensus.response.DataPartitionDataSet;
+import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
 import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
 import org.apache.iotdb.confignode.physical.sys.AuthorPlan;
-import org.apache.iotdb.confignode.physical.sys.DataPartitionPlan;
 import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
 import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
-import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
 import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.confignode.rpc.thrift.DeviceGroupHashInfo;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -38,7 +38,7 @@ import java.io.IOException;
 
 /** Entry of all management, AssignPartitionManager,AssignRegionManager. */
 public class ConfigManager implements Manager {
-  private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
+
   private static final TSStatus ERROR_TSSTATUS =
       new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
 
@@ -74,15 +74,21 @@ public class ConfigManager implements Manager {
   }
 
   @Override
-  public TSStatus registerDataNode(PhysicalPlan physicalPlan) {
+  public DataSet registerDataNode(PhysicalPlan physicalPlan) {
+
+    // TODO: Only leader can register DataNode
+
     if (physicalPlan instanceof RegisterDataNodePlan) {
       return dataNodeManager.registerDataNode((RegisterDataNodePlan) physicalPlan);
     }
-    return ERROR_TSSTATUS;
+    return new DataNodeConfigurationDataSet();
   }
 
   @Override
   public DataSet getDataNodeInfo(PhysicalPlan physicalPlan) {
+
+    // TODO: Only leader can get DataNodeInfo
+
     if (physicalPlan instanceof QueryDataNodeInfoPlan) {
       return dataNodeManager.getDataNodeInfo((QueryDataNodeInfoPlan) physicalPlan);
     }
@@ -91,11 +97,17 @@ public class ConfigManager implements Manager {
 
   @Override
   public DataSet getStorageGroupSchema() {
+
+    // TODO: Only leader can get StorageGroupSchema
+
     return regionManager.getStorageGroupSchema();
   }
 
   @Override
   public TSStatus setStorageGroup(PhysicalPlan physicalPlan) {
+
+    // TODO: Only leader can set StorageGroup
+
     if (physicalPlan instanceof SetStorageGroupPlan) {
       return regionManager.setStorageGroup((SetStorageGroupPlan) physicalPlan);
     }
@@ -103,51 +115,58 @@ public class ConfigManager implements Manager {
   }
 
   @Override
-  public DataNodeManager getDataNodeManager() {
-    return dataNodeManager;
-  }
+  public DataSet getSchemaPartition(PhysicalPlan physicalPlan) {
 
-  @Override
-  public DataSet getDataPartition(PhysicalPlan physicalPlan) {
-    if (physicalPlan instanceof DataPartitionPlan) {
-      return partitionManager.getDataPartition((DataPartitionPlan) physicalPlan);
+    // TODO: Only leader can query SchemaPartition
+
+    if (physicalPlan instanceof GetOrCreateSchemaPartitionPlan) {
+      return partitionManager.getSchemaPartition((GetOrCreateSchemaPartitionPlan) physicalPlan);
     }
-    return new DataNodesInfoDataSet();
+    return new SchemaPartitionDataSet();
   }
 
   @Override
-  public DataSet getSchemaPartition(PhysicalPlan physicalPlan) {
-    if (physicalPlan instanceof SchemaPartitionPlan) {
-      return partitionManager.getSchemaPartition((SchemaPartitionPlan) physicalPlan);
+  public DataSet getOrCreateSchemaPartition(PhysicalPlan physicalPlan) {
+
+    // TODO: Only leader can apply SchemaPartition
+
+    if (physicalPlan instanceof GetOrCreateSchemaPartitionPlan) {
+      return partitionManager.getOrCreateSchemaPartition(
+          (GetOrCreateSchemaPartitionPlan) physicalPlan);
     }
-    return new DataNodesInfoDataSet();
+    return new SchemaPartitionDataSet();
   }
 
   @Override
-  public RegionManager getRegionManager() {
-    return regionManager;
+  public DataSet getDataPartition(PhysicalPlan physicalPlan) {
+
+    // TODO: Only leader can query DataPartition
+
+    if (physicalPlan instanceof GetOrCreateDataPartitionPlan) {
+      return partitionManager.getDataPartition((GetOrCreateDataPartitionPlan) physicalPlan);
+    }
+    return new DataPartitionDataSet();
   }
 
   @Override
-  public DataSet applySchemaPartition(PhysicalPlan physicalPlan) {
-    if (physicalPlan instanceof SchemaPartitionPlan) {
-      return partitionManager.applySchemaPartition((SchemaPartitionPlan) physicalPlan);
+  public DataSet getOrCreateDataPartition(PhysicalPlan physicalPlan) {
+
+    // TODO: only leader can apply DataPartition
+
+    if (physicalPlan instanceof GetOrCreateDataPartitionPlan) {
+      return partitionManager.getOrCreateDataPartition((GetOrCreateDataPartitionPlan) physicalPlan);
     }
-    return new DataNodesInfoDataSet();
+    return new DataPartitionDataSet();
   }
 
   @Override
-  public DataSet applyDataPartition(PhysicalPlan physicalPlan) {
-    if (physicalPlan instanceof DataPartitionPlan) {
-      return partitionManager.applyDataPartition((DataPartitionPlan) physicalPlan);
-    }
-    return new DataNodesInfoDataSet();
+  public DataNodeManager getDataNodeManager() {
+    return dataNodeManager;
   }
 
   @Override
-  public DeviceGroupHashInfo getDeviceGroupHashInfo() {
-    return new DeviceGroupHashInfo(
-        conf.getDeviceGroupCount(), conf.getDeviceGroupHashExecutorClass());
+  public RegionManager getRegionManager() {
+    return regionManager;
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 08a5e32f99..51d2bf7898 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -21,7 +21,8 @@ package org.apache.iotdb.confignode.manager;
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.PartitionRegionId;
-import org.apache.iotdb.commons.hash.DeviceGroupHashExecutor;
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
 import org.apache.iotdb.confignode.conf.ConfigNodeConf;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.statemachine.PartitionRegionStateMachine;
@@ -48,14 +49,13 @@ public class ConsensusManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(ConsensusManager.class);
   private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
 
-  private IConsensus consensusImpl;
-
   private ConsensusGroupId consensusGroupId;
+  private IConsensus consensusImpl;
 
-  private DeviceGroupHashExecutor hashExecutor;
+  private SeriesPartitionExecutor executor;
 
   public ConsensusManager() throws IOException {
-    setHashExecutor();
+    setSeriesPartitionExecutor();
     setConsensusLayer();
   }
 
@@ -64,27 +64,28 @@ public class ConsensusManager {
   }
 
   /** Build DeviceGroupHashExecutor */
-  private void setHashExecutor() {
+  private void setSeriesPartitionExecutor() {
     try {
-      Class<?> executor = Class.forName(conf.getDeviceGroupHashExecutorClass());
+      Class<?> executor = Class.forName(conf.getSeriesPartitionExecutorClass());
       Constructor<?> executorConstructor = executor.getConstructor(int.class);
-      hashExecutor =
-          (DeviceGroupHashExecutor) executorConstructor.newInstance(conf.getDeviceGroupCount());
+      this.executor =
+          (SeriesPartitionExecutor)
+              executorConstructor.newInstance(conf.getSeriesPartitionSlotNum());
     } catch (ClassNotFoundException
         | NoSuchMethodException
         | InstantiationException
         | IllegalAccessException
         | InvocationTargetException e) {
       LOGGER.error(
-          "Couldn't Constructor DeviceGroupHashExecutor class: {}",
-          conf.getDeviceGroupHashExecutorClass(),
+          "Couldn't Constructor SeriesPartitionExecutor class: {}",
+          conf.getSeriesPartitionExecutorClass(),
           e);
-      hashExecutor = null;
+      executor = null;
     }
   }
 
-  public int getDeviceGroupID(String device) {
-    return hashExecutor.getDeviceGroupID(device);
+  public SeriesPartitionSlot getSeriesPartitionSlot(String device) {
+    return executor.getSeriesPartitionSlot(device);
   }
 
   /** Build ConfigNodeGroup ConsensusLayer */
@@ -128,5 +129,9 @@ public class ConsensusManager {
     return consensusImpl.read(consensusGroupId, plan);
   }
 
+  public boolean isLeader() {
+    return consensusImpl.isLeader(consensusGroupId);
+  }
+
   // TODO: Interfaces for LoadBalancer control
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
index ab62b694d3..9e0d5a7633 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
@@ -19,11 +19,15 @@
 package org.apache.iotdb.confignode.manager;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.partition.DataNodeLocation;
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationDataSet;
 import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
 import org.apache.iotdb.confignode.persistence.DataNodeInfoPersistence;
 import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
 import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
+import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -31,70 +35,77 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /** Manager server info of data node, add node or remove node */
 public class DataNodeManager {
+
   private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeManager.class);
 
-  private DataNodeInfoPersistence dataNodeInfo = DataNodeInfoPersistence.getInstance();
+  private static final DataNodeInfoPersistence dataNodeInfoPersistence =
+      DataNodeInfoPersistence.getInstance();
 
-  private Manager configManager;
+  private final Manager configManager;
 
   /** TODO:do some operate after add node or remove node */
-  private List<ChangeServerListener> listeners = new CopyOnWriteArrayList<>();
-
-  private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
-
-  private int nextDataNodeId;
+  private final List<ChangeServerListener> listeners = new CopyOnWriteArrayList<>();
 
   public DataNodeManager(Manager configManager) {
     this.configManager = configManager;
-    this.dataNodeInfoReadWriteLock = new ReentrantReadWriteLock();
+  }
+
+  private void setGlobalConfig(DataNodeConfigurationDataSet dataSet) {
+    // Set TGlobalConfig
+    TGlobalConfig globalConfig = new TGlobalConfig();
+    globalConfig.setDataNodeConsensusProtocolClass(
+        ConfigNodeDescriptor.getInstance().getConf().getDataNodeConsensusProtocolClass());
+    globalConfig.setSeriesPartitionSlotNum(
+        ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum());
+    globalConfig.setSeriesPartitionExecutorClass(
+        ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionExecutorClass());
+    dataSet.setGlobalConfig(globalConfig);
   }
 
   /**
-   * register dta node info when data node start
+   * Register DataNode
    *
    * @param plan RegisterDataNodePlan
-   * @return success if data node regist first
+   * @return DataNodeConfigurationDataSet. The TSStatus will be set to SUCCESS_STATUS when the
+   *     registration succeeds, and DATANODE_ALREADY_REGISTERED when the DataNode already exists.
    */
-  public TSStatus registerDataNode(RegisterDataNodePlan plan) {
-    TSStatus result;
-    DataNodeLocation info = plan.getInfo();
-    dataNodeInfoReadWriteLock.writeLock().lock();
-    try {
-      if (dataNodeInfo.containsValue(info)) {
-        // TODO: optimize
-        result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-        result.setMessage(String.valueOf(dataNodeInfo.getDataNodeInfo(info)));
-      } else {
-        info.setDataNodeID(nextDataNodeId);
-        ConsensusWriteResponse consensusWriteResponse = getConsensusManager().write(plan);
-        nextDataNodeId += 1;
-        return consensusWriteResponse.getStatus();
-      }
-    } finally {
-      dataNodeInfoReadWriteLock.writeLock().unlock();
+  public DataSet registerDataNode(RegisterDataNodePlan plan) {
+    DataNodeConfigurationDataSet dataSet = new DataNodeConfigurationDataSet();
+
+    if (DataNodeInfoPersistence.getInstance().containsValue(plan.getInfo())) {
+      dataSet.setStatus(new TSStatus(TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode()));
+    } else {
+      plan.getInfo().setDataNodeId(DataNodeInfoPersistence.getInstance().generateNextDataNodeId());
+      ConsensusWriteResponse resp = getConsensusManager().write(plan);
+      dataSet.setStatus(resp.getStatus());
     }
-    return result;
+
+    dataSet.setDataNodeId(plan.getInfo().getDataNodeId());
+    setGlobalConfig(dataSet);
+    return dataSet;
   }
 
   /**
-   * get dta node info
+   * Get DataNode info
    *
    * @param plan QueryDataNodeInfoPlan
-   * @return all data node info if dataNodeId of plan is -1
+   * @return The specific DataNode's info or all DataNode info if dataNodeId in
+   *     QueryDataNodeInfoPlan is -1
    */
   public DataNodesInfoDataSet getDataNodeInfo(QueryDataNodeInfoPlan plan) {
     return (DataNodesInfoDataSet) getConsensusManager().read(plan).getDataset();
   }
 
-  public Set<Integer> getDataNodeId() {
-    return dataNodeInfo.getDataNodeIds();
+  public int getOnlineDataNodeCount() {
+    return dataNodeInfoPersistence.getOnlineDataNodeCount();
+  }
+
+  public List<DataNodeLocation> getOnlineDataNodes() {
+    return dataNodeInfoPersistence.getOnlineDataNodes();
   }
 
   private ConsensusManager getConsensusManager() {
@@ -114,10 +125,6 @@ public class DataNodeManager {
     listeners.stream().forEach(serverListener -> serverListener.waiting());
   }
 
-  public Map<Integer, DataNodeLocation> getOnlineDataNodes() {
-    return dataNodeInfo.getOnlineDataNodes();
-  }
-
   private class ServerStartListenerThread extends Thread implements ChangeServerListener {
     private boolean changed = false;
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
index 7c6dda9532..25f40841e2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
@@ -20,11 +20,10 @@ package org.apache.iotdb.confignode.manager;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.physical.PhysicalPlan;
-import org.apache.iotdb.confignode.rpc.thrift.DeviceGroupHashInfo;
 import org.apache.iotdb.consensus.common.DataSet;
 
 /**
- * a subset of services provided by {@ConfigManager}. For use internally only, pased to Managers,
+ * a subset of services provided by {@link ConfigManager}. Internal use only, passed to Managers,
  * services.
  */
 public interface Manager {
@@ -34,88 +33,91 @@ public interface Manager {
    *
    * @return true if service stopped
    */
-  public boolean isStopped();
+  boolean isStopped();
 
   /**
-   * register data node
+   * get data node info manager
    *
-   * @param physicalPlan physical plan
-   * @return status
+   * @return DataNodeInfoManager instance
    */
-  public TSStatus registerDataNode(PhysicalPlan physicalPlan);
+  DataNodeManager getDataNodeManager();
 
   /**
-   * get data node info
+   * get consensus manager
    *
-   * @param physicalPlan physical plan
-   * @return data set
+   * @return ConsensusManager instance
    */
-  DataSet getDataNodeInfo(PhysicalPlan physicalPlan);
+  ConsensusManager getConsensusManager();
 
   /**
-   * get storage group schema
+   * get assign region manager
    *
-   * @return data set
+   * @return AssignRegionManager instance
    */
-  DataSet getStorageGroupSchema();
+  RegionManager getRegionManager();
 
   /**
-   * set storage group
+   * Register DataNode
    *
-   * @param physicalPlan physical plan
-   * @return status
+   * @param physicalPlan RegisterDataNodePlan
+   * @return DataNodeConfigurationDataSet
    */
-  TSStatus setStorageGroup(PhysicalPlan physicalPlan);
+  DataSet registerDataNode(PhysicalPlan physicalPlan);
 
   /**
-   * get data node info manager
+   * Get DataNode info
    *
-   * @return DataNodeInfoManager instance
+   * @param physicalPlan QueryDataNodeInfoPlan
+   * @return DataNodesInfoDataSet
    */
-  DataNodeManager getDataNodeManager();
+  DataSet getDataNodeInfo(PhysicalPlan physicalPlan);
 
   /**
-   * get data partition
+   * Get StorageGroupSchemas
    *
-   * @param physicalPlan physical plan
-   * @return data set
+   * @return StorageGroupSchemaDataSet
    */
-  DataSet getDataPartition(PhysicalPlan physicalPlan);
+  DataSet getStorageGroupSchema();
 
   /**
-   * get schema partition
+   * Set StorageGroup
    *
-   * @param physicalPlan physical plan
-   * @return data set
+   * @param physicalPlan SetStorageGroupPlan
+   * @return status
    */
-  DataSet getSchemaPartition(PhysicalPlan physicalPlan);
+  TSStatus setStorageGroup(PhysicalPlan physicalPlan);
 
   /**
-   * get assign region manager
+   * Get SchemaPartition
    *
-   * @return AssignRegionManager instance
+   * @param physicalPlan GetOrCreateSchemaPartitionPlan
+   * @return SchemaPartitionDataSet
    */
-  RegionManager getRegionManager();
+  DataSet getSchemaPartition(PhysicalPlan physicalPlan);
 
   /**
-   * apply schema partition
+   * Get or create SchemaPartition
    *
-   * @param physicalPlan physical plan
-   * @return data set
+   * @param physicalPlan GetOrCreateSchemaPartitionPlan
+   * @return SchemaPartitionDataSet
    */
-  DataSet applySchemaPartition(PhysicalPlan physicalPlan);
+  DataSet getOrCreateSchemaPartition(PhysicalPlan physicalPlan);
 
   /**
-   * apply data partition
+   * Get DataPartition
    *
-   * @param physicalPlan physical plan
-   * @return data set
+   * @param physicalPlan GetOrCreateDataPartitionPlan
+   * @return DataPartitionDataSet
    */
-  DataSet applyDataPartition(PhysicalPlan physicalPlan);
-
-  DeviceGroupHashInfo getDeviceGroupHashInfo();
+  DataSet getDataPartition(PhysicalPlan physicalPlan);
 
-  ConsensusManager getConsensusManager();
+  /**
+   * Get or create DataPartition
+   *
+   * @param physicalPlan GetOrCreateDataPartitionPlan
+   * @return DataPartitionDataSet
+   */
+  DataSet getOrCreateDataPartition(PhysicalPlan physicalPlan);
 
   /**
    * operate permission
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index 09a82f12ec..c2bde8119e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -18,128 +18,195 @@
  */
 package org.apache.iotdb.confignode.manager;
 
-import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.confignode.consensus.response.DataPartitionDataSet;
 import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
 import org.apache.iotdb.confignode.persistence.PartitionInfoPersistence;
 import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
-import org.apache.iotdb.confignode.physical.sys.DataPartitionPlan;
-import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /** manage data partition and schema partition */
 public class PartitionManager {
-  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionManager.class);
-
-  /** schema partition read write lock */
-  private final ReentrantReadWriteLock schemaPartitionReadWriteLock;
 
-  /** data partition read write lock */
-  private final ReentrantReadWriteLock dataPartitionReadWriteLock;
+  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionManager.class);
 
-  // TODO: Serialize and Deserialize
-  private final DataPartition dataPartition;
+  private static final PartitionInfoPersistence partitionInfoPersistence =
+      PartitionInfoPersistence.getInstance();
 
   private final Manager configNodeManager;
 
   public PartitionManager(Manager configNodeManager) {
-    this.schemaPartitionReadWriteLock = new ReentrantReadWriteLock();
-    this.dataPartitionReadWriteLock = new ReentrantReadWriteLock();
     this.configNodeManager = configNodeManager;
-    this.dataPartition = new DataPartition();
+  }
+
+  private ConsensusManager getConsensusManager() {
+    return configNodeManager.getConsensusManager();
   }
 
   /**
-   * Get schema partition
+   * Get SchemaPartition. TODO: Reconstruct this after PatternTree is moved to node-commons
    *
-   * @param physicalPlan storageGroup and deviceGroupIDs
-   * @return Empty Data Set if does not exist
+   * @param physicalPlan GetOrCreateSchemaPartitionPlan with PatternTree
+   * @return SchemaPartitionDataSet that contains only existing SchemaPartition
    */
-  public DataSet getSchemaPartition(SchemaPartitionPlan physicalPlan) {
+  public DataSet getSchemaPartition(GetOrCreateSchemaPartitionPlan physicalPlan) {
     SchemaPartitionDataSet schemaPartitionDataSet;
-    schemaPartitionReadWriteLock.readLock().lock();
-    try {
-      ConsensusReadResponse consensusReadResponse = getConsensusManager().read(physicalPlan);
-      schemaPartitionDataSet = (SchemaPartitionDataSet) consensusReadResponse.getDataset();
-    } finally {
-      schemaPartitionReadWriteLock.readLock().unlock();
-    }
+    ConsensusReadResponse consensusReadResponse = getConsensusManager().read(physicalPlan);
+    schemaPartitionDataSet = (SchemaPartitionDataSet) consensusReadResponse.getDataset();
     return schemaPartitionDataSet;
   }
 
   /**
-   * If does not exist, apply a new schema partition
+   * Get SchemaPartition and create a new one if it does not exist. TODO: Reconstruct this
+   * interface after PatternTree is moved to node-commons
    *
-   * @param physicalPlan storage group and device group id
-   * @return Schema Partition data set
+   * @param physicalPlan GetOrCreateSchemaPartitionPlan with PatternTree
+   * @return SchemaPartitionDataSet
    */
-  public DataSet applySchemaPartition(SchemaPartitionPlan physicalPlan) {
+  public DataSet getOrCreateSchemaPartition(GetOrCreateSchemaPartitionPlan physicalPlan) {
     String storageGroup = physicalPlan.getStorageGroup();
-    List<Integer> deviceGroupIDs = physicalPlan.getDeviceGroupIDs();
-    List<Integer> noAssignDeviceGroupId =
-        PartitionInfoPersistence.getInstance()
-            .filterSchemaRegionNoAssignDeviceGroupId(storageGroup, deviceGroupIDs);
-
-    // allocate partition by storage group and device group id
-    schemaPartitionReadWriteLock.writeLock().lock();
-    try {
-      Map<Integer, RegionReplicaSet> deviceGroupIdReplicaSets =
-          allocateSchemaPartition(storageGroup, noAssignDeviceGroupId);
-      physicalPlan.setDeviceGroupIdReplicaSet(deviceGroupIdReplicaSets);
-      getConsensusManager().write(physicalPlan);
-      LOGGER.info("Allocate schema partition to {}.", deviceGroupIdReplicaSets);
-    } finally {
-      schemaPartitionReadWriteLock.writeLock().unlock();
+    List<Integer> seriesPartitionSlots = physicalPlan.getSeriesPartitionSlots();
+    List<Integer> noAssignedSeriesPartitionSlots =
+        partitionInfoPersistence.filterSchemaRegionNoAssignedPartitionSlots(
+            storageGroup, seriesPartitionSlots);
+
+    if (noAssignedSeriesPartitionSlots.size() > 0) {
+      // allocate partition by storage group and series partition slots
+      Map<Integer, RegionReplicaSet> schemaPartitionReplicaSets =
+          allocateSchemaPartition(storageGroup, noAssignedSeriesPartitionSlots);
+      physicalPlan.setSchemaPartitionReplicaSet(schemaPartitionReplicaSets);
+
+      ConsensusWriteResponse consensusWriteResponse = getConsensusManager().write(physicalPlan);
+      if (consensusWriteResponse.getStatus().getCode()
+          == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        LOGGER.info("Allocate schema partition to {}.", schemaPartitionReplicaSets);
+      }
     }
 
+    physicalPlan.setSchemaPartitionReplicaSet(new HashMap<>());
     return getSchemaPartition(physicalPlan);
   }
 
   /**
-   * TODO: allocate schema partition by balancer
+   * TODO: allocate schema partition by LoadManager
    *
-   * @param storageGroup storage group
-   * @param deviceGroupIDs device group id list
+   * @param storageGroup StorageGroupName
+   * @param noAssignedSeriesPartitionSlots not assigned SeriesPartitionSlots
+   * @return assign result
    */
   private Map<Integer, RegionReplicaSet> allocateSchemaPartition(
-      String storageGroup, List<Integer> deviceGroupIDs) {
+      String storageGroup, List<Integer> noAssignedSeriesPartitionSlots) {
     List<RegionReplicaSet> schemaRegionEndPoints =
-        RegionInfoPersistence.getInstance().getSchemaRegionEndPoint();
+        RegionInfoPersistence.getInstance().getSchemaRegionEndPoint(storageGroup);
     Random random = new Random();
-    Map<Integer, RegionReplicaSet> deviceGroupIdReplicaSets = new HashMap<>();
-    for (int i = 0; i < deviceGroupIDs.size(); i++) {
+    Map<Integer, RegionReplicaSet> schemaPartitionReplicaSets = new HashMap<>();
+    for (Integer seriesPartitionSlot : noAssignedSeriesPartitionSlots) {
       RegionReplicaSet schemaRegionReplicaSet =
           schemaRegionEndPoints.get(random.nextInt(schemaRegionEndPoints.size()));
-      deviceGroupIdReplicaSets.put(deviceGroupIDs.get(i), schemaRegionReplicaSet);
+      schemaPartitionReplicaSets.put(seriesPartitionSlot, schemaRegionReplicaSet);
     }
-    return deviceGroupIdReplicaSets;
+    return schemaPartitionReplicaSets;
   }
 
-  private ConsensusManager getConsensusManager() {
-    return configNodeManager.getConsensusManager();
+  /**
+   * Get DataPartition
+   *
+   * @param physicalPlan GetOrCreateDataPartitionPlan with Map<StorageGroupName,
+   *     Map<SeriesPartitionSlot, List<TimePartitionSlot>>>
+   * @return DataPartitionDataSet that contains only existing DataPartition
+   */
+  public DataSet getDataPartition(GetOrCreateDataPartitionPlan physicalPlan) {
+    DataPartitionDataSet dataPartitionDataSet;
+    ConsensusReadResponse consensusReadResponse = getConsensusManager().read(physicalPlan);
+    dataPartitionDataSet = (DataPartitionDataSet) consensusReadResponse.getDataset();
+    return dataPartitionDataSet;
   }
 
   /**
-   * TODO:allocate schema partition by balancer
+   * Get DataPartition and create a new one if it does not exist
    *
-   * @param physicalPlan physical plan
-   * @return data set
+   * @param physicalPlan GetOrCreateDataPartitionPlan with Map<StorageGroupName,
+   *     Map<SeriesPartitionSlot, List<TimePartitionSlot>>>
+   * @return DataPartitionDataSet
    */
-  public DataSet applyDataPartition(DataPartitionPlan physicalPlan) {
-    return null;
+  public DataSet getOrCreateDataPartition(GetOrCreateDataPartitionPlan physicalPlan) {
+    Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> noAssignedDataPartitionSlots =
+        partitionInfoPersistence.filterNoAssignedDataPartitionSlots(
+            physicalPlan.getPartitionSlotsMap());
+
+    if (noAssignedDataPartitionSlots.size() > 0) {
+      // Allocate DataPartition
+      Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+          assignedDataPartition = allocateDataPartition(noAssignedDataPartitionSlots);
+      CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
+      createPlan.setAssignedDataPartition(assignedDataPartition);
+
+      // Persist DataPartition
+      ConsensusWriteResponse consensusWriteResponse = getConsensusManager().write(createPlan);
+      if (consensusWriteResponse.getStatus().getCode()
+          == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        LOGGER.info("Allocate data partition to {}.", assignedDataPartition);
+      }
+    }
+
+    return getDataPartition(physicalPlan);
   }
 
-  public DataSet getDataPartition(DataPartitionPlan physicalPlan) {
-    return null;
+  /**
+   * TODO: allocate by LoadManager
+   *
+   * @param noAssignedDataPartitionSlotsMap Map<StorageGroupName, Map<SeriesPartitionSlot,
+   *     List<TimePartitionSlot>>>
+   * @return assign result, Map<StorageGroupName, Map<SeriesPartitionSlot, Map<TimePartitionSlot,
+   *     List<RegionReplicaSet>>>>
+   */
+  private Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+      allocateDataPartition(
+          Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>>
+              noAssignedDataPartitionSlotsMap) {
+
+    Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>> result =
+        new HashMap<>();
+
+    for (String storageGroup : noAssignedDataPartitionSlotsMap.keySet()) {
+      Map<SeriesPartitionSlot, List<TimePartitionSlot>> noAssignedPartitionSlotsMap =
+          noAssignedDataPartitionSlotsMap.get(storageGroup);
+      List<RegionReplicaSet> dataRegionEndPoints =
+          RegionInfoPersistence.getInstance().getDataRegionEndPoint(storageGroup);
+      Random random = new Random();
+
+      Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>> allocateResult =
+          new HashMap<>();
+      for (Map.Entry<SeriesPartitionSlot, List<TimePartitionSlot>> seriesPartitionEntry :
+          noAssignedPartitionSlotsMap.entrySet()) {
+        allocateResult.put(seriesPartitionEntry.getKey(), new HashMap<>());
+        for (TimePartitionSlot timePartitionSlot : seriesPartitionEntry.getValue()) {
+          allocateResult
+              .get(seriesPartitionEntry.getKey())
+              .computeIfAbsent(timePartitionSlot, key -> new ArrayList<>())
+              .add(dataRegionEndPoints.get(random.nextInt(dataRegionEndPoints.size())));
+        }
+      }
+
+      result.put(storageGroup, allocateResult);
+    }
+    return result;
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
index 55017784b8..f45591cffb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
@@ -20,44 +20,39 @@
 package org.apache.iotdb.confignode.manager;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.GroupType;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.confignode.conf.ConfigNodeConf;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
-import org.apache.iotdb.confignode.partition.DataRegionInfo;
-import org.apache.iotdb.confignode.partition.SchemaRegionInfo;
-import org.apache.iotdb.confignode.partition.StorageGroupSchema;
 import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
+import org.apache.iotdb.confignode.physical.crud.CreateRegionsPlan;
 import org.apache.iotdb.confignode.physical.sys.QueryStorageGroupSchemaPlan;
 import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /** manage data partition and schema partition */
 public class RegionManager {
+
   private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
   private static final int regionReplicaCount = conf.getRegionReplicaCount();
   private static final int schemaRegionCount = conf.getSchemaRegionCount();
   private static final int dataRegionCount = conf.getDataRegionCount();
 
-  /** partition read write lock */
-  private final ReentrantReadWriteLock partitionReadWriteLock;
-
-  // TODO: Serialize and Deserialize
-  private int nextSchemaRegionGroup = 0;
-  // TODO: Serialize and Deserialize
-  private int nextDataRegionGroup = 0;
-
-  private RegionInfoPersistence regionInfoPersistence = RegionInfoPersistence.getInstance();
+  private static final RegionInfoPersistence regionInfoPersistence =
+      RegionInfoPersistence.getInstance();
 
   private final Manager configNodeManager;
 
   public RegionManager(Manager configNodeManager) {
-    this.partitionReadWriteLock = new ReentrantReadWriteLock();
     this.configNodeManager = configNodeManager;
   }
 
@@ -66,41 +61,39 @@ public class RegionManager {
   }
 
   /**
-   * 1. region allocation 2. add to storage group map
+   * Set StorageGroup and allocate the default number of Regions
    *
    * @param plan SetStorageGroupPlan
-   * @return TSStatusCode.SUCCESS_STATUS if region allocate
+   * @return SUCCESS_STATUS if the StorageGroup is set and Region allocation succeeds.
+   *     NOT_ENOUGH_DATA_NODE if there are not enough DataNodes for Region allocation.
+   *     STORAGE_GROUP_ALREADY_EXISTS if the StorageGroup is already set.
    */
   public TSStatus setStorageGroup(SetStorageGroupPlan plan) {
     TSStatus result;
-    partitionReadWriteLock.writeLock().lock();
-    try {
-      if (configNodeManager.getDataNodeManager().getDataNodeId().size() < regionReplicaCount) {
-        result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-        result.setMessage("DataNode is not enough, please register more.");
+    if (configNodeManager.getDataNodeManager().getOnlineDataNodeCount() < regionReplicaCount) {
+      result = new TSStatus(TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode());
+      result.setMessage("DataNode is not enough, please register more.");
+    } else {
+      if (regionInfoPersistence.containsStorageGroup(plan.getSchema().getName())) {
+        result = new TSStatus(TSStatusCode.STORAGE_GROUP_ALREADY_EXISTS.getStatusCode());
+        result.setMessage(
+            String.format("StorageGroup %s is already set.", plan.getSchema().getName()));
       } else {
-        if (regionInfoPersistence.containsStorageGroup(plan.getSchema().getName())) {
-          result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-          result.setMessage(
-              String.format("StorageGroup %s is already set.", plan.getSchema().getName()));
-        } else {
-          String storageGroupName = plan.getSchema().getName();
-          StorageGroupSchema storageGroupSchema = new StorageGroupSchema(storageGroupName);
-
-          // allocate schema region
-          SchemaRegionInfo schemaRegionInfo = schemaRegionAllocation(storageGroupSchema);
-          plan.setSchemaRegionInfo(schemaRegionInfo);
-
-          // allocate data region
-          DataRegionInfo dataRegionInfo = dataRegionAllocation(storageGroupSchema);
-          plan.setDataRegionInfo(dataRegionInfo);
-
-          // write consensus
-          result = getConsensusManager().write(plan).getStatus();
-        }
+        CreateRegionsPlan createPlan = new CreateRegionsPlan();
+        createPlan.setStorageGroup(plan.getSchema().getName());
+
+        // allocate schema region
+        allocateRegions(GroupType.SchemaRegion, createPlan);
+        // allocate data region
+        allocateRegions(GroupType.DataRegion, createPlan);
+
+        // set StorageGroup
+        getConsensusManager().write(plan);
+
+        // create Region
+        // TODO: Send create Region to DataNode
+        result = getConsensusManager().write(createPlan).getStatus();
       }
-    } finally {
-      partitionReadWriteLock.writeLock().unlock();
     }
     return result;
   }
@@ -109,42 +102,31 @@ public class RegionManager {
     return configNodeManager.getDataNodeManager();
   }
 
-  private SchemaRegionInfo schemaRegionAllocation(StorageGroupSchema storageGroupSchema) {
+  private void allocateRegions(GroupType type, CreateRegionsPlan plan) {
 
-    SchemaRegionInfo schemaRegionInfo = new SchemaRegionInfo();
     // TODO: Use CopySet algorithm to optimize region allocation policy
-    for (int i = 0; i < schemaRegionCount; i++) {
-      List<Integer> dataNodeList = new ArrayList<>(getDataNodeInfoManager().getDataNodeId());
-      Collections.shuffle(dataNodeList);
-      schemaRegionInfo.addSchemaRegion(
-          nextSchemaRegionGroup, dataNodeList.subList(0, regionReplicaCount));
-      storageGroupSchema.addSchemaRegionGroup(nextSchemaRegionGroup);
-      nextSchemaRegionGroup += 1;
-    }
-    return schemaRegionInfo;
-  }
 
-  /**
-   * TODO: Only perform in leader node, @rongzhao
-   *
-   * @param storageGroupSchema
-   */
-  private DataRegionInfo dataRegionAllocation(StorageGroupSchema storageGroupSchema) {
-    // TODO: Use CopySet algorithm to optimize region allocation policy
-    DataRegionInfo dataRegionInfo = new DataRegionInfo();
-    for (int i = 0; i < dataRegionCount; i++) {
-      List<Integer> dataNodeList = new ArrayList<>(getDataNodeInfoManager().getDataNodeId());
-      Collections.shuffle(dataNodeList);
-      dataRegionInfo.createDataRegion(
-          nextDataRegionGroup, dataNodeList.subList(0, regionReplicaCount));
-      storageGroupSchema.addDataRegionGroup(nextDataRegionGroup);
-      nextDataRegionGroup += 1;
+    int regionCount = type.equals(GroupType.SchemaRegion) ? schemaRegionCount : dataRegionCount;
+    List<DataNodeLocation> onlineDataNodes = getDataNodeInfoManager().getOnlineDataNodes();
+    for (int i = 0; i < regionCount; i++) {
+      Collections.shuffle(onlineDataNodes);
+
+      RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
+      ConsensusGroupId consensusGroupId = null;
+      switch (type) {
+        case SchemaRegion:
+          consensusGroupId = new SchemaRegionId(regionInfoPersistence.generateNextRegionGroupId());
+          break;
+        case DataRegion:
+          consensusGroupId = new DataRegionId(regionInfoPersistence.generateNextRegionGroupId());
+      }
+      regionReplicaSet.setId(consensusGroupId);
+      regionReplicaSet.setDataNodeList(onlineDataNodes.subList(0, regionReplicaCount));
+      plan.addRegion(regionReplicaSet);
     }
-    return dataRegionInfo;
   }
 
   public StorageGroupSchemaDataSet getStorageGroupSchema() {
-
     ConsensusReadResponse readResponse =
         getConsensusManager().read(new QueryStorageGroupSchemaPlan());
     return (StorageGroupSchemaDataSet) readResponse.getDataset();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataRegionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataRegionInfo.java
deleted file mode 100644
index 9441d1452b..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataRegionInfo.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.partition;
-
-import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class DataRegionInfo {
-
-  // TODO: Serialize and Deserialize
-  // Map<DataRegionID, List<DataNodeID>>
-  private Map<Integer, List<Integer>> dataRegionDataNodesMap;
-
-  // Map<StorageGroup, Map<DeviceGroupID, DataPartitionRule>>
-  private final Map<String, Map<Integer, DataPartitionRule>> dataPartitionRuleTable;
-
-  public DataRegionInfo() {
-    this.dataRegionDataNodesMap = new HashMap<>();
-    this.dataPartitionRuleTable = new HashMap<>();
-  }
-
-  public Map<Integer, List<Integer>> getDataRegionDataNodesMap() {
-    return dataRegionDataNodesMap;
-  }
-
-  public void createDataRegion(int dataRegionGroup, List<Integer> dataNodeList) {
-    dataRegionDataNodesMap.put(dataRegionGroup, dataNodeList);
-  }
-
-  public List<Integer> getDataRegionLocation(int dataRegionGroup) {
-    return dataRegionDataNodesMap.get(dataRegionGroup);
-  }
-
-  public void updateDataPartitionRule(
-      String StorageGroup, int deviceGroup, DataPartitionRule rule) {
-    // TODO: Data partition policy by @YongzaoDan
-  }
-
-  public void serializeImpl(ByteBuffer buffer) {
-    SerializeDeserializeUtil.writeIntMapLists(dataRegionDataNodesMap, buffer);
-  }
-
-  public void deserializeImpl(ByteBuffer buffer) {
-    dataRegionDataNodesMap = SerializeDeserializeUtil.readIntMapLists(buffer);
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaRegionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaRegionInfo.java
deleted file mode 100644
index 9bde9be5e7..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaRegionInfo.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.partition;
-
-import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class SchemaRegionInfo {
-
-  // TODO: Serialize and Deserialize
-  // Map<SchemaRegionID, List<DataNodeID>>
-  private Map<Integer, List<Integer>> schemaRegionDataNodesMap;
-
-  public SchemaRegionInfo() {
-    this.schemaRegionDataNodesMap = new HashMap<>();
-  }
-
-  public void addSchemaRegion(int schemaRegion, List<Integer> dataNode) {
-    if (!schemaRegionDataNodesMap.containsKey(schemaRegion)) {
-      schemaRegionDataNodesMap.put(schemaRegion, dataNode);
-    }
-  }
-
-  public List<Integer> getSchemaRegionLocation(int schemaRegionGroup) {
-    return schemaRegionDataNodesMap.get(schemaRegionGroup);
-  }
-
-  public Map<Integer, List<Integer>> getSchemaRegionDataNodesMap() {
-    return schemaRegionDataNodesMap;
-  }
-
-  public void serializeImpl(ByteBuffer buffer) {
-    SerializeDeserializeUtil.writeIntMapLists(schemaRegionDataNodesMap, buffer);
-  }
-
-  public void deserializeImpl(ByteBuffer buffer) {
-    schemaRegionDataNodesMap = SerializeDeserializeUtil.readIntMapLists(buffer);
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/partition/StorageGroupSchema.java b/confignode/src/main/java/org/apache/iotdb/confignode/partition/StorageGroupSchema.java
index fc4153c054..9bbe657503 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/partition/StorageGroupSchema.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/partition/StorageGroupSchema.java
@@ -18,22 +18,29 @@
  */
 package org.apache.iotdb.confignode.partition;
 
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
+
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 public class StorageGroupSchema {
 
   private String name;
 
-  private List<Integer> schemaRegionGroupIDs;
-  private List<Integer> dataRegionGroupIDs;
+  private final List<ConsensusGroupId> schemaRegionGroupIds;
+  private final List<ConsensusGroupId> dataRegionGroupIds;
 
   public StorageGroupSchema() {
-    // empty constructor
+    schemaRegionGroupIds = new ArrayList<>();
+    dataRegionGroupIds = new ArrayList<>();
   }
 
   public StorageGroupSchema(String name) {
+    this();
     this.name = name;
   }
 
@@ -41,37 +48,65 @@ public class StorageGroupSchema {
     return name;
   }
 
-  public List<Integer> getSchemaRegionGroupIDs() {
-    return schemaRegionGroupIDs;
+  public List<ConsensusGroupId> getSchemaRegionGroupIds() {
+    return schemaRegionGroupIds;
   }
 
-  public void addSchemaRegionGroup(int id) {
-    if (schemaRegionGroupIDs == null) {
-      schemaRegionGroupIDs = new ArrayList<>();
-    }
-    schemaRegionGroupIDs.add(id);
+  public void addSchemaRegionGroup(ConsensusGroupId id) {
+    schemaRegionGroupIds.add(id);
   }
 
-  public List<Integer> getDataRegionGroupIDs() {
-    return dataRegionGroupIDs;
+  public List<ConsensusGroupId> getDataRegionGroupIds() {
+    return dataRegionGroupIds;
   }
 
-  public void addDataRegionGroup(int id) {
-    if (dataRegionGroupIDs == null) {
-      dataRegionGroupIDs = new ArrayList<>();
-    }
-    dataRegionGroupIDs.add(id);
+  public void addDataRegionGroup(ConsensusGroupId id) {
+    dataRegionGroupIds.add(id);
   }
 
   public void serialize(ByteBuffer buffer) {
     buffer.putInt(name.length());
     buffer.put(name.getBytes());
+
+    buffer.putInt(schemaRegionGroupIds.size());
+    for (ConsensusGroupId schemaRegionGroupId : schemaRegionGroupIds) {
+      schemaRegionGroupId.serializeImpl(buffer);
+    }
+
+    buffer.putInt(dataRegionGroupIds.size());
+    for (ConsensusGroupId dataRegionGroupId : dataRegionGroupIds) {
+      dataRegionGroupId.serializeImpl(buffer);
+    }
   }
 
-  public void deserialize(ByteBuffer buffer) {
+  public void deserialize(ByteBuffer buffer) throws IOException {
+    name = SerializeDeserializeUtil.readString(buffer);
+
     int length = buffer.getInt();
-    byte[] byteName = new byte[length];
-    buffer.get(byteName, 0, length);
-    name = new String(byteName, 0, length);
+    for (int i = 0; i < length; i++) {
+      ConsensusGroupId schemaRegionId = ConsensusGroupId.Factory.create(buffer);
+      schemaRegionGroupIds.add(schemaRegionId);
+    }
+
+    length = buffer.getInt();
+    for (int i = 0; i < length; i++) {
+      ConsensusGroupId dataRegionId = ConsensusGroupId.Factory.create(buffer);
+      dataRegionGroupIds.add(dataRegionId);
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    StorageGroupSchema that = (StorageGroupSchema) o;
+    return name.equals(that.name)
+        && schemaRegionGroupIds.equals(that.schemaRegionGroupIds)
+        && dataRegionGroupIds.equals(that.dataRegionGroupIds);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, schemaRegionGroupIds, dataRegionGroupIds);
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/DataNodeInfoPersistence.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/DataNodeInfoPersistence.java
index 0071c94368..2b1ad0f5f6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/DataNodeInfoPersistence.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/DataNodeInfoPersistence.java
@@ -19,19 +19,17 @@
 package org.apache.iotdb.confignode.persistence;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.partition.DataNodeLocation;
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
 import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
 import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentNavigableMap;
@@ -39,65 +37,61 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class DataNodeInfoPersistence {
-  private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeInfoPersistence.class);
+
+  private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
+
+  // TODO: serialize and deserialize
+  private int nextDataNodeId = 0;
 
   /** online data nodes */
+  // TODO: serialize and deserialize
   private final ConcurrentNavigableMap<Integer, DataNodeLocation> onlineDataNodes =
       new ConcurrentSkipListMap();
 
   /** For remove node or draining node */
-  private Set<DataNodeLocation> drainingDataNodes = new HashSet<>();
-
-  private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
+  private final Set<DataNodeLocation> drainingDataNodes = new HashSet<>();
 
   private DataNodeInfoPersistence() {
     this.dataNodeInfoReadWriteLock = new ReentrantReadWriteLock();
   }
 
-  public ConcurrentNavigableMap<Integer, DataNodeLocation> getOnlineDataNodes() {
-    return onlineDataNodes;
-  }
-
   public boolean containsValue(DataNodeLocation info) {
-    return onlineDataNodes.containsValue(info);
+    boolean result = false;
+    dataNodeInfoReadWriteLock.readLock().lock();
+
+    try {
+      for (Map.Entry<Integer, DataNodeLocation> entry : onlineDataNodes.entrySet()) {
+        if (entry.getValue().getEndPoint().equals(info.getEndPoint())) {
+          result = true;
+          info.setDataNodeId(entry.getKey());
+          break;
+        }
+      }
+    } finally {
+      dataNodeInfoReadWriteLock.readLock().unlock();
+    }
+
+    return result;
   }
 
   public void put(int dataNodeID, DataNodeLocation info) {
     onlineDataNodes.put(dataNodeID, info);
   }
 
-  public int getDataNodeInfo(DataNodeLocation info) {
-    // TODO: optimize
-    for (Map.Entry<Integer, DataNodeLocation> entry : onlineDataNodes.entrySet()) {
-      if (entry.getValue().equals(info)) {
-        return info.getDataNodeID();
-      }
-    }
-    return -1;
-  }
-
   /**
-   * register dta node info when data node start
+   * Persist DataNode info
    *
    * @param plan RegisterDataNodePlan
-   * @return success if data node regist first
+   * @return SUCCESS_STATUS
    */
   public TSStatus registerDataNode(RegisterDataNodePlan plan) {
     TSStatus result;
     DataNodeLocation info = plan.getInfo();
     dataNodeInfoReadWriteLock.writeLock().lock();
     try {
-      if (onlineDataNodes.containsValue(info)) {
-        result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-        result.setMessage(
-            String.format(
-                "DataNode %s is already registered.", plan.getInfo().getEndPoint().toString()));
-      } else {
-        onlineDataNodes.put(info.getDataNodeID(), info);
-        result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-        result.setMessage(String.valueOf(info.getDataNodeID()));
-        LOGGER.info("Register data node success, data node is {}", plan);
-      }
+      nextDataNodeId = Math.max(nextDataNodeId, info.getDataNodeId());
+      onlineDataNodes.put(info.getDataNodeId(), info);
+      result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } finally {
       dataNodeInfoReadWriteLock.writeLock().unlock();
     }
@@ -105,13 +99,16 @@ public class DataNodeInfoPersistence {
   }
 
   /**
-   * get dta node info
+   * Get DataNode info
    *
    * @param plan QueryDataNodeInfoPlan
-   * @return all data node info if dataNodeId of plan is -1
+   * @return The specific DataNode's info or all DataNode info if dataNodeId in
+   *     QueryDataNodeInfoPlan is -1
    */
   public DataNodesInfoDataSet getDataNodeInfo(QueryDataNodeInfoPlan plan) {
     DataNodesInfoDataSet result = new DataNodesInfoDataSet();
+    result.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+
     int dataNodeId = plan.getDataNodeID();
     dataNodeInfoReadWriteLock.readLock().lock();
     try {
@@ -127,46 +124,45 @@ public class DataNodeInfoPersistence {
     return result;
   }
 
-  public Set<Integer> getDataNodeIds() {
-    return onlineDataNodes.keySet();
+  public int getOnlineDataNodeCount() {
+    int result;
+    dataNodeInfoReadWriteLock.readLock().lock();
+    try {
+      result = onlineDataNodes.size();
+    } finally {
+      dataNodeInfoReadWriteLock.readLock().unlock();
+    }
+    return result;
   }
 
-  /**
-   * Add schema region group
-   *
-   * @param dataNodeId data node id
-   * @param schemaRegionGroup schema region group
-   */
-  public void addSchemaRegionGroup(int dataNodeId, int schemaRegionGroup) {
-    dataNodeInfoReadWriteLock.writeLock().lock();
+  public List<DataNodeLocation> getOnlineDataNodes() {
+    List<DataNodeLocation> result;
+    dataNodeInfoReadWriteLock.readLock().lock();
     try {
-      if (onlineDataNodes.containsKey(dataNodeId)) {
-        onlineDataNodes.get(dataNodeId).addSchemaRegionGroup(schemaRegionGroup);
-      }
+      result = new ArrayList<>(onlineDataNodes.values());
     } finally {
-      dataNodeInfoReadWriteLock.writeLock().unlock();
+      dataNodeInfoReadWriteLock.readLock().unlock();
     }
+    return result;
   }
 
-  /**
-   * Add data region group
-   *
-   * @param dataNodeId data node id
-   * @param dataRegionGroup data region group
-   */
-  public void addDataRegionGroup(int dataNodeId, int dataRegionGroup) {
-    dataNodeInfoReadWriteLock.writeLock().lock();
+  public int generateNextDataNodeId() {
+    int result;
+
     try {
-      if (onlineDataNodes.containsKey(dataNodeId)) {
-        onlineDataNodes.get(dataNodeId).addSchemaRegionGroup(dataRegionGroup);
-      }
+      dataNodeInfoReadWriteLock.writeLock().lock();
+      result = nextDataNodeId;
+      nextDataNodeId += 1;
     } finally {
       dataNodeInfoReadWriteLock.writeLock().unlock();
     }
+
+    return result;
   }
 
   @TestOnly
   public void clear() {
+    nextDataNodeId = 0;
     onlineDataNodes.clear();
     drainingDataNodes.clear();
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
index 221ceb2993..2b8d8a5395 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
@@ -19,25 +19,28 @@
 
 package org.apache.iotdb.confignode.persistence;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.consensus.response.DataPartitionDataSet;
 import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
-import org.apache.iotdb.confignode.physical.sys.DataPartitionPlan;
-import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /** manage data partition and schema partition */
 public class PartitionInfoPersistence {
-  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionInfoPersistence.class);
 
   /** schema partition read write lock */
   private final ReentrantReadWriteLock schemaPartitionReadWriteLock;
@@ -55,86 +58,136 @@ public class PartitionInfoPersistence {
     this.schemaPartitionReadWriteLock = new ReentrantReadWriteLock();
     this.dataPartitionReadWriteLock = new ReentrantReadWriteLock();
     this.schemaPartition = new SchemaPartition();
+    this.schemaPartition.setSchemaPartitionMap(new HashMap<>());
     this.dataPartition = new DataPartition();
+    this.dataPartition.setDataPartitionMap(new HashMap<>());
   }
 
   /**
-   * Get schema partition
+   * TODO: Reconstruct this interface after PatternTree moves to node-commons. Get SchemaPartition
    *
-   * @param physicalPlan storageGroup and deviceGroupIDs
-   * @return Empty Data Set if does not exist
+   * @param physicalPlan SchemaPartitionPlan with PatternTree
+   * @return SchemaPartitionDataSet that contains only existing SchemaPartition
    */
-  public DataSet getSchemaPartition(SchemaPartitionPlan physicalPlan) {
+  public DataSet getSchemaPartition(GetOrCreateSchemaPartitionPlan physicalPlan) {
     SchemaPartitionDataSet schemaPartitionDataSet = new SchemaPartitionDataSet();
     schemaPartitionReadWriteLock.readLock().lock();
     try {
       String storageGroup = physicalPlan.getStorageGroup();
-      List<Integer> deviceGroupIDs = physicalPlan.getDeviceGroupIDs();
+      List<Integer> deviceGroupIDs = physicalPlan.getSeriesPartitionSlots();
       SchemaPartition schemaPartitionInfo = new SchemaPartition();
-      schemaPartitionInfo.setSchemaPartition(
+      schemaPartitionInfo.setSchemaPartitionMap(
           schemaPartition.getSchemaPartition(storageGroup, deviceGroupIDs));
-      schemaPartitionDataSet.setSchemaPartitionInfo(schemaPartitionInfo);
+      schemaPartitionDataSet.setSchemaPartition(schemaPartitionInfo);
     } finally {
       schemaPartitionReadWriteLock.readLock().unlock();
+      schemaPartitionDataSet.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
     }
     return schemaPartitionDataSet;
   }
 
   /**
-   * If does not exist, apply a new schema partition
+   * TODO: Reconstruct this interface after PatternTree is moved to node-commons. Get
+   * SchemaPartition and create a new one if it does not exist
    *
-   * @param physicalPlan storage group and device group id
-   * @return Schema Partition data set
+   * @param physicalPlan SchemaPartitionPlan with PatternTree
+   * @return SUCCESS_STATUS
    */
-  public DataSet applySchemaPartition(SchemaPartitionPlan physicalPlan) {
-    String storageGroup = physicalPlan.getStorageGroup();
-    List<Integer> deviceGroupIDs = physicalPlan.getDeviceGroupIDs();
-    List<Integer> noAssignDeviceGroupId =
-        schemaPartition.filterNoAssignDeviceGroupId(storageGroup, deviceGroupIDs);
-
-    // allocate partition by storage group and device group id
-    Map<Integer, RegionReplicaSet> deviceGroupIdReplicaSets =
-        physicalPlan.getDeviceGroupIdReplicaSets();
+  public TSStatus createSchemaPartition(GetOrCreateSchemaPartitionPlan physicalPlan) {
     schemaPartitionReadWriteLock.writeLock().lock();
-    try {
 
-      deviceGroupIdReplicaSets
-          .entrySet()
-          .forEach(
-              entity -> {
-                schemaPartition.setSchemaRegionReplicaSet(
-                    storageGroup, entity.getKey(), entity.getValue());
-              });
+    try {
+      // Allocate SchemaPartition by SchemaPartitionPlan
+      String storageGroup = physicalPlan.getStorageGroup();
+      Map<Integer, RegionReplicaSet> schemaPartitionReplicaSets =
+          physicalPlan.getSchemaPartitionReplicaSets();
+      schemaPartitionReplicaSets.forEach(
+          (key, value) -> schemaPartition.setSchemaRegionReplicaSet(storageGroup, key, value));
     } finally {
       schemaPartitionReadWriteLock.writeLock().unlock();
     }
 
-    return getSchemaPartition(physicalPlan);
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
+  /** TODO: Reconstruct this interface after PatternTree is moved to node-commons */
+  public List<Integer> filterSchemaRegionNoAssignedPartitionSlots(
+      String storageGroup, List<Integer> seriesPartitionSlots) {
+    List<Integer> result;
+    schemaPartitionReadWriteLock.readLock().lock();
+    try {
+      result =
+          schemaPartition.filterNoAssignedSeriesPartitionSlot(storageGroup, seriesPartitionSlots);
+    } finally {
+      schemaPartitionReadWriteLock.readLock().unlock();
+    }
+    return result;
   }
 
   /**
-   * TODO:allocate schema partition by balancer
+   * Get DataPartition
    *
-   * @param physicalPlan physical plan
-   * @return data set
+   * @param physicalPlan DataPartitionPlan with Map<StorageGroupName, Map<SeriesPartitionSlot,
+   *     List<TimePartitionSlot>>>
+   * @return DataPartitionDataSet that contains only existing DataPartition
    */
-  public DataSet applyDataPartition(DataPartitionPlan physicalPlan) {
-    return null;
+  public DataSet getDataPartition(GetOrCreateDataPartitionPlan physicalPlan) {
+    DataPartitionDataSet dataPartitionDataSet = new DataPartitionDataSet();
+    dataPartitionReadWriteLock.readLock().lock();
+    try {
+      dataPartitionDataSet.setDataPartition(
+          dataPartition.getDataPartition(physicalPlan.getPartitionSlotsMap()));
+    } finally {
+      dataPartitionReadWriteLock.readLock().unlock();
+      dataPartitionDataSet.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+    }
+    return dataPartitionDataSet;
   }
 
-  public DataSet getDataPartition(DataPartitionPlan physicalPlan) {
-    return null;
+  public TSStatus createDataPartition(CreateDataPartitionPlan physicalPlan) {
+    dataPartitionReadWriteLock.writeLock().lock();
+
+    try {
+      // Allocate DataPartition by CreateDataPartitionPlan
+      Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+          assignedResult = physicalPlan.getAssignedDataPartition();
+      assignedResult.forEach(
+          (storageGroup, seriesPartitionTimePartitionSlots) ->
+              seriesPartitionTimePartitionSlots.forEach(
+                  ((seriesPartitionSlot, timePartitionSlotRegionReplicaSets) ->
+                      timePartitionSlotRegionReplicaSets.forEach(
+                          ((timePartitionSlot, regionReplicaSets) ->
+                              regionReplicaSets.forEach(
+                                  regionReplicaSet ->
+                                      dataPartition.createDataPartition(
+                                          storageGroup,
+                                          seriesPartitionSlot,
+                                          timePartitionSlot,
+                                          regionReplicaSet)))))));
+    } finally {
+      dataPartitionReadWriteLock.writeLock().unlock();
+    }
+
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
-  public List<Integer> filterSchemaRegionNoAssignDeviceGroupId(
-      String storageGroup, List<Integer> deviceGroupIDs) {
-    return schemaPartition.filterNoAssignDeviceGroupId(storageGroup, deviceGroupIDs);
+  public Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>>
+      filterNoAssignedDataPartitionSlots(
+          Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> partitionSlotsMap) {
+    Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> result;
+    dataPartitionReadWriteLock.readLock().lock();
+    try {
+      result = dataPartition.filterNoAssignedDataPartitionSlots(partitionSlotsMap);
+    } finally {
+      dataPartitionReadWriteLock.readLock().unlock();
+    }
+    return result;
   }
 
   @TestOnly
   public void clear() {
-    if (schemaPartition.getSchemaPartition() != null) {
-      schemaPartition.getSchemaPartition().clear();
+    if (schemaPartition.getSchemaPartitionMap() != null) {
+      schemaPartition.getSchemaPartitionMap().clear();
     }
 
     if (dataPartition.getDataPartitionMap() != null) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
index 712c64e496..e7f429b566 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
@@ -20,15 +20,14 @@
 package org.apache.iotdb.confignode.persistence;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
-import org.apache.iotdb.commons.partition.DataNodeLocation;
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
-import org.apache.iotdb.confignode.partition.DataRegionInfo;
-import org.apache.iotdb.confignode.partition.SchemaRegionInfo;
 import org.apache.iotdb.confignode.partition.StorageGroupSchema;
+import org.apache.iotdb.confignode.physical.crud.CreateRegionsPlan;
 import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -37,148 +36,159 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
 
 /** manage data partition and schema partition */
 public class RegionInfoPersistence {
 
-  /** partition read write lock */
-  private final ReentrantReadWriteLock partitionReadWriteLock;
-
   // TODO: Serialize and Deserialize
-  // storageGroupName -> StorageGroupSchema
+  // Map<StorageGroupName, StorageGroupSchema>
   private final Map<String, StorageGroupSchema> storageGroupsMap;
 
+  // Region allocate lock
+  private final ReentrantReadWriteLock regionAllocateLock;
   // TODO: Serialize and Deserialize
-  private int nextSchemaRegionGroup = 0;
-  // TODO: Serialize and Deserialize
-  private int nextDataRegionGroup = 0;
+  private int nextRegionGroupId = 0;
 
-  // TODO: Serialize and Deserialize
-  private final SchemaRegionInfo schemaRegion;
-
-  // TODO: Serialize and Deserialize
-  private final DataRegionInfo dataRegion;
+  // Region read write lock
+  private final ReentrantReadWriteLock regionReadWriteLock;
+  // Map<ConsensusGroupId, RegionReplicaSet>
+  private final Map<ConsensusGroupId, RegionReplicaSet> regionMap;
 
   public RegionInfoPersistence() {
-    this.partitionReadWriteLock = new ReentrantReadWriteLock();
+    this.regionAllocateLock = new ReentrantReadWriteLock();
+    this.regionReadWriteLock = new ReentrantReadWriteLock();
     this.storageGroupsMap = new HashMap<>();
-    this.schemaRegion = new SchemaRegionInfo();
-    this.dataRegion = new DataRegionInfo();
+    this.regionMap = new HashMap<>();
   }
 
   /**
-   * 1. region allocation 2. add to storage group map
+   * Persist the new StorageGroupSchema
    *
    * @param plan SetStorageGroupPlan
-   * @return TSStatusCode.SUCCESS_STATUS if region allocate
+   * @return SUCCESS_STATUS
    */
   public TSStatus setStorageGroup(SetStorageGroupPlan plan) {
     TSStatus result;
-    partitionReadWriteLock.writeLock().lock();
+    regionReadWriteLock.writeLock().lock();
     try {
-      if (storageGroupsMap.containsKey(plan.getSchema().getName())) {
-        result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-        result.setMessage(
-            String.format("StorageGroup %s is already set.", plan.getSchema().getName()));
-      } else {
-        StorageGroupSchema schema = new StorageGroupSchema(plan.getSchema().getName());
-        storageGroupsMap.put(schema.getName(), schema);
-
-        plan.getSchemaRegionInfo()
-            .getSchemaRegionDataNodesMap()
-            .entrySet()
-            .forEach(
-                entity -> {
-                  schemaRegion.addSchemaRegion(entity.getKey(), entity.getValue());
-                  entity
-                      .getValue()
-                      .forEach(
-                          dataNodeId -> {
-                            DataNodeInfoPersistence.getInstance()
-                                .addSchemaRegionGroup(dataNodeId, entity.getKey());
-                          });
-                });
-
-        plan.getDataRegionInfo()
-            .getDataRegionDataNodesMap()
-            .entrySet()
-            .forEach(
-                entity -> {
-                  dataRegion.createDataRegion(entity.getKey(), entity.getValue());
-                  entity
-                      .getValue()
-                      .forEach(
-                          dataNodeId -> {
-                            DataNodeInfoPersistence.getInstance()
-                                .addDataRegionGroup(dataNodeId, entity.getKey());
-                          });
-                });
-
-        result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-      }
+      StorageGroupSchema schema = plan.getSchema();
+      storageGroupsMap.put(schema.getName(), schema);
+      result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } finally {
-      partitionReadWriteLock.writeLock().unlock();
+      regionReadWriteLock.writeLock().unlock();
     }
     return result;
   }
 
   public StorageGroupSchemaDataSet getStorageGroupSchema() {
     StorageGroupSchemaDataSet result = new StorageGroupSchemaDataSet();
-    partitionReadWriteLock.readLock().lock();
+    regionReadWriteLock.readLock().lock();
     try {
       result.setSchemaList(new ArrayList<>(storageGroupsMap.values()));
     } finally {
-      partitionReadWriteLock.readLock().unlock();
+      regionReadWriteLock.readLock().unlock();
+      result.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+    }
+    return result;
+  }
+
+  /**
+   * Persist the allocation result of newly created Regions.
+   *
+   * <p>Every RegionReplicaSet in the plan is recorded in the region map, and its id is attached to
+   * the owning StorageGroupSchema as either a DataRegion group or a SchemaRegion group. The id
+   * counter is advanced past every persisted id, so that generateNextRegionGroupId() never hands
+   * out an id that is already in use.
+   *
+   * @param plan CreateRegionsPlan
+   * @return SUCCESS_STATUS if the Regions are persisted; INTERNAL_SERVER_ERROR if the target
+   *     StorageGroup has not been set, in which case no state is modified
+   */
+  public TSStatus createRegions(CreateRegionsPlan plan) {
+    TSStatus result;
+    regionReadWriteLock.writeLock().lock();
+    regionAllocateLock.writeLock().lock();
+    try {
+      StorageGroupSchema schema = storageGroupsMap.get(plan.getStorageGroup());
+
+      if (schema == null) {
+        // Reject the plan before touching regionMap, instead of failing with a
+        // NullPointerException halfway through the loop below
+        result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+        result.setMessage(String.format("StorageGroup %s is not set.", plan.getStorageGroup()));
+      } else {
+        for (RegionReplicaSet regionReplicaSet : plan.getRegionReplicaSets()) {
+          ConsensusGroupId regionId = regionReplicaSet.getId();
+          // generateNextRegionGroupId() returns nextRegionGroupId itself before incrementing,
+          // so the counter must stay strictly greater than every persisted id
+          nextRegionGroupId = Math.max(nextRegionGroupId, regionId.getId() + 1);
+          regionMap.put(regionId, regionReplicaSet);
+          if (regionId instanceof DataRegionId) {
+            schema.addDataRegionGroup(regionId);
+          } else if (regionId instanceof SchemaRegionId) {
+            schema.addSchemaRegionGroup(regionId);
+          }
+        }
+
+        result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      }
+    } finally {
+      // Release in the reverse order of acquisition
+      regionAllocateLock.writeLock().unlock();
+      regionReadWriteLock.writeLock().unlock();
     }
     return result;
   }
 
-  /** @return key is schema region id, value is endpoint list */
-  public List<RegionReplicaSet> getSchemaRegionEndPoint() {
+  /** @return The SchemaRegion ReplicaSets in the specific StorageGroup */
+  public List<RegionReplicaSet> getSchemaRegionEndPoint(String storageGroup) {
     List<RegionReplicaSet> schemaRegionEndPoints = new ArrayList<>();
+    regionReadWriteLock.readLock().lock();
+    try {
+      if (storageGroupsMap.containsKey(storageGroup)) {
+        List<ConsensusGroupId> schemaRegionIds =
+            storageGroupsMap.get(storageGroup).getSchemaRegionGroupIds();
+        for (ConsensusGroupId consensusGroupId : schemaRegionIds) {
+          schemaRegionEndPoints.add(regionMap.get(consensusGroupId));
+        }
+      }
+    } finally {
+      regionReadWriteLock.readLock().unlock();
+    }
 
-    schemaRegion
-        .getSchemaRegionDataNodesMap()
-        .entrySet()
-        .forEach(
-            entity -> {
-              RegionReplicaSet schemaRegionReplicaSet = new RegionReplicaSet();
-              List<Endpoint> endPoints = new ArrayList<>();
-              entity
-                  .getValue()
-                  .forEach(
-                      dataNodeId -> {
-                        if (DataNodeInfoPersistence.getInstance()
-                            .getOnlineDataNodes()
-                            .containsKey(dataNodeId)) {
-                          endPoints.add(
-                              DataNodeInfoPersistence.getInstance()
-                                  .getOnlineDataNodes()
-                                  .get(dataNodeId)
-                                  .getEndPoint());
-                        }
-                      });
-              schemaRegionReplicaSet.setId(new SchemaRegionId(entity.getKey()));
-              // TODO: (xingtanzjr) We cannot get the dataNodeId here, use 0 as the placeholder
-              schemaRegionReplicaSet.setDataNodeList(
-                  endPoints.stream()
-                      .map(endpoint -> new DataNodeLocation(0, endpoint))
-                      .collect(Collectors.toList()));
-              schemaRegionEndPoints.add(schemaRegionReplicaSet);
-            });
     return schemaRegionEndPoints;
   }
 
+  /**
+   * Collect the DataRegion ReplicaSets that belong to the specific StorageGroup.
+   *
+   * @param storageGroup name of the StorageGroup
+   * @return the ReplicaSet registered in the region map for each DataRegion group id of the
+   *     StorageGroup; an empty list if the StorageGroup is unknown
+   */
+  public List<RegionReplicaSet> getDataRegionEndPoint(String storageGroup) {
+    List<RegionReplicaSet> replicaSets = new ArrayList<>();
+    regionReadWriteLock.readLock().lock();
+    try {
+      StorageGroupSchema schema = storageGroupsMap.get(storageGroup);
+      if (schema != null) {
+        schema.getDataRegionGroupIds().forEach(id -> replicaSets.add(regionMap.get(id)));
+      }
+    } finally {
+      regionReadWriteLock.readLock().unlock();
+    }
+
+    return replicaSets;
+  }
+
+  /**
+   * Hand out the current value of the region group id counter and advance the counter by one,
+   * under the region allocate write lock.
+   *
+   * @return the allocated region group id
+   */
+  public int generateNextRegionGroupId() {
+    regionAllocateLock.writeLock().lock();
+    try {
+      // Post-increment: return the current value, then move the counter forward
+      return nextRegionGroupId++;
+    } finally {
+      regionAllocateLock.writeLock().unlock();
+    }
+  }
+
   public boolean containsStorageGroup(String storageName) {
-    return storageGroupsMap.containsKey(storageName);
+    boolean result;
+    regionReadWriteLock.readLock().lock();
+    try {
+      result = storageGroupsMap.containsKey(storageName);
+    } finally {
+      regionReadWriteLock.readLock().unlock();
+    }
+    return result;
   }
 
   @TestOnly
   public void clear() {
+    nextRegionGroupId = 0;
     storageGroupsMap.clear();
-    schemaRegion.getSchemaRegionDataNodesMap().clear();
-    dataRegion.getDataRegionDataNodesMap().clear();
+    regionMap.clear();
   }
 
   private static class RegionInfoPersistenceHolder {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
index 1c71cf70ff..957e26d52e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
@@ -18,12 +18,15 @@
  */
 package org.apache.iotdb.confignode.physical;
 
+import org.apache.iotdb.confignode.physical.crud.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.CreateRegionsPlan;
+import org.apache.iotdb.confignode.physical.crud.CreateSchemaPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
 import org.apache.iotdb.confignode.physical.sys.AuthorPlan;
-import org.apache.iotdb.confignode.physical.sys.DataPartitionPlan;
 import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
 import org.apache.iotdb.confignode.physical.sys.QueryStorageGroupSchemaPlan;
 import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
-import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
 import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 
@@ -98,17 +101,26 @@ public abstract class PhysicalPlan implements IConsensusRequest {
         case QueryStorageGroupSchema:
           plan = new QueryStorageGroupSchemaPlan();
           break;
-        case QueryDataPartition:
-          plan = new DataPartitionPlan(PhysicalPlanType.QueryDataPartition);
+        case CreateRegions:
+          plan = new CreateRegionsPlan();
           break;
-        case ApplyDataPartition:
-          plan = new DataPartitionPlan(PhysicalPlanType.ApplyDataPartition);
+        case GetSchemaPartition:
+          plan = new GetOrCreateSchemaPartitionPlan(PhysicalPlanType.GetSchemaPartition);
           break;
-        case QuerySchemaPartition:
-          plan = new SchemaPartitionPlan(PhysicalPlanType.QuerySchemaPartition);
+        case CreateSchemaPartition:
+          plan = new CreateSchemaPartitionPlan(PhysicalPlanType.CreateSchemaPartition);
           break;
-        case ApplySchemaPartition:
-          plan = new SchemaPartitionPlan(PhysicalPlanType.ApplySchemaPartition);
+        case GetOrCreateSchemaPartition:
+          plan = new GetOrCreateSchemaPartitionPlan(PhysicalPlanType.GetOrCreateSchemaPartition);
+          break;
+        case GetDataPartition:
+          plan = new GetOrCreateDataPartitionPlan(PhysicalPlanType.GetDataPartition);
+          break;
+        case CreateDataPartition:
+          plan = new CreateDataPartitionPlan();
+          break;
+        case GetOrCreateDataPartition:
+          plan = new GetOrCreateDataPartitionPlan(PhysicalPlanType.GetOrCreateDataPartition);
           break;
         case LIST_USER:
         case LIST_ROLE:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlanType.java
index 540bf0bdd6..f047dd9723 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlanType.java
@@ -24,11 +24,13 @@ public enum PhysicalPlanType {
   SetStorageGroup,
   DeleteStorageGroup,
   QueryStorageGroupSchema,
-  CreateRegion,
-  QueryDataPartition,
-  ApplyDataPartition,
-  QuerySchemaPartition,
-  ApplySchemaPartition,
+  CreateRegions,
+  GetSchemaPartition,
+  CreateSchemaPartition,
+  GetOrCreateSchemaPartition,
+  GetDataPartition,
+  CreateDataPartition,
+  GetOrCreateDataPartition,
   AUTHOR,
   CREATE_USER,
   CREATE_ROLE,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateDataPartitionPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateDataPartitionPlan.java
new file mode 100644
index 0000000000..d2bb602e09
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateDataPartitionPlan.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.physical.crud;
+
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Physical plan that carries assigned DataPartitions, organized as
+ * Map<StorageGroupName, Map<SeriesPartitionSlot, Map<TimePartitionSlot, RegionReplicaSets>>>.
+ */
+public class CreateDataPartitionPlan extends PhysicalPlan {
+
+  // Map<StorageGroupName, Map<SeriesPartitionSlot, Map<TimePartitionSlot, RegionReplicaSets>>>
+  private Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+      assignedDataPartition;
+
+  public CreateDataPartitionPlan() {
+    super(PhysicalPlanType.CreateDataPartition);
+  }
+
+  public Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+      getAssignedDataPartition() {
+    return assignedDataPartition;
+  }
+
+  public void setAssignedDataPartition(
+      Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+          assignedDataPartition) {
+    this.assignedDataPartition = assignedDataPartition;
+  }
+
+  /**
+   * Write the plan type followed by the nested partition map. Every map or list level is
+   * prefixed with its size so that deserializeImpl can rebuild the same nesting.
+   *
+   * @param buffer target buffer
+   */
+  @Override
+  protected void serializeImpl(ByteBuffer buffer) {
+    buffer.putInt(PhysicalPlanType.CreateDataPartition.ordinal());
+
+    buffer.putInt(assignedDataPartition.size());
+    for (Map.Entry<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+        storageGroupEntry : assignedDataPartition.entrySet()) {
+      SerializeDeserializeUtil.write(storageGroupEntry.getKey(), buffer);
+      buffer.putInt(storageGroupEntry.getValue().size());
+      for (Map.Entry<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>
+          seriesPartitionEntry : storageGroupEntry.getValue().entrySet()) {
+        seriesPartitionEntry.getKey().serializeImpl(buffer);
+        buffer.putInt(seriesPartitionEntry.getValue().size());
+        for (Map.Entry<TimePartitionSlot, List<RegionReplicaSet>> timePartitionEntry :
+            seriesPartitionEntry.getValue().entrySet()) {
+          timePartitionEntry.getKey().serializeImpl(buffer);
+          buffer.putInt(timePartitionEntry.getValue().size());
+          for (RegionReplicaSet regionReplicaSet : timePartitionEntry.getValue()) {
+            regionReplicaSet.serializeImpl(buffer);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Rebuild the nested partition map in the layout written by serializeImpl. The leading plan
+   * type is not read here; presumably the caller consumes it to pick the plan class.
+   *
+   * @param buffer source buffer, positioned at the StorageGroup count
+   */
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    assignedDataPartition = new HashMap<>();
+    int storageGroupNum = buffer.getInt();
+    for (int i = 0; i < storageGroupNum; i++) {
+      String storageGroupName = SerializeDeserializeUtil.readString(buffer);
+      Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>> seriesMap =
+          new HashMap<>();
+      assignedDataPartition.put(storageGroupName, seriesMap);
+
+      int seriesPartitionSlotNum = buffer.getInt();
+      for (int j = 0; j < seriesPartitionSlotNum; j++) {
+        SeriesPartitionSlot seriesPartitionSlot = new SeriesPartitionSlot();
+        seriesPartitionSlot.deserializeImpl(buffer);
+        Map<TimePartitionSlot, List<RegionReplicaSet>> timeMap = new HashMap<>();
+        seriesMap.put(seriesPartitionSlot, timeMap);
+
+        int timePartitionSlotNum = buffer.getInt();
+        for (int k = 0; k < timePartitionSlotNum; k++) {
+          TimePartitionSlot timePartitionSlot = new TimePartitionSlot();
+          timePartitionSlot.deserializeImpl(buffer);
+          List<RegionReplicaSet> regionReplicaSets = new ArrayList<>();
+          timeMap.put(timePartitionSlot, regionReplicaSets);
+
+          int regionReplicaSetNum = buffer.getInt();
+          for (int l = 0; l < regionReplicaSetNum; l++) {
+            RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
+            regionReplicaSet.deserializeImpl(buffer);
+            regionReplicaSets.add(regionReplicaSet);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CreateDataPartitionPlan that = (CreateDataPartitionPlan) o;
+    // Null-safe, matching hashCode(): the map stays null until it is set or deserialized
+    return Objects.equals(assignedDataPartition, that.assignedDataPartition);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(assignedDataPartition);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateRegionsPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateRegionsPlan.java
new file mode 100644
index 0000000000..d61a8bb702
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateRegionsPlan.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.physical.crud;
+
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/** Physical plan that carries the RegionReplicaSets created for one StorageGroup. */
+public class CreateRegionsPlan extends PhysicalPlan {
+
+  // Name of the StorageGroup the Regions belong to; null until set or deserialized
+  private String storageGroup;
+
+  private final List<RegionReplicaSet> regionReplicaSets;
+
+  public CreateRegionsPlan() {
+    super(PhysicalPlanType.CreateRegions);
+    this.regionReplicaSets = new ArrayList<>();
+  }
+
+  public String getStorageGroup() {
+    return storageGroup;
+  }
+
+  public void setStorageGroup(String storageGroup) {
+    this.storageGroup = storageGroup;
+  }
+
+  public void addRegion(RegionReplicaSet regionReplicaSet) {
+    this.regionReplicaSets.add(regionReplicaSet);
+  }
+
+  public List<RegionReplicaSet> getRegionReplicaSets() {
+    return regionReplicaSets;
+  }
+
+  /**
+   * Write the plan type, the StorageGroup name, and the size-prefixed list of RegionReplicaSets.
+   *
+   * @param buffer target buffer
+   */
+  @Override
+  protected void serializeImpl(ByteBuffer buffer) {
+    buffer.putInt(PhysicalPlanType.CreateRegions.ordinal());
+
+    SerializeDeserializeUtil.write(storageGroup, buffer);
+
+    buffer.putInt(regionReplicaSets.size());
+    for (RegionReplicaSet regionReplicaSet : regionReplicaSets) {
+      regionReplicaSet.serializeImpl(buffer);
+    }
+  }
+
+  /**
+   * Read the StorageGroup name and the RegionReplicaSets in the layout written by serializeImpl.
+   * The read RegionReplicaSets are appended to the existing list.
+   *
+   * @param buffer source buffer, positioned at the StorageGroup name
+   */
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    storageGroup = SerializeDeserializeUtil.readString(buffer);
+
+    int length = buffer.getInt();
+    for (int i = 0; i < length; i++) {
+      RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
+      regionReplicaSet.deserializeImpl(buffer);
+      regionReplicaSets.add(regionReplicaSet);
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CreateRegionsPlan that = (CreateRegionsPlan) o;
+    // Null-safe, matching hashCode(): storageGroup stays null until setStorageGroup is called
+    return Objects.equals(storageGroup, that.storageGroup)
+        && regionReplicaSets.equals(that.regionReplicaSets);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(storageGroup, regionReplicaSets);
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/hash/JSHashExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateSchemaPartitionPlan.java
similarity index 58%
copy from node-commons/src/main/java/org/apache/iotdb/commons/hash/JSHashExecutor.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateSchemaPartitionPlan.java
index 6521d2890b..15b5b2ec54 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/hash/JSHashExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateSchemaPartitionPlan.java
@@ -16,25 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.commons.hash;
+package org.apache.iotdb.confignode.physical.crud;
 
-public class JSHashExecutor extends DeviceGroupHashExecutor {
+import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.physical.PhysicalPlanType;
 
-  private static final int base = 1315423911;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 
-  public JSHashExecutor(int deviceGroupCount) {
-    super(deviceGroupCount);
+/** TODO: Reconstruct this interface after PatternTree is moved to node-commons */
+public class CreateSchemaPartitionPlan extends PhysicalPlan {
+
+  public CreateSchemaPartitionPlan(PhysicalPlanType type) {
+    super(type);
   }
 
   @Override
-  public int getDeviceGroupID(String device) {
-    int hash = base;
-
-    for (int i = 0; i < device.length(); i++) {
-      hash ^= ((hash << 5) + (int) device.charAt(i) + (hash >> 2));
-    }
-    hash &= Integer.MAX_VALUE;
+  protected void serializeImpl(ByteBuffer buffer) {}
 
-    return hash % deviceGroupCount;
-  }
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {}
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateDataPartitionPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateDataPartitionPlan.java
new file mode 100644
index 0000000000..5cd6aec141
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateDataPartitionPlan.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.physical.crud;
+
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Get or create DataPartition for the requested slots, organized as
+ * Map<StorageGroupName, Map<SeriesPartitionSlot, TimePartitionSlots>>. The PhysicalPlanType
+ * passed to the constructor tells the variants of this plan apart.
+ */
+public class GetOrCreateDataPartitionPlan extends PhysicalPlan {
+
+  // Map<StorageGroupName, Map<SeriesPartitionSlot, List<TimePartitionSlot>>>
+  private Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> partitionSlotsMap;
+
+  public GetOrCreateDataPartitionPlan(PhysicalPlanType physicalPlanType) {
+    super(physicalPlanType);
+  }
+
+  public Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> getPartitionSlotsMap() {
+    return partitionSlotsMap;
+  }
+
+  public void setPartitionSlotsMap(
+      Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> partitionSlotsMap) {
+    this.partitionSlotsMap = partitionSlotsMap;
+  }
+
+  /**
+   * Fill this plan's partitionSlotsMap from a TDataPartitionReq, replacing any previous content.
+   *
+   * @param req TDataPartitionReq
+   */
+  public void convertFromRpcTDataPartitionReq(TDataPartitionReq req) {
+    partitionSlotsMap = new HashMap<>();
+    req.getPartitionSlotsMap()
+        .forEach(
+            (storageGroup, tSeriesPartitionTimePartitionSlots) -> {
+              // Extract StorageGroupName
+              Map<SeriesPartitionSlot, List<TimePartitionSlot>> seriesSlotsMap =
+                  partitionSlotsMap.computeIfAbsent(storageGroup, key -> new HashMap<>());
+
+              tSeriesPartitionTimePartitionSlots.forEach(
+                  (tSeriesPartitionSlot, tTimePartitionSlots) -> {
+                    // Extract SeriesPartitionSlot
+                    List<TimePartitionSlot> slots =
+                        seriesSlotsMap.computeIfAbsent(
+                            new SeriesPartitionSlot(tSeriesPartitionSlot.getSlotId()),
+                            key -> new ArrayList<>());
+
+                    // Extract TimePartitionSlots
+                    tTimePartitionSlots.forEach(
+                        tTimePartitionSlot ->
+                            slots.add(new TimePartitionSlot(tTimePartitionSlot.getStartTime())));
+                  });
+            });
+  }
+
+  /**
+   * Write the plan type followed by the size-prefixed slots map.
+   *
+   * @param buffer target buffer
+   */
+  @Override
+  protected void serializeImpl(ByteBuffer buffer) {
+    // Write the actual type of this plan: hard-coding GetDataPartition would turn a
+    // GetOrCreateDataPartition plan into a GetDataPartition plan after a round trip
+    buffer.putInt(getType().ordinal());
+
+    buffer.putInt(partitionSlotsMap.size());
+    partitionSlotsMap.forEach(
+        ((storageGroup, seriesPartitionTimePartitionSlots) -> {
+          SerializeDeserializeUtil.write(storageGroup, buffer);
+          buffer.putInt(seriesPartitionTimePartitionSlots.size());
+          seriesPartitionTimePartitionSlots.forEach(
+              ((seriesPartitionSlot, timePartitionSlots) -> {
+                seriesPartitionSlot.serializeImpl(buffer);
+                buffer.putInt(timePartitionSlots.size());
+                timePartitionSlots.forEach(
+                    timePartitionSlot -> timePartitionSlot.serializeImpl(buffer));
+              }));
+        }));
+  }
+
+  /**
+   * Rebuild the slots map in the layout written by serializeImpl. The leading plan type is not
+   * read here; presumably the caller consumes it to pick the plan class and type.
+   *
+   * @param buffer source buffer, positioned at the StorageGroup count
+   */
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) {
+    partitionSlotsMap = new HashMap<>();
+    int storageGroupNum = buffer.getInt();
+    for (int i = 0; i < storageGroupNum; i++) {
+      String storageGroup = SerializeDeserializeUtil.readString(buffer);
+      Map<SeriesPartitionSlot, List<TimePartitionSlot>> seriesSlotsMap = new HashMap<>();
+      partitionSlotsMap.put(storageGroup, seriesSlotsMap);
+
+      int seriesPartitionSlotNum = buffer.getInt();
+      for (int j = 0; j < seriesPartitionSlotNum; j++) {
+        SeriesPartitionSlot seriesPartitionSlot = new SeriesPartitionSlot();
+        seriesPartitionSlot.deserializeImpl(buffer);
+        List<TimePartitionSlot> timePartitionSlots = new ArrayList<>();
+        seriesSlotsMap.put(seriesPartitionSlot, timePartitionSlots);
+
+        int timePartitionSlotNum = buffer.getInt();
+        for (int k = 0; k < timePartitionSlotNum; k++) {
+          TimePartitionSlot timePartitionSlot = new TimePartitionSlot();
+          timePartitionSlot.deserializeImpl(buffer);
+          timePartitionSlots.add(timePartitionSlot);
+        }
+      }
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    GetOrCreateDataPartitionPlan that = (GetOrCreateDataPartitionPlan) o;
+    // Null-safe, matching hashCode(): the map stays null until it is set or deserialized
+    return Objects.equals(partitionSlotsMap, that.partitionSlotsMap);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(partitionSlotsMap);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SchemaPartitionPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateSchemaPartitionPlan.java
similarity index 58%
rename from confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SchemaPartitionPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateSchemaPartitionPlan.java
index 67fbc2353a..642aceed5c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SchemaPartitionPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateSchemaPartitionPlan.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.physical.sys;
+package org.apache.iotdb.confignode.physical.crud;
 
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.confignode.physical.PhysicalPlan;
@@ -29,40 +29,41 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-/** Get DataNodeInfo by the specific DataNode's id. And return all when dataNodeID is set to -1. */
-public class SchemaPartitionPlan extends PhysicalPlan {
+/** Query or apply SchemaPartition by the specific storageGroup and its seriesPartitionSlots. */
+public class GetOrCreateSchemaPartitionPlan extends PhysicalPlan {
   private String storageGroup;
-  private List<Integer> deviceGroupIDs;
-  private Map<Integer, RegionReplicaSet> deviceGroupIdReplicaSets;
+  private List<Integer> seriesPartitionSlots;
+  private Map<Integer, RegionReplicaSet> schemaPartitionReplicaSets;
 
-  public SchemaPartitionPlan(PhysicalPlanType physicalPlanType) {
+  public GetOrCreateSchemaPartitionPlan(PhysicalPlanType physicalPlanType) {
     super(physicalPlanType);
   }
 
-  public SchemaPartitionPlan(
-      PhysicalPlanType physicalPlanType, String storageGroup, List<Integer> deviceGroupIDs) {
+  public GetOrCreateSchemaPartitionPlan(
+      PhysicalPlanType physicalPlanType, String storageGroup, List<Integer> seriesPartitionSlots) {
     this(physicalPlanType);
     this.storageGroup = storageGroup;
-    this.deviceGroupIDs = deviceGroupIDs;
+    this.seriesPartitionSlots = seriesPartitionSlots;
   }
 
-  public void setDeviceGroupIdReplicaSet(Map<Integer, RegionReplicaSet> deviceGroupIdReplicaSets) {
-    this.deviceGroupIdReplicaSets = deviceGroupIdReplicaSets;
+  public void setSchemaPartitionReplicaSet(
+      Map<Integer, RegionReplicaSet> deviceGroupIdReplicaSets) {
+    this.schemaPartitionReplicaSets = deviceGroupIdReplicaSets;
   }
 
-  public Map<Integer, RegionReplicaSet> getDeviceGroupIdReplicaSets() {
-    return deviceGroupIdReplicaSets;
+  public Map<Integer, RegionReplicaSet> getSchemaPartitionReplicaSets() {
+    return schemaPartitionReplicaSets;
   }
 
   @Override
   protected void serializeImpl(ByteBuffer buffer) {
-    buffer.putInt(PhysicalPlanType.QueryDataPartition.ordinal());
+    buffer.putInt(PhysicalPlanType.GetDataPartition.ordinal());
     SerializeDeserializeUtil.write(storageGroup, buffer);
-    buffer.putInt(deviceGroupIDs.size());
-    deviceGroupIDs.forEach(id -> SerializeDeserializeUtil.write(id, buffer));
+    buffer.putInt(seriesPartitionSlots.size());
+    seriesPartitionSlots.forEach(id -> SerializeDeserializeUtil.write(id, buffer));
 
-    buffer.putInt(deviceGroupIdReplicaSets.size());
-    for (Map.Entry<Integer, RegionReplicaSet> entry : deviceGroupIdReplicaSets.entrySet()) {
+    buffer.putInt(schemaPartitionReplicaSets.size());
+    for (Map.Entry<Integer, RegionReplicaSet> entry : schemaPartitionReplicaSets.entrySet()) {
       buffer.putInt(entry.getKey());
       entry.getValue().serializeImpl(buffer);
     }
@@ -73,18 +74,17 @@ public class SchemaPartitionPlan extends PhysicalPlan {
     storageGroup = SerializeDeserializeUtil.readString(buffer);
     int idSize = SerializeDeserializeUtil.readInt(buffer);
     for (int i = 0; i < idSize; i++) {
-      deviceGroupIDs.add(SerializeDeserializeUtil.readInt(buffer));
+      seriesPartitionSlots.add(SerializeDeserializeUtil.readInt(buffer));
     }
 
-    if (deviceGroupIdReplicaSets == null) {
-      deviceGroupIdReplicaSets = new HashMap<>();
+    if (schemaPartitionReplicaSets == null) {
+      schemaPartitionReplicaSets = new HashMap<>();
     }
     int size = buffer.getInt();
-
     for (int i = 0; i < size; i++) {
       RegionReplicaSet schemaRegionReplicaSet = new RegionReplicaSet();
       schemaRegionReplicaSet.deserializeImpl(buffer);
-      deviceGroupIdReplicaSets.put(buffer.getInt(), schemaRegionReplicaSet);
+      schemaPartitionReplicaSets.put(buffer.getInt(), schemaRegionReplicaSet);
     }
   }
 
@@ -92,7 +92,7 @@ public class SchemaPartitionPlan extends PhysicalPlan {
     return storageGroup;
   }
 
-  public List<Integer> getDeviceGroupIDs() {
-    return deviceGroupIDs;
+  public List<Integer> getSeriesPartitionSlots() {
+    return seriesPartitionSlots;
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/DataPartitionPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/DataPartitionPlan.java
deleted file mode 100644
index 556944ecea..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/DataPartitionPlan.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.physical.sys;
-
-import org.apache.iotdb.confignode.physical.PhysicalPlan;
-import org.apache.iotdb.confignode.physical.PhysicalPlanType;
-import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-/**
- * query DataPartition or apply DataPartition by the specific storageGroup and
- * deviceGroupStartTimeMap.
- */
-public class DataPartitionPlan extends PhysicalPlan {
-  private String storageGroup;
-  private Map<Integer, List<Integer>> deviceGroupStartTimeMap;
-
-  public DataPartitionPlan(PhysicalPlanType physicalPlanType) {
-    super(physicalPlanType);
-  }
-
-  public DataPartitionPlan(
-      PhysicalPlanType physicalPlanType,
-      String storageGroup,
-      Map<Integer, List<Integer>> deviceGroupStartTimeMap) {
-    this(physicalPlanType);
-    this.storageGroup = storageGroup;
-    this.deviceGroupStartTimeMap = deviceGroupStartTimeMap;
-  }
-
-  public String getStorageGroup() {
-    return storageGroup;
-  }
-
-  public void setStorageGroup(String storageGroup) {
-    this.storageGroup = storageGroup;
-  }
-
-  public Map<Integer, List<Integer>> getDeviceGroupIDs() {
-    return deviceGroupStartTimeMap;
-  }
-
-  public void setDeviceGroupIDs(Map<Integer, List<Integer>> deviceGroupIDs) {
-    this.deviceGroupStartTimeMap = deviceGroupIDs;
-  }
-
-  @Override
-  protected void serializeImpl(ByteBuffer buffer) {
-    buffer.putInt(PhysicalPlanType.QueryDataPartition.ordinal());
-    SerializeDeserializeUtil.write(storageGroup, buffer);
-    SerializeDeserializeUtil.writeIntMapLists(deviceGroupStartTimeMap, buffer);
-  }
-
-  @Override
-  protected void deserializeImpl(ByteBuffer buffer) {
-    storageGroup = SerializeDeserializeUtil.readString(buffer);
-    deviceGroupStartTimeMap = SerializeDeserializeUtil.readIntMapLists(buffer);
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/QueryDataNodeInfoPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/QueryDataNodeInfoPlan.java
index 95a2c8df8a..503fae05ca 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/QueryDataNodeInfoPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/QueryDataNodeInfoPlan.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.confignode.physical.PhysicalPlan;
 import org.apache.iotdb.confignode.physical.PhysicalPlanType;
 
 import java.nio.ByteBuffer;
+import java.util.Objects;
 
 /** Get DataNodeInfo by the specific DataNode's id. And return all when dataNodeID is set to -1. */
 public class QueryDataNodeInfoPlan extends PhysicalPlan {
@@ -51,4 +52,17 @@ public class QueryDataNodeInfoPlan extends PhysicalPlan {
   protected void deserializeImpl(ByteBuffer buffer) {
     this.dataNodeID = buffer.getInt();
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    QueryDataNodeInfoPlan that = (QueryDataNodeInfoPlan) o;
+    return dataNodeID == that.dataNodeID;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(dataNodeID);
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/RegisterDataNodePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/RegisterDataNodePlan.java
index eb5cc83a74..c4c3fd5d52 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/RegisterDataNodePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/RegisterDataNodePlan.java
@@ -18,12 +18,13 @@
  */
 package org.apache.iotdb.confignode.physical.sys;
 
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
 import org.apache.iotdb.commons.cluster.Endpoint;
-import org.apache.iotdb.commons.partition.DataNodeLocation;
 import org.apache.iotdb.confignode.physical.PhysicalPlan;
 import org.apache.iotdb.confignode.physical.PhysicalPlanType;
 
 import java.nio.ByteBuffer;
+import java.util.Objects;
 
 public class RegisterDataNodePlan extends PhysicalPlan {
 
@@ -33,9 +34,9 @@ public class RegisterDataNodePlan extends PhysicalPlan {
     super(PhysicalPlanType.RegisterDataNode);
   }
 
-  public RegisterDataNodePlan(int dataNodeID, Endpoint endpoint) {
+  public RegisterDataNodePlan(DataNodeLocation info) {
     this();
-    this.info = new DataNodeLocation(dataNodeID, endpoint);
+    this.info = info;
   }
 
   public DataNodeLocation getInfo() {
@@ -45,7 +46,7 @@ public class RegisterDataNodePlan extends PhysicalPlan {
   @Override
   protected void serializeImpl(ByteBuffer buffer) {
     buffer.putInt(PhysicalPlanType.RegisterDataNode.ordinal());
-    buffer.putInt(info.getDataNodeID());
+    buffer.putInt(info.getDataNodeId());
     buffer.putInt(info.getEndPoint().getIp().length());
     buffer.put(info.getEndPoint().getIp().getBytes());
     buffer.putInt(info.getEndPoint().getPort());
@@ -62,4 +63,17 @@ public class RegisterDataNodePlan extends PhysicalPlan {
 
     this.info = new DataNodeLocation(dataNodeID, new Endpoint(ip, port));
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    RegisterDataNodePlan plan = (RegisterDataNodePlan) o;
+    return Objects.equals(info, plan.info); // null-safe, matching Objects.hash(info) in hashCode
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(info);
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SetStorageGroupPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SetStorageGroupPlan.java
index bf7a7e9ac0..865829592d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SetStorageGroupPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SetStorageGroupPlan.java
@@ -18,22 +18,18 @@
  */
 package org.apache.iotdb.confignode.physical.sys;
 
-import org.apache.iotdb.confignode.partition.DataRegionInfo;
-import org.apache.iotdb.confignode.partition.SchemaRegionInfo;
 import org.apache.iotdb.confignode.partition.StorageGroupSchema;
 import org.apache.iotdb.confignode.physical.PhysicalPlan;
 import org.apache.iotdb.confignode.physical.PhysicalPlanType;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 
 public class SetStorageGroupPlan extends PhysicalPlan {
 
   private StorageGroupSchema schema;
 
-  private SchemaRegionInfo schemaRegionInfo;
-
-  private DataRegionInfo dataRegionInfo;
-
   public SetStorageGroupPlan() {
     super(PhysicalPlanType.SetStorageGroup);
     this.schema = new StorageGroupSchema();
@@ -52,34 +48,27 @@ public class SetStorageGroupPlan extends PhysicalPlan {
     this.schema = schema;
   }
 
-  public SchemaRegionInfo getSchemaRegionInfo() {
-    return schemaRegionInfo;
-  }
-
-  public void setSchemaRegionInfo(SchemaRegionInfo schemaRegionInfo) {
-    this.schemaRegionInfo = schemaRegionInfo;
-  }
-
-  public DataRegionInfo getDataRegionInfo() {
-    return dataRegionInfo;
-  }
-
-  public void setDataRegionInfo(DataRegionInfo dataRegionInfo) {
-    this.dataRegionInfo = dataRegionInfo;
-  }
-
   @Override
   protected void serializeImpl(ByteBuffer buffer) {
     buffer.putInt(PhysicalPlanType.SetStorageGroup.ordinal());
     schema.serialize(buffer);
-    schemaRegionInfo.serializeImpl(buffer);
-    dataRegionInfo.serializeImpl(buffer);
   }
 
   @Override
-  protected void deserializeImpl(ByteBuffer buffer) {
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
     schema.deserialize(buffer);
-    schemaRegionInfo.deserializeImpl(buffer);
-    dataRegionInfo.deserializeImpl(buffer);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    SetStorageGroupPlan that = (SetStorageGroupPlan) o;
+    return schema.equals(that.schema);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(schema);
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
index b32f40edf0..53c95585e0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
@@ -25,11 +25,13 @@ import org.apache.iotdb.confignode.persistence.DataNodeInfoPersistence;
 import org.apache.iotdb.confignode.persistence.PartitionInfoPersistence;
 import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
 import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.physical.crud.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.CreateRegionsPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
 import org.apache.iotdb.confignode.physical.sys.AuthorPlan;
-import org.apache.iotdb.confignode.physical.sys.DataPartitionPlan;
 import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
 import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
-import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
 import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.db.auth.AuthException;
@@ -58,14 +60,12 @@ public class PlanExecutor {
         return dataNodeInfoPersistence.getDataNodeInfo((QueryDataNodeInfoPlan) plan);
       case QueryStorageGroupSchema:
         return regionInfoPersistence.getStorageGroupSchema();
-      case QueryDataPartition:
-        return partitionInfoPersistence.getDataPartition((DataPartitionPlan) plan);
-      case QuerySchemaPartition:
-        return partitionInfoPersistence.getSchemaPartition((SchemaPartitionPlan) plan);
-      case ApplySchemaPartition:
-        return partitionInfoPersistence.applySchemaPartition((SchemaPartitionPlan) plan);
-      case ApplyDataPartition:
-        return partitionInfoPersistence.applyDataPartition((DataPartitionPlan) plan);
+      case GetDataPartition:
+      case GetOrCreateDataPartition:
+        return partitionInfoPersistence.getDataPartition((GetOrCreateDataPartitionPlan) plan);
+      case GetSchemaPartition:
+      case GetOrCreateSchemaPartition:
+        return partitionInfoPersistence.getSchemaPartition((GetOrCreateSchemaPartitionPlan) plan);
       case LIST_USER:
         return authorInfoPersistence.executeListUser((AuthorPlan) plan);
       case LIST_ROLE:
@@ -90,6 +90,13 @@ public class PlanExecutor {
         return dataNodeInfoPersistence.registerDataNode((RegisterDataNodePlan) plan);
       case SetStorageGroup:
         return regionInfoPersistence.setStorageGroup((SetStorageGroupPlan) plan);
+      case CreateRegions:
+        return regionInfoPersistence.createRegions((CreateRegionsPlan) plan);
+      case CreateSchemaPartition:
+        return partitionInfoPersistence.createSchemaPartition(
+            (GetOrCreateSchemaPartitionPlan) plan);
+      case CreateDataPartition:
+        return partitionInfoPersistence.createDataPartition((CreateDataPartitionPlan) plan);
       case CREATE_USER:
       case CREATE_ROLE:
       case DROP_USER:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
index 6a7f829627..1e17b4d76e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
@@ -20,39 +20,34 @@ package org.apache.iotdb.confignode.service.thrift.server;
 
 import org.apache.iotdb.common.rpc.thrift.EndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
 import org.apache.iotdb.commons.cluster.Endpoint;
-import org.apache.iotdb.commons.partition.DataNodeLocation;
+import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationDataSet;
 import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
-import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
+import org.apache.iotdb.confignode.consensus.response.DataPartitionDataSet;
 import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.partition.StorageGroupSchema;
 import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
 import org.apache.iotdb.confignode.physical.sys.AuthorPlan;
 import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
 import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
-import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
 import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.confignode.rpc.thrift.AuthorizerReq;
 import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeMessage;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterReq;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterResp;
-import org.apache.iotdb.confignode.rpc.thrift.DataPartitionInfo;
-import org.apache.iotdb.confignode.rpc.thrift.DataPartitionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.DeleteStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.DeviceGroupHashInfo;
-import org.apache.iotdb.confignode.rpc.thrift.FetchDataPartitionReq;
-import org.apache.iotdb.confignode.rpc.thrift.FetchPartitionReq;
-import org.apache.iotdb.confignode.rpc.thrift.FetchSchemaPartitionReq;
-import org.apache.iotdb.confignode.rpc.thrift.GetDataPartitionReq;
-import org.apache.iotdb.confignode.rpc.thrift.GetSchemaPartitionReq;
-import org.apache.iotdb.confignode.rpc.thrift.PartitionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo;
-import org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.SetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.StorageGroupMessage;
-import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessage;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessageResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessage;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessageResp;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -66,6 +61,7 @@ import java.util.Map;
 
 /** ConfigNodeRPCServer exposes the interface that interacts with the DataNode */
 public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
+
   private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNodeRPCServerProcessor.class);
 
   private final ConfigManager configManager;
@@ -79,51 +75,48 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
   }
 
   @Override
-  public DataNodeRegisterResp registerDataNode(DataNodeRegisterReq req) throws TException {
-    // TODO: handle exception in consensusLayer
+  public TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req) throws TException {
     RegisterDataNodePlan plan =
         new RegisterDataNodePlan(
-            -1, new Endpoint(req.getEndPoint().getIp(), req.getEndPoint().getPort()));
-    TSStatus status = configManager.registerDataNode(plan);
-    DataNodeRegisterResp result = new DataNodeRegisterResp();
-    result.setRegisterResult(status);
-    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      result.setDataNodeID(Integer.parseInt(status.getMessage()));
-      LOGGER.info(
-          "Register DataNode successful. DataNodeID: {}, {}",
-          status.getMessage(),
-          req.getEndPoint().toString());
-    } else {
-      LOGGER.error("Register DataNode failed. {}", status.getMessage());
-    }
-    return result;
+            new DataNodeLocation(
+                -1, new Endpoint(req.getEndPoint().getIp(), req.getEndPoint().getPort())));
+    DataNodeConfigurationDataSet dataSet =
+        (DataNodeConfigurationDataSet) configManager.registerDataNode(plan);
+    TDataNodeRegisterResp resp = new TDataNodeRegisterResp();
+    dataSet.convertToRpcDataNodeRegisterResp(resp);
+    LOGGER.info(
+        "Register DataNode successful. DataNodeID: {}, {}",
+        resp.getDataNodeID(),
+        req.getEndPoint().toString());
+    return resp;
   }
 
   @Override
-  public Map<Integer, DataNodeMessage> getDataNodesMessage(int dataNodeID) throws TException {
+  public TDataNodeMessageResp getDataNodesMessage(int dataNodeID) throws TException {
     QueryDataNodeInfoPlan plan = new QueryDataNodeInfoPlan(dataNodeID);
-    DataSet dataSet = configManager.getDataNodeInfo(plan);
-
-    if (dataSet == null) {
-      return new HashMap<>();
-    } else {
-      Map<Integer, DataNodeMessage> result = new HashMap<>();
-      for (DataNodeLocation info : ((DataNodesInfoDataSet) dataSet).getDataNodeList()) {
-        result.put(
-            info.getDataNodeID(),
-            new DataNodeMessage(
-                info.getDataNodeID(),
+    DataNodesInfoDataSet dataSet = (DataNodesInfoDataSet) configManager.getDataNodeInfo(plan);
+
+    TDataNodeMessageResp resp = new TDataNodeMessageResp();
+    resp.setStatus(dataSet.getStatus());
+    if (dataSet.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      Map<Integer, TDataNodeMessage> msgMap = new HashMap<>();
+      for (DataNodeLocation info : dataSet.getDataNodeList()) {
+        msgMap.put(
+            info.getDataNodeId(),
+            new TDataNodeMessage(
+                info.getDataNodeId(),
                 new EndPoint(info.getEndPoint().getIp(), info.getEndPoint().getPort())));
       }
-      return result;
+      resp.setDataNodeMessageMap(msgMap); // set once, also when no DataNode is listed
     }
+
+    return resp;
   }
 
   @Override
-  public TSStatus setStorageGroup(SetStorageGroupReq req) throws TException {
+  public TSStatus setStorageGroup(TSetStorageGroupReq req) throws TException {
     SetStorageGroupPlan plan =
-        new SetStorageGroupPlan(
-            new org.apache.iotdb.confignode.partition.StorageGroupSchema(req.getStorageGroup()));
+        new SetStorageGroupPlan(new StorageGroupSchema(req.getStorageGroup()));
 
     TSStatus resp = configManager.setStorageGroup(plan);
     if (resp.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -135,32 +128,100 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
   }
 
   @Override
-  public TSStatus deleteStorageGroup(DeleteStorageGroupReq req) throws TException {
+  public TSStatus deleteStorageGroup(TDeleteStorageGroupReq req) throws TException {
+    // TODO: delete StorageGroup
     return null;
   }
 
   @Override
-  public Map<String, StorageGroupMessage> getStorageGroupsMessage() throws TException {
-    DataSet dataSet = configManager.getStorageGroupSchema();
-
-    if (dataSet == null) {
-      return new HashMap<>();
-    } else {
-      Map<String, StorageGroupMessage> result = new HashMap<>();
-      for (StorageGroupSchema schema : ((StorageGroupSchemaDataSet) dataSet).getSchemaList()) {
-        result.put(schema.getName(), new StorageGroupMessage(schema.getName()));
+  public TStorageGroupMessageResp getStorageGroupsMessage() throws TException {
+    StorageGroupSchemaDataSet dataSet =
+        (StorageGroupSchemaDataSet) configManager.getStorageGroupSchema();
+
+    TStorageGroupMessageResp resp = new TStorageGroupMessageResp();
+    resp.setStatus(dataSet.getStatus());
+    if (dataSet.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      Map<String, TStorageGroupMessage> storageGroupMessageMap = new HashMap<>();
+      for (StorageGroupSchema schema : dataSet.getSchemaList()) {
+        storageGroupMessageMap.put(schema.getName(), new TStorageGroupMessage(schema.getName()));
       }
-      return result;
+      resp.setStorageGroupMessageMap(storageGroupMessageMap);
     }
+
+    return resp;
   }
 
   @Override
-  public DeviceGroupHashInfo getDeviceGroupHashInfo() throws TException {
-    return configManager.getDeviceGroupHashInfo();
+  public TSchemaPartitionResp getSchemaPartition(TSchemaPartitionReq req) throws TException {
+    // TODO: Get SchemaPartition by specific PatternTree
+
+    //    SchemaPartitionPlan querySchemaPartitionPlan =
+    //        new SchemaPartitionPlan(
+    //            PhysicalPlanType.QuerySchemaPartition, req.getStorageGroup(),
+    // req.getDeviceGroupIDs());
+    //    DataSet dataSet = configManager.getSchemaPartition(querySchemaPartitionPlan);
+    //    return ((SchemaPartitionDataSet) dataSet).convertRpcSchemaPartitionInfo();
+    return null;
+  }
+
+  @Override
+  public TSchemaPartitionResp getOrCreateSchemaPartition(TSchemaPartitionReq req)
+      throws TException {
+    // TODO: Get or create SchemaPartition by specific PatternTree
+
+    //    SchemaPartitionPlan applySchemaPartitionPlan =
+    //        new SchemaPartitionPlan(
+    //            PhysicalPlanType.ApplySchemaPartition,
+    //            req.getStorageGroup(),
+    //            req.getSeriesPartitionSlots());
+    //    SchemaPartitionDataSet dataSet =
+    //        (SchemaPartitionDataSet) configManager.applySchemaPartition(applySchemaPartitionPlan);
+    //
+    //    TSchemaPartitionResp resp = new TSchemaPartitionResp();
+    //    resp.setStatus(dataSet.getStatus());
+    //    if (dataSet.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+    //      dataSet.convertToRpcSchemaPartitionResp(resp);
+    //    }
+    //    return resp;
+    return null;
+  }
+
+  @Override
+  public TDataPartitionResp getDataPartition(TDataPartitionReq req) throws TException {
+    GetOrCreateDataPartitionPlan getDataPartitionPlan =
+        new GetOrCreateDataPartitionPlan(PhysicalPlanType.GetDataPartition);
+    getDataPartitionPlan.convertFromRpcTDataPartitionReq(req);
+    DataPartitionDataSet dataset =
+        (DataPartitionDataSet) configManager.getDataPartition(getDataPartitionPlan);
+
+    TDataPartitionResp resp = new TDataPartitionResp();
+    resp.setStatus(dataset.getStatus());
+    if (dataset.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      dataset.convertToRpcDataPartitionResp(resp);
+    }
+
+    return resp;
+  }
+
+  @Override
+  public TDataPartitionResp getOrCreateDataPartition(TDataPartitionReq req) throws TException {
+    GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan =
+        new GetOrCreateDataPartitionPlan(PhysicalPlanType.GetOrCreateDataPartition);
+    getOrCreateDataPartitionPlan.convertFromRpcTDataPartitionReq(req);
+    DataPartitionDataSet dataset =
+        (DataPartitionDataSet) configManager.getOrCreateDataPartition(getOrCreateDataPartitionPlan);
+
+    TDataPartitionResp resp = new TDataPartitionResp();
+    resp.setStatus(dataset.getStatus());
+    if (dataset.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      dataset.convertToRpcDataPartitionResp(resp);
+    }
+
+    return resp;
   }
 
   @Override
-  public TSStatus operatePermission(AuthorizerReq req) throws TException {
+  public TSStatus operatePermission(TAuthorizerReq req) throws TException {
     if (req.getAuthorType() < 0 || req.getAuthorType() >= PhysicalPlanType.values().length) {
       throw new IndexOutOfBoundsException("Invalid ordinal");
     }
@@ -181,55 +242,6 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
     return configManager.operatePermission(plan);
   }
 
-  @Override
-  public DataPartitionInfo applyDataPartition(GetDataPartitionReq req) throws TException {
-    return null;
-  }
-
-  @Override
-  public SchemaPartitionInfo applySchemaPartition(GetSchemaPartitionReq req) throws TException {
-    SchemaPartitionPlan applySchemaPartitionPlan =
-        new SchemaPartitionPlan(
-            PhysicalPlanType.ApplySchemaPartition, req.getStorageGroup(), req.getDeviceGroupIDs());
-    DataSet dataSet = configManager.applySchemaPartition(applySchemaPartitionPlan);
-    ((SchemaPartitionDataSet) dataSet).getSchemaPartitionInfo();
-
-    return SchemaPartitionDataSet.convertRpcSchemaPartition(
-        ((SchemaPartitionDataSet) dataSet).getSchemaPartitionInfo());
-  }
-
-  @Override
-  public SchemaPartitionInfo getSchemaPartition(GetSchemaPartitionReq req) throws TException {
-    SchemaPartitionPlan querySchemaPartitionPlan =
-        new SchemaPartitionPlan(
-            PhysicalPlanType.QuerySchemaPartition, req.getStorageGroup(), req.getDeviceGroupIDs());
-    DataSet dataSet = configManager.getSchemaPartition(querySchemaPartitionPlan);
-    return SchemaPartitionDataSet.convertRpcSchemaPartition(
-        ((SchemaPartitionDataSet) dataSet).getSchemaPartitionInfo());
-  }
-
-  @Override
-  public DataPartitionInfo getDataPartition(GetDataPartitionReq req) throws TException {
-
-    return null;
-  }
-
-  @Override
-  public DataPartitionInfoResp fetchDataPartitionInfo(FetchDataPartitionReq req) throws TException {
-    return null;
-  }
-
-  @Override
-  public SchemaPartitionInfoResp fetchSchemaPartitionInfo(FetchSchemaPartitionReq req)
-      throws TException {
-    return null;
-  }
-
-  @Override
-  public PartitionInfoResp fetchPartitionInfo(FetchPartitionReq req) throws TException {
-    return null;
-  }
-
   public void handleClientExit() {}
 
   // TODO: Interfaces for data operations
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
index 09c955246d..90efb510ee 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
@@ -20,11 +20,12 @@ package org.apache.iotdb.confignode.consensus;
 
 import org.apache.iotdb.common.rpc.thrift.EndPoint;
 import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeMessage;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterReq;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterResp;
-import org.apache.iotdb.confignode.rpc.thrift.SetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.StorageGroupMessage;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessageResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessage;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessageResp;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -85,10 +86,9 @@ public class RatisConsensusDemo {
     // DataNodes can connect to any ConfigNode and send write requests
     for (int i = 0; i < 10; i++) {
       EndPoint endPoint = new EndPoint("0.0.0.0", 6667 + i);
-      DataNodeRegisterReq req = new DataNodeRegisterReq(endPoint);
-      DataNodeRegisterResp resp = clients[0].registerDataNode(req);
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.registerResult.getCode());
+      TDataNodeRegisterReq req = new TDataNodeRegisterReq(endPoint);
+      TDataNodeRegisterResp resp = clients[0].registerDataNode(req);
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
       Assert.assertEquals(i, resp.getDataNodeID());
       System.out.printf(
           "\nRegister DataNode successful. DataNodeID: %d, %s\n", resp.getDataNodeID(), endPoint);
@@ -103,7 +103,7 @@ public class RatisConsensusDemo {
 
     // DataNodes can connect to any ConfigNode and send read requests
     for (int i = 0; i < 3; i++) {
-      Map<Integer, DataNodeMessage> msgMap = clients[i].getDataNodesMessage(-1);
+      TDataNodeMessageResp msgMap = clients[i].getDataNodesMessage(-1);
       System.out.printf(
           "\nQuery DataNode message from ConfigNode 0.0.0.0:%d. Result: %s\n",
           22277 + i * 2, msgMap);
@@ -112,7 +112,7 @@ public class RatisConsensusDemo {
 
   private void setStorageGroups() throws TException, InterruptedException {
     for (int i = 0; i < 10; i++) {
-      SetStorageGroupReq req = new SetStorageGroupReq("root.sg" + i);
+      TSetStorageGroupReq req = new TSetStorageGroupReq("root.sg" + i);
       clients[0].setStorageGroup(req);
       System.out.printf("\nSet StorageGroup successful. StorageGroup: %s\n", "root.sg" + i);
       TimeUnit.SECONDS.sleep(1);
@@ -124,10 +124,11 @@ public class RatisConsensusDemo {
     TimeUnit.SECONDS.sleep(1);
 
     for (int i = 0; i < 3; i++) {
-      Map<String, StorageGroupMessage> msgMap = clients[i].getStorageGroupsMessage();
+      TStorageGroupMessageResp msgMap = clients[i].getStorageGroupsMessage();
       System.out.printf(
           "\nQuery StorageGroup message from ConfigNode 0.0.0.0:%d. Result: {\n", 22277 + i * 2);
-      for (Map.Entry<String, StorageGroupMessage> entry : msgMap.entrySet()) {
+      for (Map.Entry<String, TStorageGroupMessage> entry :
+          msgMap.getStorageGroupMessageMap().entrySet()) {
         System.out.printf("  Key(%s)=%s\n", entry.getKey(), entry.getValue().toString());
       }
       System.out.println("}");
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ConfigManagerManualTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/ConfigManagerManualTest.java
index cacc2fdd27..b5b5fa2e45 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ConfigManagerManualTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/ConfigManagerManualTest.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.confignode.manager;
 
 import org.apache.iotdb.common.rpc.thrift.EndPoint;
 import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeMessage;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterReq;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessage;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -74,10 +74,9 @@ public class ConfigManagerManualTest {
 
   private void registerDataNodes() throws TException {
     for (int i = 0; i < 3; i++) {
-      DataNodeRegisterReq req = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6667 + i));
-      DataNodeRegisterResp resp = clients[0].registerDataNode(req);
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.registerResult.getCode());
+      TDataNodeRegisterReq req = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6667 + i));
+      TDataNodeRegisterResp resp = clients[0].registerDataNode(req);
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
       Assert.assertEquals(i, resp.getDataNodeID());
     }
   }
@@ -87,11 +86,12 @@ public class ConfigManagerManualTest {
     TimeUnit.SECONDS.sleep(1);
 
     for (int i = 0; i < 3; i++) {
-      Map<Integer, DataNodeMessage> msgMap = clients[i].getDataNodesMessage(-1);
+      Map<Integer, TDataNodeMessage> msgMap =
+          clients[i].getDataNodesMessage(-1).getDataNodeMessageMap();
       Assert.assertEquals(3, msgMap.size());
       for (int j = 0; j < 3; j++) {
         Assert.assertNotNull(msgMap.get(j));
-        Assert.assertEquals(j, msgMap.get(j).getDataNodeID());
+        Assert.assertEquals(j, msgMap.get(j).getDataNodeId());
         Assert.assertEquals(localhost, msgMap.get(j).getEndPoint().getIp());
         Assert.assertEquals(6667 + j, msgMap.get(j).getEndPoint().getPort());
       }
@@ -113,17 +113,18 @@ public class ConfigManagerManualTest {
       clients[i] = new ConfigIService.Client(new TBinaryProtocol(transport));
     }
 
-    DataNodeRegisterResp resp =
-        clients[1].registerDataNode(new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6670)));
-    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.registerResult.getCode());
+    TDataNodeRegisterResp resp =
+        clients[1].registerDataNode(new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6670)));
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
     Assert.assertEquals(3, resp.getDataNodeID());
 
     for (int i = 0; i < 2; i++) {
-      Map<Integer, DataNodeMessage> msgMap = clients[i].getDataNodesMessage(-1);
+      Map<Integer, TDataNodeMessage> msgMap =
+          clients[i].getDataNodesMessage(-1).getDataNodeMessageMap();
       Assert.assertEquals(4, msgMap.size());
       for (int j = 0; j < 4; j++) {
         Assert.assertNotNull(msgMap.get(j));
-        Assert.assertEquals(j, msgMap.get(j).getDataNodeID());
+        Assert.assertEquals(j, msgMap.get(j).getDataNodeId());
         Assert.assertEquals(localhost, msgMap.get(j).getEndPoint().getIp());
         Assert.assertEquals(6667 + j, msgMap.get(j).getEndPoint().getPort());
       }
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
index d9f9cbbe2f..1b935f79fb 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
@@ -69,7 +69,7 @@ public class DeviceGroupHashExecutorManualTest {
       List<String> devices = genBatchDevices();
       totalTime -= System.currentTimeMillis();
       for (String device : devices) {
-        bucket[manager.getDeviceGroupID(device)] += 1;
+        bucket[manager.getSeriesPartitionSlot(device).getSlotId()] += 1;
       }
       totalTime += System.currentTimeMillis();
     }
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/physical/SerializeDeserializeUT.java b/confignode/src/test/java/org/apache/iotdb/confignode/physical/SerializeDeserializeUT.java
new file mode 100644
index 0000000000..0f3c55750d
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/physical/SerializeDeserializeUT.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.physical;
+
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.confignode.partition.StorageGroupSchema;
+import org.apache.iotdb.confignode.physical.crud.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.CreateRegionsPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
+import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SerializeDeserializeUT {
+
+  private final ByteBuffer buffer = ByteBuffer.allocate(10240);
+
+  @After
+  public void cleanBuffer() {
+    buffer.clear();
+  }
+
+  @Test
+  public void RegisterDataNodePlanTest() throws IOException {
+    RegisterDataNodePlan plan0 =
+        new RegisterDataNodePlan(new DataNodeLocation(1, new Endpoint("0.0.0.0", 6667)));
+    plan0.serialize(buffer);
+    RegisterDataNodePlan plan1 = (RegisterDataNodePlan) PhysicalPlan.Factory.create(buffer);
+    Assert.assertEquals(plan0, plan1);
+  }
+
+  @Test
+  public void QueryDataNodeInfoPlanTest() throws IOException {
+    QueryDataNodeInfoPlan plan0 = new QueryDataNodeInfoPlan(-1);
+    plan0.serialize(buffer);
+    QueryDataNodeInfoPlan plan1 = (QueryDataNodeInfoPlan) PhysicalPlan.Factory.create(buffer);
+    Assert.assertEquals(plan0, plan1);
+  }
+
+  @Test
+  public void SetStorageGroupPlanTest() throws IOException {
+    SetStorageGroupPlan plan0 = new SetStorageGroupPlan(new StorageGroupSchema("sg"));
+    plan0.serialize(buffer);
+    SetStorageGroupPlan plan1 = (SetStorageGroupPlan) PhysicalPlan.Factory.create(buffer);
+    Assert.assertEquals(plan0, plan1);
+  }
+
+  @Test
+  public void DeleteStorageGroupPlanTest() {
+    // TODO: Add serialize and deserialize test
+  }
+
+  @Test
+  public void CreateRegionsPlanTest() throws IOException {
+    CreateRegionsPlan plan0 = new CreateRegionsPlan();
+    plan0.setStorageGroup("sg");
+    RegionReplicaSet dataRegionSet = new RegionReplicaSet();
+    dataRegionSet.setId(new DataRegionId(0));
+    dataRegionSet.setDataNodeList(
+        Collections.singletonList(new DataNodeLocation(0, new Endpoint("0.0.0.0", 6667))));
+    plan0.addRegion(dataRegionSet);
+    RegionReplicaSet schemaRegionSet = new RegionReplicaSet();
+    schemaRegionSet.setId(new SchemaRegionId(1));
+    schemaRegionSet.setDataNodeList(
+        Collections.singletonList(new DataNodeLocation(0, new Endpoint("0.0.0.0", 6667))));
+    plan0.addRegion(schemaRegionSet);
+
+    plan0.serialize(buffer);
+    CreateRegionsPlan plan1 = (CreateRegionsPlan) PhysicalPlan.Factory.create(buffer);
+    Assert.assertEquals(plan0, plan1);
+  }
+
+  @Test
+  public void CreateSchemaPartitionPlanTest() {
+    // TODO: Add serialize and deserialize test
+  }
+
+  @Test
+  public void GetOrCreateSchemaPartitionPlanTest() {
+    // TODO: Add serialize and deserialize test
+  }
+
+  @Test
+  public void CreateDataPartitionPlanTest() throws IOException {
+    String storageGroup = "root.sg0";
+    SeriesPartitionSlot seriesPartitionSlot = new SeriesPartitionSlot(10);
+    TimePartitionSlot timePartitionSlot = new TimePartitionSlot(100);
+    RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
+    regionReplicaSet.setId(new DataRegionId(0));
+    regionReplicaSet.setDataNodeList(
+        Collections.singletonList(new DataNodeLocation(0, new Endpoint("0.0.0.0", 6667))));
+
+    Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+        assignedDataPartition = new HashMap<>();
+    assignedDataPartition.put(storageGroup, new HashMap<>());
+    assignedDataPartition.get(storageGroup).put(seriesPartitionSlot, new HashMap<>());
+    assignedDataPartition
+        .get(storageGroup)
+        .get(seriesPartitionSlot)
+        .put(timePartitionSlot, new ArrayList<>());
+    assignedDataPartition
+        .get(storageGroup)
+        .get(seriesPartitionSlot)
+        .get(timePartitionSlot)
+        .add(regionReplicaSet);
+
+    CreateDataPartitionPlan plan0 = new CreateDataPartitionPlan();
+    plan0.setAssignedDataPartition(assignedDataPartition);
+    plan0.serialize(buffer);
+    CreateDataPartitionPlan plan1 = (CreateDataPartitionPlan) PhysicalPlan.Factory.create(buffer);
+    Assert.assertEquals(plan0, plan1);
+  }
+
+  @Test
+  public void GetOrCreateDataPartitionPlanTest() throws IOException {
+    String storageGroup = "root.sg0";
+    SeriesPartitionSlot seriesPartitionSlot = new SeriesPartitionSlot(10);
+    TimePartitionSlot timePartitionSlot = new TimePartitionSlot(100);
+
+    Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> partitionSlotsMap =
+        new HashMap<>();
+    partitionSlotsMap.put(storageGroup, new HashMap<>());
+    partitionSlotsMap.get(storageGroup).put(seriesPartitionSlot, new ArrayList<>());
+    partitionSlotsMap.get(storageGroup).get(seriesPartitionSlot).add(timePartitionSlot);
+
+    GetOrCreateDataPartitionPlan plan0 =
+        new GetOrCreateDataPartitionPlan(PhysicalPlanType.GetDataPartition);
+    plan0.setPartitionSlotsMap(partitionSlotsMap);
+    plan0.serialize(buffer);
+    GetOrCreateDataPartitionPlan plan1 =
+        (GetOrCreateDataPartitionPlan) PhysicalPlan.Factory.create(buffer);
+    Assert.assertEquals(plan0, plan1);
+  }
+}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
index ba01aee175..d4f3608920 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
@@ -19,19 +19,25 @@
 package org.apache.iotdb.confignode.service.thrift.server;
 
 import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.persistence.DataNodeInfoPersistence;
 import org.apache.iotdb.confignode.persistence.PartitionInfoPersistence;
 import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeMessage;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterReq;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterResp;
-import org.apache.iotdb.confignode.rpc.thrift.DeviceGroupHashInfo;
-import org.apache.iotdb.confignode.rpc.thrift.GetSchemaPartitionReq;
-import org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo;
-import org.apache.iotdb.confignode.rpc.thrift.SetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.StorageGroupMessage;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessage;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessageResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessage;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessageResp;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.ratis.util.FileUtils;
@@ -45,6 +51,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -66,65 +73,90 @@ public class ConfigNodeRPCServerProcessorTest {
     FileUtils.deleteFully(new File(ConfigNodeDescriptor.getInstance().getConf().getConsensusDir()));
   }
 
+  private void checkGlobalConfig(TGlobalConfig globalConfig) {
+    Assert.assertEquals(
+        ConfigNodeDescriptor.getInstance().getConf().getDataNodeConsensusProtocolClass(),
+        globalConfig.getDataNodeConsensusProtocolClass());
+    Assert.assertEquals(
+        ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum(),
+        globalConfig.getSeriesPartitionSlotNum());
+    Assert.assertEquals(
+        ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionExecutorClass(),
+        globalConfig.getSeriesPartitionExecutorClass());
+  }
+
   @Test
-  public void registerDataNodeTest() throws TException, IOException {
-    DataNodeRegisterResp resp;
-    DataNodeRegisterReq registerReq0 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
-    DataNodeRegisterReq registerReq1 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6668));
-    DataNodeRegisterReq registerReq2 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6669));
+  public void registerAndQueryDataNodeTest() throws TException {
+    TDataNodeRegisterResp resp;
+    TDataNodeRegisterReq registerReq0 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
+    TDataNodeRegisterReq registerReq1 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6668));
+    TDataNodeRegisterReq registerReq2 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6669));
 
     // test success register
     resp = processor.registerDataNode(registerReq0);
-    Assert.assertEquals(
-        TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getRegisterResult().getCode());
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
     Assert.assertEquals(0, resp.getDataNodeID());
+    checkGlobalConfig(resp.getGlobalConfig());
+
     resp = processor.registerDataNode(registerReq1);
-    Assert.assertEquals(
-        TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getRegisterResult().getCode());
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
     Assert.assertEquals(1, resp.getDataNodeID());
+    checkGlobalConfig(resp.getGlobalConfig());
+
     resp = processor.registerDataNode(registerReq2);
-    Assert.assertEquals(
-        TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getRegisterResult().getCode());
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
     Assert.assertEquals(2, resp.getDataNodeID());
+    checkGlobalConfig(resp.getGlobalConfig());
+
+    // test re-register: expect DATANODE_ALREADY_REGISTERED and the original DataNodeID
+    resp = processor.registerDataNode(registerReq1);
+    Assert.assertEquals(
+        TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode(), resp.getStatus().getCode());
+    Assert.assertEquals(1, resp.getDataNodeID());
+    checkGlobalConfig(resp.getGlobalConfig());
 
     // test query DataNodeInfo
-    Map<Integer, DataNodeMessage> messageMap = processor.getDataNodesMessage(-1);
-    Assert.assertEquals(3, messageMap.size());
-    List<Map.Entry<Integer, DataNodeMessage>> messageList = new ArrayList<>(messageMap.entrySet());
+    TDataNodeMessageResp msgResp = processor.getDataNodesMessage(-1);
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), msgResp.getStatus().getCode());
+    Map<Integer, TDataNodeMessage> msgMap = msgResp.getDataNodeMessageMap();
+    Assert.assertEquals(3, msgMap.size());
+    List<Map.Entry<Integer, TDataNodeMessage>> messageList = new ArrayList<>(msgMap.entrySet());
     messageList.sort(Comparator.comparingInt(Map.Entry::getKey));
     for (int i = 0; i < 3; i++) {
-      Assert.assertEquals(i, messageList.get(i).getValue().getDataNodeID());
+      Assert.assertEquals(i, messageList.get(i).getValue().getDataNodeId());
       Assert.assertEquals("0.0.0.0", messageList.get(i).getValue().getEndPoint().getIp());
       Assert.assertEquals(6667 + i, messageList.get(i).getValue().getEndPoint().getPort());
     }
 
-    messageMap = processor.getDataNodesMessage(1);
-    Assert.assertEquals(1, messageMap.size());
-    Assert.assertNotNull(messageMap.get(1));
-    Assert.assertEquals("0.0.0.0", messageMap.get(1).getEndPoint().getIp());
-    Assert.assertEquals(6668, messageMap.get(1).getEndPoint().getPort());
+    msgResp = processor.getDataNodesMessage(1);
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), msgResp.getStatus().getCode());
+    msgMap = msgResp.getDataNodeMessageMap();
+    Assert.assertEquals(1, msgMap.size());
+    Assert.assertNotNull(msgMap.get(1));
+    Assert.assertEquals("0.0.0.0", msgMap.get(1).getEndPoint().getIp());
+    Assert.assertEquals(6668, msgMap.get(1).getEndPoint().getPort());
   }
 
   @Test
-  public void setStorageGroupTest() throws TException, IOException {
+  public void setAndQueryStorageGroupTest() throws TException {
     TSStatus status;
     final String sg = "root.sg0";
 
     // failed because there are not enough DataNodes
-    SetStorageGroupReq setReq = new SetStorageGroupReq(sg);
+    TSetStorageGroupReq setReq = new TSetStorageGroupReq(sg);
     status = processor.setStorageGroup(setReq);
-    Assert.assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), status.getCode());
+    Assert.assertEquals(TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode(), status.getCode());
     Assert.assertEquals("DataNode is not enough, please register more.", status.getMessage());
 
     // register DataNodes
-    DataNodeRegisterReq registerReq0 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
-    DataNodeRegisterReq registerReq1 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6668));
-    DataNodeRegisterReq registerReq2 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6669));
-    status = processor.registerDataNode(registerReq0).getRegisterResult();
+    TDataNodeRegisterReq registerReq0 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
+    TDataNodeRegisterReq registerReq1 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6668));
+    TDataNodeRegisterReq registerReq2 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6669));
+    status = processor.registerDataNode(registerReq0).getStatus();
     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-    status = processor.registerDataNode(registerReq1).getRegisterResult();
+    status = processor.registerDataNode(registerReq1).getStatus();
     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-    status = processor.registerDataNode(registerReq2).getRegisterResult();
+    status = processor.registerDataNode(registerReq2).getStatus();
     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
 
     // set StorageGroup
@@ -132,45 +164,39 @@ public class ConfigNodeRPCServerProcessorTest {
     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
 
     // query StorageGroupSchema
-    Map<String, StorageGroupMessage> messageMap = processor.getStorageGroupsMessage();
-    Assert.assertEquals(1, messageMap.size());
-    Assert.assertNotNull(messageMap.get(sg));
-    Assert.assertEquals(sg, messageMap.get(sg).getStorageGroup());
-  }
+    TStorageGroupMessageResp resp = processor.getStorageGroupsMessage();
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
+    Map<String, TStorageGroupMessage> msgMap = resp.getStorageGroupMessageMap();
+    Assert.assertEquals(1, msgMap.size());
+    Assert.assertNotNull(msgMap.get(sg));
+    Assert.assertEquals(sg, msgMap.get(sg).getStorageGroup());
 
-  @Test
-  public void getDeviceGroupHashInfoTest() throws TException, IOException {
-    // get Device Group hash
-    DeviceGroupHashInfo deviceGroupHashInfo = new DeviceGroupHashInfo();
-    deviceGroupHashInfo = processor.getDeviceGroupHashInfo();
-    Assert.assertEquals(
-        deviceGroupHashInfo.getDeviceGroupCount(),
-        ConfigNodeDescriptor.getInstance().getConf().getDeviceGroupCount());
+    // test failure when setting a StorageGroup that already exists
+    status = processor.setStorageGroup(setReq);
     Assert.assertEquals(
-        deviceGroupHashInfo.getHashClass(),
-        ConfigNodeDescriptor.getInstance().getConf().getDeviceGroupHashExecutorClass());
+        TSStatusCode.STORAGE_GROUP_ALREADY_EXISTS.getStatusCode(), status.getCode());
   }
 
-  @Test
+  // TODO: Reuse this test after PatternTree is moved to node-commons
   public void applySchemaPartitionTest() throws TException, IOException {
     TSStatus status;
     final String sg = "root.sg0";
 
     // failed because there are not enough DataNodes
-    SetStorageGroupReq setReq = new SetStorageGroupReq(sg);
+    TSetStorageGroupReq setReq = new TSetStorageGroupReq(sg);
     status = processor.setStorageGroup(setReq);
     Assert.assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), status.getCode());
     Assert.assertEquals("DataNode is not enough, please register more.", status.getMessage());
 
     // register DataNodes
-    DataNodeRegisterReq registerReq0 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
-    DataNodeRegisterReq registerReq1 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6668));
-    DataNodeRegisterReq registerReq2 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6669));
-    status = processor.registerDataNode(registerReq0).getRegisterResult();
+    TDataNodeRegisterReq registerReq0 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
+    TDataNodeRegisterReq registerReq1 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6668));
+    TDataNodeRegisterReq registerReq2 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6669));
+    status = processor.registerDataNode(registerReq0).getStatus();
     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-    status = processor.registerDataNode(registerReq1).getRegisterResult();
+    status = processor.registerDataNode(registerReq1).getStatus();
     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-    status = processor.registerDataNode(registerReq2).getRegisterResult();
+    status = processor.registerDataNode(registerReq2).getStatus();
     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
 
     // set StorageGroup
@@ -178,40 +204,41 @@ public class ConfigNodeRPCServerProcessorTest {
     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
 
     // applySchemaPartition
-    GetSchemaPartitionReq getSchemaPartitionReq = new GetSchemaPartitionReq();
+    TSchemaPartitionReq getSchemaPartitionReq = new TSchemaPartitionReq();
     List<Integer> deviceGroupIds = new ArrayList<>();
     Integer deviceGroupId = 1000;
     deviceGroupIds.add(deviceGroupId);
-    getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
-    SchemaPartitionInfo schemaPartitionInfo = processor.applySchemaPartition(getSchemaPartitionReq);
-    Assert.assertNotNull(schemaPartitionInfo);
-    Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
-    schemaPartitionInfo
-        .getSchemaRegionDataNodesMap()
-        .get(sg)
-        .forEach((key, value) -> Assert.assertEquals(deviceGroupId, key));
+    //    getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
+    //    SchemaPartitionInfo schemaPartitionInfo =
+    // processor.applySchemaPartition(getSchemaPartitionReq);
+    //    Assert.assertNotNull(schemaPartitionInfo);
+    //    Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
+    //    schemaPartitionInfo
+    //        .getSchemaRegionDataNodesMap()
+    //        .get(sg)
+    //        .forEach((key, value) -> Assert.assertEquals(deviceGroupId, key));
   }
 
-  @Test
+  // TODO: Reuse this test after PatternTree is moved to node-commons
   public void getSchemaPartitionTest() throws TException, IOException {
     TSStatus status;
     final String sg = "root.sg0";
 
     // failed because there are not enough DataNodes
-    SetStorageGroupReq setReq = new SetStorageGroupReq(sg);
+    TSetStorageGroupReq setReq = new TSetStorageGroupReq(sg);
     status = processor.setStorageGroup(setReq);
     Assert.assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), status.getCode());
     Assert.assertEquals("DataNode is not enough, please register more.", status.getMessage());
 
     // register DataNodes
-    DataNodeRegisterReq registerReq0 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
-    DataNodeRegisterReq registerReq1 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6668));
-    DataNodeRegisterReq registerReq2 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6669));
-    status = processor.registerDataNode(registerReq0).getRegisterResult();
+    TDataNodeRegisterReq registerReq0 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
+    TDataNodeRegisterReq registerReq1 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6668));
+    TDataNodeRegisterReq registerReq2 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6669));
+    status = processor.registerDataNode(registerReq0).getStatus();
     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-    status = processor.registerDataNode(registerReq1).getRegisterResult();
+    status = processor.registerDataNode(registerReq1).getStatus();
     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-    status = processor.registerDataNode(registerReq2).getRegisterResult();
+    status = processor.registerDataNode(registerReq2).getStatus();
     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
 
     // set StorageGroup
@@ -219,40 +246,176 @@ public class ConfigNodeRPCServerProcessorTest {
     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
 
     // getSchemaPartition
-    GetSchemaPartitionReq getSchemaPartitionReq = new GetSchemaPartitionReq();
+    TSchemaPartitionReq getSchemaPartitionReq = new TSchemaPartitionReq();
     List<Integer> deviceGroupIds = new ArrayList<>();
     Integer deviceGroupId = 1000;
     deviceGroupIds.add(deviceGroupId);
-    getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
-    SchemaPartitionInfo schemaPartitionInfo = processor.getSchemaPartition(getSchemaPartitionReq);
-    Assert.assertNotNull(schemaPartitionInfo);
-    Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
+    //    getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
+    //    SchemaPartitionInfo schemaPartitionInfo =
+    // processor.getSchemaPartition(getSchemaPartitionReq);
+    //    Assert.assertNotNull(schemaPartitionInfo);
+    //    Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
+    //
+    //    // because no schema partition has been applied yet, the schema partition is null
+    //
+    // Assert.assertNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg).get(deviceGroupId));
+    //
+    //    // applySchemaPartition
+    //    deviceGroupIds.add(deviceGroupId);
+    //    getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
+    //    schemaPartitionInfo = processor.applySchemaPartition(getSchemaPartitionReq);
+    //    Assert.assertNotNull(schemaPartitionInfo);
+    //    Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
+    //    schemaPartitionInfo
+    //        .getSchemaRegionDataNodesMap()
+    //        .get(sg)
+    //        .forEach((key, value) -> Assert.assertEquals(deviceGroupId, key));
+    //
+    //    // getSchemaPartition twice
+    //    getSchemaPartitionReq = new GetSchemaPartitionReq();
+    //    deviceGroupIds = new ArrayList<>();
+    //    deviceGroupIds.add(deviceGroupId);
+    //    getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
+    //    schemaPartitionInfo = processor.getSchemaPartition(getSchemaPartitionReq);
+    //    Assert.assertNotNull(schemaPartitionInfo);
+    //    Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
+    //
+    //    // because the schema partition has been applied, the schema partition is not null
+    //    Assert.assertNotNull(
+    //        schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg).get(deviceGroupId));
+  }
 
-    // because does not apply schema partition, so schema partition is null
-    Assert.assertNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg).get(deviceGroupId));
+  private Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
+      constructPartitionSlotsMap(int sgNum, int seriesPartitionSlotNum, long timePartitionSlotNum) {
+    final String sg = "root.sg";
+    Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> result = new HashMap<>();
 
-    // applySchemaPartition
-    deviceGroupIds.add(deviceGroupId);
-    getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
-    schemaPartitionInfo = processor.applySchemaPartition(getSchemaPartitionReq);
-    Assert.assertNotNull(schemaPartitionInfo);
-    Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
-    schemaPartitionInfo
-        .getSchemaRegionDataNodesMap()
-        .get(sg)
-        .forEach((key, value) -> Assert.assertEquals(deviceGroupId, key));
-
-    // getSchemaPartition twice
-    getSchemaPartitionReq = new GetSchemaPartitionReq();
-    deviceGroupIds = new ArrayList<>();
-    deviceGroupIds.add(deviceGroupId);
-    getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
-    schemaPartitionInfo = processor.getSchemaPartition(getSchemaPartitionReq);
-    Assert.assertNotNull(schemaPartitionInfo);
-    Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
-
-    // because apply schema partition, so schema partition is not null
-    Assert.assertNotNull(
-        schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg).get(deviceGroupId));
+    for (int i = 0; i < sgNum; i++) {
+      String storageGroup = sg + i;
+      result.put(storageGroup, new HashMap<>());
+      for (int j = 0; j < seriesPartitionSlotNum; j++) {
+        TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(j);
+        result.get(storageGroup).put(seriesPartitionSlot, new ArrayList<>());
+        for (long k = 0; k < timePartitionSlotNum; k++) {
+          TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot(k);
+          result.get(storageGroup).get(seriesPartitionSlot).add(timePartitionSlot);
+        }
+      }
+    }
+
+    return result;
+  }
+
+  private void checkDataPartitionMap(
+      int sgNum,
+      int seriesPartitionSlotNum,
+      long timePartitionSlotNum,
+      Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
+          dataPartitionMap) {
+    final String sg = "root.sg";
+    Assert.assertEquals(sgNum, dataPartitionMap.size());
+    for (int i = 0; i < sgNum; i++) {
+      String storageGroup = sg + i;
+      Assert.assertTrue(dataPartitionMap.containsKey(storageGroup));
+      Assert.assertEquals(seriesPartitionSlotNum, dataPartitionMap.get(storageGroup).size());
+      for (int j = 0; j < seriesPartitionSlotNum; j++) {
+        TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(j);
+        Assert.assertTrue(dataPartitionMap.get(storageGroup).containsKey(seriesPartitionSlot));
+        Assert.assertEquals(
+            timePartitionSlotNum,
+            dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).size());
+        for (long k = 0; k < timePartitionSlotNum; k++) {
+          TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot(k);
+          Assert.assertTrue(
+              dataPartitionMap
+                  .get(storageGroup)
+                  .get(seriesPartitionSlot)
+                  .containsKey(timePartitionSlot));
+          // One RegionReplicaSet
+          Assert.assertEquals(
+              1,
+              dataPartitionMap
+                  .get(storageGroup)
+                  .get(seriesPartitionSlot)
+                  .get(timePartitionSlot)
+                  .size());
+          // Including three RegionReplica
+          Assert.assertEquals(
+              3,
+              dataPartitionMap
+                  .get(storageGroup)
+                  .get(seriesPartitionSlot)
+                  .get(timePartitionSlot)
+                  .get(0)
+                  .getEndpointSize());
+        }
+      }
+    }
+  }
+
+  @Test
+  public void getAndCreateDataPartitionTest() throws TException {
+    TSStatus status;
+
+    // register DataNodes
+    TDataNodeRegisterReq registerReq0 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
+    TDataNodeRegisterReq registerReq1 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6668));
+    TDataNodeRegisterReq registerReq2 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6669));
+    status = processor.registerDataNode(registerReq0).getStatus();
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+    status = processor.registerDataNode(registerReq1).getStatus();
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+    status = processor.registerDataNode(registerReq2).getStatus();
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+    final String sg = "root.sg";
+    final int sgNum = 2;
+    final int seriesPartitionSlotNum = 4;
+    final long timePartitionSlotNum = 6;
+
+    // Prepare partitionSlotsMap
+    Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap0 =
+        constructPartitionSlotsMap(sgNum, seriesPartitionSlotNum, timePartitionSlotNum);
+    Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap1 =
+        constructPartitionSlotsMap(sgNum * 2, seriesPartitionSlotNum * 2, timePartitionSlotNum * 2);
+
+    // set StorageGroups
+    for (int i = 0; i < sgNum; i++) {
+      TSetStorageGroupReq setReq = new TSetStorageGroupReq(sg + i);
+      status = processor.setStorageGroup(setReq);
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+    }
+
+    // Test getDataPartition, the result should be empty
+    TDataPartitionReq dataPartitionReq = new TDataPartitionReq();
+    dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap0);
+    TDataPartitionResp dataPartitionResp = processor.getDataPartition(dataPartitionReq);
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(), dataPartitionResp.getStatus().getCode());
+    Assert.assertNotNull(dataPartitionResp.getDataPartitionMap());
+    Assert.assertEquals(0, dataPartitionResp.getDataPartitionMapSize());
+
+    // Test getOrCreateDataPartition, ConfigNode should create DataPartition for PartitionSlots
+    dataPartitionResp = processor.getOrCreateDataPartition(dataPartitionReq);
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(), dataPartitionResp.getStatus().getCode());
+    Assert.assertNotNull(dataPartitionResp.getDataPartitionMap());
+    checkDataPartitionMap(
+        sgNum,
+        seriesPartitionSlotNum,
+        timePartitionSlotNum,
+        dataPartitionResp.getDataPartitionMap());
+
+    // Test getDataPartition, the result should only contain DataPartition created before
+    dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap1);
+    dataPartitionResp = processor.getDataPartition(dataPartitionReq);
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(), dataPartitionResp.getStatus().getCode());
+    Assert.assertNotNull(dataPartitionResp.getDataPartitionMap());
+    checkDataPartitionMap(
+        sgNum,
+        seriesPartitionSlotNum,
+        timePartitionSlotNum,
+        dataPartitionResp.getDataPartitionMap());
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataNodeLocation.java b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DataNodeLocation.java
similarity index 58%
rename from node-commons/src/main/java/org/apache/iotdb/commons/partition/DataNodeLocation.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/cluster/DataNodeLocation.java
index 14e638d2a6..e81af759b3 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataNodeLocation.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DataNodeLocation.java
@@ -16,67 +16,42 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.commons.partition;
-
-import org.apache.iotdb.commons.cluster.Endpoint;
+package org.apache.iotdb.commons.cluster;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Objects;
 
 public class DataNodeLocation {
 
-  private int dataNodeID;
+  private int dataNodeId;
   private Endpoint endPoint;
 
-  private List<Integer> schemaRegionGroupIDs;
-  private List<Integer> dataRegionGroupIDs;
-
   public DataNodeLocation() {}
 
-  public DataNodeLocation(int dataNodeID, Endpoint endPoint) {
-    this.dataNodeID = dataNodeID;
+  public DataNodeLocation(int dataNodeId, Endpoint endPoint) {
+    this.dataNodeId = dataNodeId;
     this.endPoint = endPoint;
-    dataRegionGroupIDs = new ArrayList<>();
-    schemaRegionGroupIDs = new ArrayList<>();
   }
 
-  public int getDataNodeID() {
-    return dataNodeID;
+  public int getDataNodeId() {
+    return dataNodeId;
   }
 
-  public void setDataNodeID(int dataNodeID) {
-    this.dataNodeID = dataNodeID;
+  public void setDataNodeId(int dataNodeId) {
+    this.dataNodeId = dataNodeId;
   }
 
   public Endpoint getEndPoint() {
     return endPoint;
   }
 
-  public void addSchemaRegionGroup(int id) {
-    schemaRegionGroupIDs.add(id);
-  }
-
-  public List<Integer> getSchemaRegionGroupIDs() {
-    return schemaRegionGroupIDs;
-  }
-
-  public void addDataRegionGroup(int id) {
-    dataRegionGroupIDs.add(id);
-  }
-
-  public List<Integer> getDataRegionGroupIDs() {
-    return dataRegionGroupIDs;
-  }
-
   public void serializeImpl(ByteBuffer buffer) {
-    buffer.putInt(dataNodeID);
+    buffer.putInt(dataNodeId);
     endPoint.serializeImpl(buffer);
   }
 
   public void deserializeImpl(ByteBuffer buffer) {
-    dataNodeID = buffer.getInt();
+    dataNodeId = buffer.getInt();
     endPoint = new Endpoint();
     endPoint.deserializeImpl(buffer);
   }
@@ -90,15 +65,15 @@ public class DataNodeLocation {
       return false;
     }
     DataNodeLocation that = (DataNodeLocation) o;
-    return dataNodeID == that.dataNodeID && Objects.equals(endPoint, that.endPoint);
+    return dataNodeId == that.dataNodeId && Objects.equals(endPoint, that.endPoint);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(dataNodeID, endPoint);
+    return Objects.hash(dataNodeId, endPoint);
   }
 
   public String toString() {
-    return String.format("DataNode[%d, %s]", dataNodeID, endPoint);
+    return String.format("DataNode[%d, %s]", dataNodeId, endPoint);
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index acd8a5e3c8..e16f878c59 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -18,7 +18,12 @@
  */
 package org.apache.iotdb.commons.partition;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public class DataPartition {
@@ -27,6 +32,16 @@ public class DataPartition {
   private Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
       dataPartitionMap;
 
+  public DataPartition() {
+    // Empty constructor
+  }
+
+  public DataPartition(
+      Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+          dataPartitionMap) {
+    this.dataPartitionMap = dataPartitionMap;
+  }
+
   public Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
       getDataPartitionMap() {
     return dataPartitionMap;
@@ -78,4 +93,115 @@ public class DataPartition {
     // TODO: (xingtanzjr) how to handle this exception in IoTDB
     return null;
   }
+
+  /* Interfaces for ConfigNode */
+
+  /**
+   * Get DataPartition by partitionSlotsMap
+   *
+   * @param partitionSlotsMap Map<StorageGroupName, Map<SeriesPartitionSlot,
+   *     List<TimePartitionSlot>>>
+   * @return Map<StorageGroupName, Map<SeriesPartitionSlot, Map<TimePartitionSlot,
+   *     List<RegionReplicaSet>>>>
+   */
+  public DataPartition getDataPartition(
+      Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> partitionSlotsMap) {
+    Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>> result =
+        new HashMap<>();
+
+    for (String storageGroupName : partitionSlotsMap.keySet()) {
+      // Compare StorageGroupName
+      if (dataPartitionMap.containsKey(storageGroupName)) {
+        Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>
+            seriesTimePartitionSlotMap = dataPartitionMap.get(storageGroupName);
+        for (SeriesPartitionSlot seriesPartitionSlot :
+            partitionSlotsMap.get(storageGroupName).keySet()) {
+          // Compare SeriesPartitionSlot
+          if (seriesTimePartitionSlotMap.containsKey(seriesPartitionSlot)) {
+            Map<TimePartitionSlot, List<RegionReplicaSet>> timePartitionSlotMap =
+                seriesTimePartitionSlotMap.get(seriesPartitionSlot);
+            for (TimePartitionSlot timePartitionSlot :
+                partitionSlotsMap.get(storageGroupName).get(seriesPartitionSlot)) {
+              // Compare TimePartitionSlot
+              if (timePartitionSlotMap.containsKey(timePartitionSlot)) {
+                result
+                    .computeIfAbsent(storageGroupName, key -> new HashMap<>())
+                    .computeIfAbsent(seriesPartitionSlot, key -> new HashMap<>())
+                    .put(
+                        timePartitionSlot,
+                        new ArrayList<>(timePartitionSlotMap.get(timePartitionSlot)));
+              }
+            }
+          }
+        }
+      }
+    }
+
+    return new DataPartition(result);
+  }
+
+  /**
+   * Collect the PartitionSlots that have not been assigned yet
+   *
+   * @param partitionSlotsMap Map<StorageGroupName, Map<SeriesPartitionSlot,
+   *     List<TimePartitionSlot>>>
+   * @return Map<StorageGroupName, Map<SeriesPartitionSlot, List<TimePartitionSlot>>>, unassigned
+   *     PartitionSlots
+   */
+  public Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>>
+      filterNoAssignedDataPartitionSlots(
+          Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> partitionSlotsMap) {
+    Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> result = new HashMap<>();
+
+    for (String storageGroupName : partitionSlotsMap.keySet()) {
+      // Compare StorageGroupName
+      if (dataPartitionMap.containsKey(storageGroupName)) {
+        Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>
+            seriesTimePartitionSlotMap = dataPartitionMap.get(storageGroupName);
+        for (SeriesPartitionSlot seriesPartitionSlot :
+            partitionSlotsMap.get(storageGroupName).keySet()) {
+          // Compare SeriesPartitionSlot
+          if (seriesTimePartitionSlotMap.containsKey(seriesPartitionSlot)) {
+            Map<TimePartitionSlot, List<RegionReplicaSet>> timePartitionSlotMap =
+                seriesTimePartitionSlotMap.get(seriesPartitionSlot);
+            for (TimePartitionSlot timePartitionSlot :
+                partitionSlotsMap.get(storageGroupName).get(seriesPartitionSlot)) {
+              // Compare TimePartitionSlot
+              if (!timePartitionSlotMap.containsKey(timePartitionSlot)) {
+                result
+                    .computeIfAbsent(storageGroupName, key -> new HashMap<>())
+                    .computeIfAbsent(seriesPartitionSlot, key -> new ArrayList<>())
+                    .add(timePartitionSlot);
+              }
+            }
+          } else {
+            // Clone all if SeriesPartitionSlot not assigned
+            result
+                .computeIfAbsent(storageGroupName, key -> new HashMap<>())
+                .put(
+                    seriesPartitionSlot,
+                    new ArrayList<>(
+                        partitionSlotsMap.get(storageGroupName).get(seriesPartitionSlot)));
+          }
+        }
+      } else {
+        // Clone all if StorageGroupName not assigned
+        result.put(storageGroupName, new HashMap<>(partitionSlotsMap.get(storageGroupName)));
+      }
+    }
+
+    return result;
+  }
+
+  /** Create a DataPartition by ConfigNode */
+  public void createDataPartition(
+      String storageGroup,
+      SeriesPartitionSlot seriesPartitionSlot,
+      TimePartitionSlot timePartitionSlot,
+      RegionReplicaSet regionReplicaSet) {
+    dataPartitionMap
+        .computeIfAbsent(storageGroup, key -> new HashMap<>())
+        .computeIfAbsent(seriesPartitionSlot, key -> new HashMap<>())
+        .put(timePartitionSlot, Collections.singletonList(regionReplicaSet));
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
index fec0cab0c7..115c4406c4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.commons.partition;
 
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 
 import java.io.IOException;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
index b81d569a1a..9d428ccedf 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
@@ -26,34 +26,34 @@ import java.util.stream.Collectors;
 public class SchemaPartition {
 
   // Map<StorageGroup, Map<DeviceGroupID, SchemaRegionPlaceInfo>>
-  private Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartition;
+  private Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartitionMap;
 
   public SchemaPartition() {
-    schemaPartition = new HashMap<>();
+    schemaPartitionMap = new HashMap<>();
   }
 
-  public Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> getSchemaPartition() {
-    return schemaPartition;
+  public Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> getSchemaPartitionMap() {
+    return schemaPartitionMap;
   }
 
-  public void setSchemaPartition(
-      Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartition) {
-    this.schemaPartition = schemaPartition;
+  public void setSchemaPartitionMap(
+      Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartitionMap) {
+    this.schemaPartitionMap = schemaPartitionMap;
   }
 
   public Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> getSchemaPartition(
-      String storageGroup, List<Integer> deviceGroupIDs) {
+      String storageGroup, List<Integer> seriesPartitionSlots) {
     Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> storageGroupMap = new HashMap<>();
     Map<SeriesPartitionSlot, RegionReplicaSet> deviceGroupMap = new HashMap<>();
-    deviceGroupIDs.forEach(
+    seriesPartitionSlots.forEach(
         deviceGroupID -> {
-          if (schemaPartition.get(storageGroup) != null
-              && schemaPartition
+          if (schemaPartitionMap.get(storageGroup) != null
+              && schemaPartitionMap
                   .get(storageGroup)
                   .containsKey(new SeriesPartitionSlot(deviceGroupID))) {
             deviceGroupMap.put(
                 new SeriesPartitionSlot(deviceGroupID),
-                schemaPartition.get(storageGroup).get(new SeriesPartitionSlot(deviceGroupID)));
+                schemaPartitionMap.get(storageGroup).get(new SeriesPartitionSlot(deviceGroupID)));
           }
         });
     storageGroupMap.put(storageGroup, deviceGroupMap);
@@ -61,31 +61,26 @@ public class SchemaPartition {
   }
 
   /**
-   * Filter out unassigned device groups
+   * Collect the SeriesPartitionSlots that have not been assigned yet
    *
    * @param storageGroup storage group name
-   * @param deviceGroupIDs device group id list
-   * @return deviceGroupIDs does not assigned
+   * @param seriesPartitionSlots SeriesPartitionSlotIds
+   * @return not assigned seriesPartitionSlots
    */
-  public List<Integer> filterNoAssignDeviceGroupId(
-      String storageGroup, List<Integer> deviceGroupIDs) {
-    if (!schemaPartition.containsKey(storageGroup)) {
-      return deviceGroupIDs;
+  public List<Integer> filterNoAssignedSeriesPartitionSlot(
+      String storageGroup, List<Integer> seriesPartitionSlots) {
+    if (!schemaPartitionMap.containsKey(storageGroup)) {
+      return seriesPartitionSlots;
     }
-    return deviceGroupIDs.stream()
+    return seriesPartitionSlots.stream()
         .filter(
-            id -> {
-              if (schemaPartition.get(storageGroup).containsKey(deviceGroupIDs)) {
-                return false;
-              }
-              return true;
-            })
+            id -> !schemaPartitionMap.get(storageGroup).containsKey(new SeriesPartitionSlot(id)))
         .collect(Collectors.toList());
   }
 
   public void setSchemaRegionReplicaSet(
       String storageGroup, int deviceGroupId, RegionReplicaSet regionReplicaSet) {
-    schemaPartition
+    schemaPartitionMap
         .computeIfAbsent(storageGroup, value -> new HashMap<>())
         .put(new SeriesPartitionSlot(deviceGroupId), regionReplicaSet);
   }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionSlot.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionSlot.java
index 52ef9b5fe6..553aeff163 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionSlot.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionSlot.java
@@ -18,27 +18,42 @@
  */
 package org.apache.iotdb.commons.partition;
 
+import java.nio.ByteBuffer;
+
 public class SeriesPartitionSlot {
-  private int deviceGroupId;
+  private int slotId;
+
+  public SeriesPartitionSlot() {
+    // Empty constructor
+  }
 
-  public SeriesPartitionSlot(int deviceGroupId) {
-    this.deviceGroupId = deviceGroupId;
+  public SeriesPartitionSlot(int slotId) {
+    this.slotId = slotId;
   }
 
-  public int getDeviceGroupId() {
-    return deviceGroupId;
+  public int getSlotId() {
+    return slotId;
   }
 
-  public void setDeviceGroupId(int deviceGroupId) {
-    this.deviceGroupId = deviceGroupId;
+  public void setSlotId(int slotId) {
+    this.slotId = slotId;
   }
 
+  @Override
   public int hashCode() {
-    return new Integer(deviceGroupId).hashCode();
+    return new Integer(slotId).hashCode();
   }
 
+  @Override
   public boolean equals(Object obj) {
-    return obj instanceof SeriesPartitionSlot
-        && this.deviceGroupId == ((SeriesPartitionSlot) obj).deviceGroupId;
+    return obj instanceof SeriesPartitionSlot && this.slotId == ((SeriesPartitionSlot) obj).slotId;
+  }
+
+  public void serializeImpl(ByteBuffer buffer) {
+    buffer.putInt(slotId);
+  }
+
+  public void deserializeImpl(ByteBuffer buffer) {
+    slotId = buffer.getInt();
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/TimePartitionSlot.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/TimePartitionSlot.java
index 048406045b..69a3416fcf 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/TimePartitionSlot.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/TimePartitionSlot.java
@@ -18,9 +18,21 @@
  */
 package org.apache.iotdb.commons.partition;
 
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
 public class TimePartitionSlot {
+
   private long startTime;
 
+  public TimePartitionSlot() {
+    // empty constructor
+  }
+
+  public TimePartitionSlot(long startTime) {
+    this.startTime = startTime;
+  }
+
   public long getStartTime() {
     return startTime;
   }
@@ -28,4 +40,25 @@ public class TimePartitionSlot {
   public void setStartTime(long startTime) {
     this.startTime = startTime;
   }
+
+  public void serializeImpl(ByteBuffer buffer) {
+    buffer.putLong(startTime);
+  }
+
+  public void deserializeImpl(ByteBuffer buffer) {
+    startTime = buffer.getLong();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    TimePartitionSlot that = (TimePartitionSlot) o;
+    return startTime == that.startTime;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(startTime);
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/hash/DeviceGroupHashExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java
similarity index 62%
rename from node-commons/src/main/java/org/apache/iotdb/commons/hash/DeviceGroupHashExecutor.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java
index a597c938a8..285f2ff95c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/hash/DeviceGroupHashExecutor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java
@@ -16,16 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.commons.hash;
+package org.apache.iotdb.commons.partition.executor;
 
-/** All DeviceGroup hash algorithm executors must be subclasses of DeviceGroupHashExecutor */
-public abstract class DeviceGroupHashExecutor {
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
 
-  protected final int deviceGroupCount;
+/** All SeriesPartitionExecutors must be subclasses of SeriesPartitionExecutor */
+public abstract class SeriesPartitionExecutor {
 
-  public DeviceGroupHashExecutor(int deviceGroupCount) {
-    this.deviceGroupCount = deviceGroupCount;
+  protected final int seriesPartitionSlotNum;
+
+  public SeriesPartitionExecutor(int seriesPartitionSlotNum) {
+    this.seriesPartitionSlotNum = seriesPartitionSlotNum;
   }
 
-  public abstract int getDeviceGroupID(String device);
+  public abstract SeriesPartitionSlot getSeriesPartitionSlot(String device);
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/hash/APHashExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java
similarity index 75%
rename from node-commons/src/main/java/org/apache/iotdb/commons/hash/APHashExecutor.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java
index 0fc18ad662..8da263e4ae 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/hash/APHashExecutor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java
@@ -16,16 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.commons.hash;
+package org.apache.iotdb.commons.partition.executor.hash;
 
-public class APHashExecutor extends DeviceGroupHashExecutor {
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+
+public class APHashExecutor extends SeriesPartitionExecutor {
 
   public APHashExecutor(int deviceGroupCount) {
     super(deviceGroupCount);
   }
 
   @Override
-  public int getDeviceGroupID(String device) {
+  public SeriesPartitionSlot getSeriesPartitionSlot(String device) {
     int hash = 0;
 
     for (int i = 0; i < device.length(); i++) {
@@ -37,6 +40,6 @@ public class APHashExecutor extends DeviceGroupHashExecutor {
     }
     hash &= Integer.MAX_VALUE;
 
-    return hash % deviceGroupCount;
+    return new SeriesPartitionSlot(hash % seriesPartitionSlotNum);
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/hash/BKDRHashExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java
similarity index 73%
rename from node-commons/src/main/java/org/apache/iotdb/commons/hash/BKDRHashExecutor.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java
index b5e08a0718..a238afb416 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/hash/BKDRHashExecutor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java
@@ -16,9 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.commons.hash;
+package org.apache.iotdb.commons.partition.executor.hash;
 
-public class BKDRHashExecutor extends DeviceGroupHashExecutor {
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+
+public class BKDRHashExecutor extends SeriesPartitionExecutor {
 
   private static final int seed = 131;
 
@@ -27,7 +30,7 @@ public class BKDRHashExecutor extends DeviceGroupHashExecutor {
   }
 
   @Override
-  public int getDeviceGroupID(String device) {
+  public SeriesPartitionSlot getSeriesPartitionSlot(String device) {
     int hash = 0;
 
     for (int i = 0; i < device.length(); i++) {
@@ -35,6 +38,6 @@ public class BKDRHashExecutor extends DeviceGroupHashExecutor {
     }
     hash &= Integer.MAX_VALUE;
 
-    return hash % deviceGroupCount;
+    return new SeriesPartitionSlot(hash % seriesPartitionSlotNum);
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/hash/JSHashExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java
similarity index 74%
rename from node-commons/src/main/java/org/apache/iotdb/commons/hash/JSHashExecutor.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java
index 6521d2890b..13f9468154 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/hash/JSHashExecutor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java
@@ -16,9 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.commons.hash;
+package org.apache.iotdb.commons.partition.executor.hash;
 
-public class JSHashExecutor extends DeviceGroupHashExecutor {
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+
+public class JSHashExecutor extends SeriesPartitionExecutor {
 
   private static final int base = 1315423911;
 
@@ -27,7 +30,7 @@ public class JSHashExecutor extends DeviceGroupHashExecutor {
   }
 
   @Override
-  public int getDeviceGroupID(String device) {
+  public SeriesPartitionSlot getSeriesPartitionSlot(String device) {
     int hash = base;
 
     for (int i = 0; i < device.length(); i++) {
@@ -35,6 +38,6 @@ public class JSHashExecutor extends DeviceGroupHashExecutor {
     }
     hash &= Integer.MAX_VALUE;
 
-    return hash % deviceGroupCount;
+    return new SeriesPartitionSlot(hash % seriesPartitionSlotNum);
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/hash/SDBMHashExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java
similarity index 73%
rename from node-commons/src/main/java/org/apache/iotdb/commons/hash/SDBMHashExecutor.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java
index 8780ac4838..0246d4f8de 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/hash/SDBMHashExecutor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java
@@ -16,16 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.commons.hash;
+package org.apache.iotdb.commons.partition.executor.hash;
 
-public class SDBMHashExecutor extends DeviceGroupHashExecutor {
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+
+public class SDBMHashExecutor extends SeriesPartitionExecutor {
 
   public SDBMHashExecutor(int deviceGroupCount) {
     super(deviceGroupCount);
   }
 
   @Override
-  public int getDeviceGroupID(String device) {
+  public SeriesPartitionSlot getSeriesPartitionSlot(String device) {
     int hash = 0;
 
     for (int i = 0; i < device.length(); i++) {
@@ -33,6 +36,6 @@ public class SDBMHashExecutor extends DeviceGroupHashExecutor {
     }
     hash &= Integer.MAX_VALUE;
 
-    return hash % deviceGroupCount;
+    return new SeriesPartitionSlot(hash % seriesPartitionSlotNum);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
index 5ba8090451..94167a23ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.sql.analyze;
 
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.partition.*;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 30e88e484b..57cfdd1354 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -28,8 +28,8 @@ import org.apache.iotdb.commons.service.JMXService;
 import org.apache.iotdb.commons.service.RegisterManager;
 import org.apache.iotdb.commons.utils.CommonUtils;
 import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterReq;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConfigCheck;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -139,10 +139,10 @@ public class DataNode implements DataNodeMBean {
       logger.info("start joining the cluster with the help of {}", configNode);
       try {
         ConfigIService.Client client = createClient(configNode);
-        DataNodeRegisterResp dataNodeRegisterResp =
+        TDataNodeRegisterResp dataNodeRegisterResp =
             client.registerDataNode(
-                new DataNodeRegisterReq(new EndPoint(thisNode.getIp(), thisNode.getPort())));
-        if (dataNodeRegisterResp.getRegisterResult().getCode()
+                new TDataNodeRegisterReq(new EndPoint(thisNode.getIp(), thisNode.getPort())));
+        if (dataNodeRegisterResp.getStatus().getCode()
             == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           dataNodeID = dataNodeRegisterResp.getDataNodeID();
           logger.info("Joined a cluster successfully");
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 7ad38eb34f..03db5bb4ff 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -19,9 +19,9 @@
 
 package org.apache.iotdb.db.mpp.sql.plan;
 
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.partition.DataNodeLocation;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index c0f5fa1635..d3948a7d06 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -109,7 +109,12 @@ public enum TSStatusCode {
   PARSE_LOG_ERROR(708),
 
   // configuration
-  CONFIG_ERROR(800);
+  CONFIG_ERROR(800),
+
+  // ConfigNode response
+  DATANODE_ALREADY_REGISTERED(901),
+  STORAGE_GROUP_ALREADY_EXISTS(902),
+  NOT_ENOUGH_DATA_NODE(903);
 
   private int statusCode;
 
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index c41ffc1c22..7d3c3b7038 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -21,82 +21,79 @@ include "common.thrift"
 namespace java org.apache.iotdb.confignode.rpc.thrift
 namespace py iotdb.thrift.confignode
 
-struct DataNodeRegisterReq {
+// DataNode
+struct TDataNodeRegisterReq {
     1: required common.EndPoint endPoint
 }
 
-struct DataNodeRegisterResp {
-    1: required common.TSStatus registerResult
-    2: optional i32 dataNodeID
+struct TGlobalConfig {
+    1: optional string dataNodeConsensusProtocolClass
+    2: optional i32 seriesPartitionSlotNum
+    3: optional string seriesPartitionExecutorClass
 }
 
-struct DataNodeMessage {
-  1: required i32 dataNodeID
-  2: required common.EndPoint endPoint
+struct TDataNodeRegisterResp {
+    1: required common.TSStatus status
+    2: optional i32 dataNodeID
+    3: optional TGlobalConfig globalConfig
 }
 
-struct SetStorageGroupReq {
-    1: required string storageGroup
+struct TDataNodeMessageResp {
+  1: required common.TSStatus status
+  // map<DataNodeId, TDataNodeMessage>
+  2: optional map<i32, TDataNodeMessage> dataNodeMessageMap
 }
 
-struct DeleteStorageGroupReq {
-    1: required string storageGroup
+struct TDataNodeMessage {
+  1: required i32 dataNodeId
+  2: required common.EndPoint endPoint
 }
 
-struct StorageGroupMessage {
+// StorageGroup
+struct TSetStorageGroupReq {
     1: required string storageGroup
+    2: optional i64 ttl
 }
 
-struct GetDeviceGroupIDReq {
-    1: required string device
-}
-
-struct GetSchemaPartitionReq {
+struct TDeleteStorageGroupReq {
     1: required string storageGroup
-    2: required list<i32> deviceGroupIDs
-}
-
-struct SchemaPartitionInfo {
-    1: required map<string, map<i32, common.RegionReplicaSet>> schemaRegionDataNodesMap
 }
 
-struct DataPartitionInfo {
-    1: required map<string, map<i64, map<i32, list<common.RegionReplicaSet>>>> deviceGroupStartTimeDataRegionGroupMap
+struct TStorageGroupMessageResp {
+  1: required common.TSStatus status
+  // map<string, TStorageGroupMessage>
+  2: optional map<string, TStorageGroupMessage> storageGroupMessageMap
 }
 
-struct GetDataPartitionReq {
+struct TStorageGroupMessage {
     1: required string storageGroup
-    2: required map<i32, list<i64>> deviceGroupStartTimeMap
 }
 
-struct DeviceGroupHashInfo {
-    1: required i32 deviceGroupCount
-    2: required string hashClass
+// Schema
+struct TSchemaPartitionReq {
+    1: required binary pathPatternTree
 }
 
-struct FetchDataPartitionReq {
-    1: required map<i32, list<i64>> deviceGroupIDToStartTimeMap
+struct TSchemaPartitionResp {
+  1: required common.TSStatus status
+  // map<StorageGroupName, map<TSeriesPartitionSlot, TRegionReplicaSet>>
+  2: optional map<string, map<common.TSeriesPartitionSlot, common.TRegionReplicaSet>> schemaRegionMap
 }
 
-struct FetchSchemaPartitionReq {
-    1: required list<string> devicePaths
+// Data
+struct TDataPartitionReq {
+    // map<StorageGroupName, map<TSeriesPartitionSlot, list<TTimePartitionSlot>>>
+    1: required map<string, map<common.TSeriesPartitionSlot, list<common.TTimePartitionSlot>>> partitionSlotsMap
 }
 
-struct FetchPartitionReq {
-    1: required map<i32, list<i64>> deviceGroupIDToStartTimeMap
+struct TDataPartitionResp {
+  1: required common.TSStatus status
+  // map<StorageGroupName, map<TSeriesPartitionSlot, map<TTimePartitionSlot, list<TRegionReplicaSet>>>>
+  2: optional map<string, map<common.TSeriesPartitionSlot, map<common.TTimePartitionSlot, list<common.TRegionReplicaSet>>>> dataPartitionMap
 }
 
-struct RegionInfo {
-    1: required i32 regionId
-    2: required list<common.EndPoint> endPointList
-}
-
-struct DataPartitionInfoResp {
-    // Map<StorageGroup, Map<DeviceGroupID, Map<TimePartitionId, List<DataRegionReplicaInfo>>>>
-    1: required map<string, map<i32, map<i64, list<RegionInfo>>>> dataPartitionMap
-}
-
-struct AuthorizerReq{
+// Authorize
+struct TAuthorizerReq {
     1: required i32 authorType
     2: required string userName
     3: required string roleName
@@ -106,51 +103,35 @@ struct AuthorizerReq{
     7: required string nodeName
 }
 
-struct SchemaPartitionInfoResp {
-    // Map<StorageGroup, Map<DeviceGroupID, SchemaRegionPlaceInfo>>
-    1: required map<string, map<i32, RegionInfo>> schemaPartitionInfo
-}
-
-struct PartitionInfoResp {
-    // Map<StorageGroup, Map<DeviceGroupID, Map<TimePartitionId, List<DataRegionReplicaInfo>>>>
-    1: required map<string, map<i32, map<i64, list<RegionInfo>>>> dataPartitionMap
-    // Map<StorageGroup, Map<DeviceGroupID, SchemaRegionPlaceInfo>>
-    2: required map<string, map<i32, RegionInfo>> schemaPartitionInfo
-}
-
 service ConfigIService {
-  // Return TSStatusCode.SUCCESS_STATUS and the register DataNode id when successful registered.
-  // Otherwise, return TSStatusCode.INTERNAL_SERVER_ERROR
-  DataNodeRegisterResp registerDataNode(DataNodeRegisterReq req)
 
-  map<i32, DataNodeMessage> getDataNodesMessage(i32 dataNodeID)
+  /* DataNode */
+
+  TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req)
 
-  common.TSStatus setStorageGroup(SetStorageGroupReq req)
+  TDataNodeMessageResp getDataNodesMessage(i32 dataNodeID)
 
-  common.TSStatus deleteStorageGroup(DeleteStorageGroupReq req)
+  /* StorageGroup */
 
-  map<string, StorageGroupMessage> getStorageGroupsMessage()
+  common.TSStatus setStorageGroup(TSetStorageGroupReq req)
 
-  // Gets SchemaRegions for DeviceGroups in a StorageGroup
-  SchemaPartitionInfo getSchemaPartition(GetSchemaPartitionReq req)
+  common.TSStatus deleteStorageGroup(TDeleteStorageGroupReq req)
 
-  // Gets DataRegions for DeviceGroups in a StorageGroup at different starttime
-  DataPartitionInfo getDataPartition(GetDataPartitionReq req)
+  TStorageGroupMessageResp getStorageGroupsMessage()
 
-  DeviceGroupHashInfo getDeviceGroupHashInfo()
+  /* Schema */
 
-  // apply data partition when write data
-  DataPartitionInfo applyDataPartition(GetDataPartitionReq req)
+  TSchemaPartitionResp getSchemaPartition(TSchemaPartitionReq req)
 
-  // apply schema partition when create schema
-  SchemaPartitionInfo applySchemaPartition(GetSchemaPartitionReq req)
+  TSchemaPartitionResp getOrCreateSchemaPartition(TSchemaPartitionReq req)
 
-  DataPartitionInfoResp fetchDataPartitionInfo(FetchDataPartitionReq req)
+  /* Data */
 
-  SchemaPartitionInfoResp fetchSchemaPartitionInfo(FetchSchemaPartitionReq req)
+  TDataPartitionResp getDataPartition(TDataPartitionReq req)
 
-  PartitionInfoResp fetchPartitionInfo(FetchPartitionReq req)
+  TDataPartitionResp getOrCreateDataPartition(TDataPartitionReq req)
 
-   common.TSStatus operatePermission(AuthorizerReq req)
+  /* Authorize */
+  common.TSStatus operatePermission(TAuthorizerReq req)
 
 }
\ No newline at end of file
diff --git a/thrift/src/main/thrift/common.thrift b/thrift/src/main/thrift/common.thrift
index 724822e93b..8a8e61999f 100644
--- a/thrift/src/main/thrift/common.thrift
+++ b/thrift/src/main/thrift/common.thrift
@@ -33,7 +33,16 @@ namespace py iotdb.thrift.common
    4: optional EndPoint redirectNode
  }
 
- struct RegionReplicaSet {
+ struct TRegionReplicaSet {
      1: required i32 regionId
-     2: required list<EndPoint> endpoint
+     2: required string groupType
+     3: required list<EndPoint> endpoint
+ }
+
+ struct TSeriesPartitionSlot {
+     1: required i32 slotId
+ }
+
+ struct TTimePartitionSlot {
+     1: required i64 startTime
  }
\ No newline at end of file
diff --git a/thrift/src/main/thrift/management.thrift b/thrift/src/main/thrift/management.thrift
index 31832a168a..30aa28a072 100644
--- a/thrift/src/main/thrift/management.thrift
+++ b/thrift/src/main/thrift/management.thrift
@@ -24,12 +24,12 @@ typedef i32 int
 typedef i64 long
 
 struct CreateSchemaRegionReq {
-    1: required common.RegionReplicaSet regionReplicaSet
+    1: required common.TRegionReplicaSet regionReplicaSet
     2: required string storageGroup
 }
 
 struct CreateDataRegionReq {
-    1: required common.RegionReplicaSet regionReplicaSet
+    1: required common.TRegionReplicaSet regionReplicaSet
     2: required string storageGroup
     3: optional long ttl
 }