You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/05/17 14:34:25 UTC

[iotdb] 01/01: split dataRegion/schemaRegion consensus protocol && multiLeaderConsensus init

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira3188
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3b5fb851641f8c90a9d0711b1c9669a4c5237b23
Author: LebronAl <TX...@gmail.com>
AuthorDate: Tue May 17 22:34:03 2022 +0800

    split dataRegion/schemaRegion consensus protocol && multiLeaderConsensus init
---
 .../resources/conf/iotdb-confignode.properties     |  12 +-
 .../iotdb/confignode/conf/ConfigNodeConf.java      |  25 ++-
 .../confignode/conf/ConfigNodeDescriptor.java      |   9 +-
 .../confignode/conf/ConfigNodeStartupCheck.java    |  36 +++-
 .../iotdb/confignode/manager/ConfigManager.java    |  13 +-
 .../iotdb/confignode/manager/NodeManager.java      |   9 +-
 .../consensus/request/ConfigRequestSerDeTest.java  |  12 +-
 .../iotdb/confignode/persistence/NodeInfoTest.java |   3 +-
 .../confignode/persistence/PartitionInfoTest.java  |   3 +-
 .../thrift/ConfigNodeRPCServiceProcessorTest.java  |  19 +-
 consensus/pom.xml                                  |   5 +
 .../org/apache/iotdb/consensus/common/Peer.java    |  15 ++
 .../common/request/IndexedConsensusRequest.java    |  55 +++++
 .../consensus/multileader/IndexController.java     | 130 ++++++++++++
 .../multileader/MultiLeaderConsensus.java          | 227 +++++++++++++++++++++
 .../multileader/MultiLeaderServerImpl.java         | 139 +++++++++++++
 .../apache/iotdb/consensus/multileader/Utils.java  |  29 +++
 .../asyncLogAppender/AsyncLogAppender.java         |  35 ++++
 .../multileader/service/MultiLeaderRPCService.java |  99 +++++++++
 .../service/MultiLeaderRPCServiceHandler.java      |  51 +++++
 .../service/MultiLeaderRPCServiceMBean.java        |  22 ++
 .../service/MultiLeaderRPCServiceProcessor.java    |  74 +++++++
 .../iotdb/consensus/ratis/RequestMessage.java      |   3 +-
 .../iotdb/commons/concurrent/ThreadName.java       |   2 +
 .../iotdb/commons/consensus/ConsensusGroupId.java  |   8 +-
 .../apache/iotdb/commons/service/ServiceType.java  |   3 +-
 .../iotdb/commons/partition/SerializeTest.java     |   3 +-
 .../commons/utils/ThriftCommonsSerDeUtilsTest.java |   6 +-
 pom.xml                                            |   2 +
 .../resources/conf/iotdb-engine.properties         |   7 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  80 ++++++--
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  15 +-
 ...ensusImpl.java => DataRegionConsensusImpl.java} |  36 ++--
 ...susImpl.java => SchemaRegionConsensusImpl.java} |  40 ++--
 .../consensus/statemachine/BaseStateMachine.java   |  16 +-
 .../statemachine/DataRegionStateMachine.java       |  33 ++-
 .../statemachine/SchemaRegionStateMachine.java     |  14 +-
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  23 ++-
 .../java/org/apache/iotdb/db/service/DataNode.java |  12 +-
 .../service/thrift/impl/InternalServiceImpl.java   |  46 +++--
 .../mpp/plan/plan/FragmentInstanceSerdeTest.java   |   6 +-
 .../iotdb/db/service/InternalServiceImplTest.java  |  24 ++-
 .../datanode1conf/iotdb-engine.properties          |   3 +-
 .../datanode2conf/iotdb-engine.properties          |   3 +-
 .../datanode3conf/iotdb-engine.properties          |   3 +-
 thrift-commons/src/main/thrift/common.thrift       |   6 +-
 .../src/main/thrift/confignode.thrift              |  24 ++-
 thrift-multi-leader-consensus/pom.xml              |  47 +++++
 .../src/main/thrift/mutlileader.thrift             |  34 +++
 49 files changed, 1330 insertions(+), 191 deletions(-)

diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 2ca585b2d1..310fed9224 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -54,14 +54,20 @@ target_confignode=0.0.0.0:22277
 # All parameters in Consensus protocol configuration is unmodifiable after ConfigNode starts for the first time.
 # And these parameters should be consistent within the ConfigNodeGroup.
 
-
-# DataNode consensus protocol type
+# DataRegion consensus protocol type
 # These consensus protocols are currently supported:
 # 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(No protocol, only supports stand-alone machine)
 # 2. org.apache.iotdb.consensus.ratis.RatisConsensus(Raft protocol)
+# 3. org.apache.iotdb.consensus.multileader.MultiLeaderConsensus(Multi-leader protocol)
 # Datatype: String
-# data_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
+# data_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
 
+# SchemaRegion consensus protocol type
+# These consensus protocols are currently supported:
+# 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(No protocol, only supports stand-alone machine)
+# 2. org.apache.iotdb.consensus.ratis.RatisConsensus(Raft protocol)
+# Datatype: String
+# schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
 
 ####################
 ### PartitionSlot configuration
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index 923fe5c700..5bc9c9aefb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -58,8 +58,13 @@ public class ConfigNodeConf {
   private final String configNodeConsensusProtocolClass =
       "org.apache.iotdb.consensus.ratis.RatisConsensus";
 
-  /** DataNode Regions consensus protocol */
-  private String dataNodeConsensusProtocolClass = "org.apache.iotdb.consensus.ratis.RatisConsensus";
+  /** DataNode data region consensus protocol */
+  private String dataRegionConsensusProtocolClass =
+      "org.apache.iotdb.consensus.ratis.RatisConsensus";
+
+  /** DataNode schema region consensus protocol */
+  private String schemaRegionConsensusProtocolClass =
+      "org.apache.iotdb.consensus.ratis.RatisConsensus";
 
   /**
    * ClientManager will have so many selector threads (TAsyncClientManager) to distribute to its
@@ -308,12 +313,20 @@ public class ConfigNodeConf {
     return configNodeConsensusProtocolClass;
   }
 
-  public String getDataNodeConsensusProtocolClass() {
-    return dataNodeConsensusProtocolClass;
+  public String getDataRegionConsensusProtocolClass() {
+    return dataRegionConsensusProtocolClass;
+  }
+
+  public void setDataRegionConsensusProtocolClass(String dataRegionConsensusProtocolClass) {
+    this.dataRegionConsensusProtocolClass = dataRegionConsensusProtocolClass;
+  }
+
+  public String getSchemaRegionConsensusProtocolClass() {
+    return schemaRegionConsensusProtocolClass;
   }
 
-  public void setDataNodeConsensusProtocolClass(String dataNodeConsensusProtocolClass) {
-    this.dataNodeConsensusProtocolClass = dataNodeConsensusProtocolClass;
+  public void setSchemaRegionConsensusProtocolClass(String schemaRegionConsensusProtocolClass) {
+    this.schemaRegionConsensusProtocolClass = schemaRegionConsensusProtocolClass;
   }
 
   public int getThriftServerAwaitTimeForStopService() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 240021ae40..4952a335d3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -129,9 +129,14 @@ public class ConfigNodeDescriptor {
           properties.getProperty(
               "series_partition_executor_class", conf.getSeriesPartitionExecutorClass()));
 
-      conf.setDataNodeConsensusProtocolClass(
+      conf.setDataRegionConsensusProtocolClass(
           properties.getProperty(
-              "data_node_consensus_protocol_class", conf.getDataNodeConsensusProtocolClass()));
+              "data_region_consensus_protocol_class", conf.getDataRegionConsensusProtocolClass()));
+
+      conf.setSchemaRegionConsensusProtocolClass(
+          properties.getProperty(
+              "schema_region_consensus_protocol_class",
+              conf.getSchemaRegionConsensusProtocolClass()));
 
       conf.setRpcAdvancedCompressionEnable(
           Boolean.parseBoolean(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index 7480d78e0c..7a98ca4912 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -95,7 +95,7 @@ public class ConfigNodeStartupCheck {
 
     // When the DataNode consensus protocol is set to StandAlone,
     // the replication factor must be 1
-    if (conf.getDataNodeConsensusProtocolClass()
+    if (conf.getDataRegionConsensusProtocolClass()
         .equals("org.apache.iotdb.consensus.standalone.StandAloneConsensus")) {
       if (conf.getSchemaReplicationFactor() != 1) {
         throw new ConfigurationException(
@@ -183,7 +183,8 @@ public class ConfigNodeStartupCheck {
             new TConfigNodeLocation(
                 new TEndPoint(conf.getRpcAddress(), conf.getRpcPort()),
                 new TEndPoint(conf.getRpcAddress(), conf.getConsensusPort())),
-            conf.getDataNodeConsensusProtocolClass(),
+            conf.getDataRegionConsensusProtocolClass(),
+            conf.getSchemaRegionConsensusProtocolClass(),
             conf.getSeriesPartitionSlotNum(),
             conf.getSeriesPartitionExecutorClass(),
             CommonDescriptor.getInstance().getConfig().getDefaultTTL(),
@@ -215,7 +216,9 @@ public class ConfigNodeStartupCheck {
     systemProperties.setProperty(
         "config_node_consensus_protocol_class", conf.getConfigNodeConsensusProtocolClass());
     systemProperties.setProperty(
-        "data_node_consensus_protocol_class", conf.getDataNodeConsensusProtocolClass());
+        "data_region_consensus_protocol_class", conf.getDataRegionConsensusProtocolClass());
+    systemProperties.setProperty(
+        "schema_region_consensus_protocol_class", conf.getSchemaRegionConsensusProtocolClass());
 
     // PartitionSlot configuration
     systemProperties.setProperty(
@@ -282,15 +285,28 @@ public class ConfigNodeStartupCheck {
           configNodeConsensusProtocolClass);
     }
 
-    String dataNodeConsensusProtocolClass =
-        systemProperties.getProperty("data_node_consensus_protocol_class", null);
-    if (dataNodeConsensusProtocolClass == null) {
+    String dataRegionConsensusProtocolClass =
+        systemProperties.getProperty("data_region_consensus_protocol_class", null);
+    if (dataRegionConsensusProtocolClass == null) {
+      needReWrite = true;
+    } else if (!dataRegionConsensusProtocolClass.equals(
+        conf.getDataRegionConsensusProtocolClass())) {
+      throw new ConfigurationException(
+          "data_region_consensus_protocol_class",
+          conf.getDataRegionConsensusProtocolClass(),
+          dataRegionConsensusProtocolClass);
+    }
+
+    String schemaRegionConsensusProtocolClass =
+        systemProperties.getProperty("schema_region_consensus_protocol_class", null);
+    if (schemaRegionConsensusProtocolClass == null) {
       needReWrite = true;
-    } else if (!dataNodeConsensusProtocolClass.equals(conf.getDataNodeConsensusProtocolClass())) {
+    } else if (!schemaRegionConsensusProtocolClass.equals(
+        conf.getSchemaRegionConsensusProtocolClass())) {
       throw new ConfigurationException(
-          "data_node_consensus_protocol_class",
-          conf.getDataNodeConsensusProtocolClass(),
-          dataNodeConsensusProtocolClass);
+          "schema_region_consensus_protocol_class",
+          conf.getSchemaRegionConsensusProtocolClass(),
+          schemaRegionConsensusProtocolClass);
     }
 
     // PartitionSlot configuration
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 278859b9c4..d2ee15fc8b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -480,11 +480,20 @@ public class ConfigManager implements Manager {
     ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
     TConfigNodeRegisterResp errorResp = new TConfigNodeRegisterResp();
     errorResp.setStatus(new TSStatus(TSStatusCode.ERROR_GLOBAL_CONFIG.getStatusCode()));
-    if (!req.getDataNodeConsensusProtocolClass().equals(conf.getDataNodeConsensusProtocolClass())) {
+    if (!req.getDataRegionConsensusProtocolClass()
+        .equals(conf.getDataRegionConsensusProtocolClass())) {
       errorResp
           .getStatus()
           .setMessage(
-              "Reject register, please ensure that the data_node_consensus_protocol_class are consistent.");
+              "Reject register, please ensure that the data_region_consensus_protocol_class are consistent.");
+      return errorResp;
+    }
+    if (!req.getSchemaRegionConsensusProtocolClass()
+        .equals(conf.getSchemaRegionConsensusProtocolClass())) {
+      errorResp
+          .getStatus()
+          .setMessage(
+              "Reject register, please ensure that the schema_region_consensus_protocol_class are consistent.");
       return errorResp;
     }
     if (req.getSeriesPartitionSlotNum() != conf.getSeriesPartitionSlotNum()) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index b5d778e8b5..10c3698a9f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.confignode.manager;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoReq;
@@ -61,8 +60,10 @@ public class NodeManager {
   private void setGlobalConfig(DataNodeConfigurationResp dataSet) {
     // Set TGlobalConfig
     TGlobalConfig globalConfig = new TGlobalConfig();
-    globalConfig.setDataNodeConsensusProtocolClass(
-        ConfigNodeDescriptor.getInstance().getConf().getDataNodeConsensusProtocolClass());
+    globalConfig.setDataRegionConsensusProtocolClass(
+        ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass());
+    globalConfig.setSchemaRegionConsensusProtocolClass(
+        ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionConsensusProtocolClass());
     globalConfig.setSeriesPartitionSlotNum(
         ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum());
     globalConfig.setSeriesPartitionExecutorClass(
@@ -149,7 +150,7 @@ public class NodeManager {
 
     // Return PartitionRegionId
     resp.setPartitionRegionId(
-        ConsensusGroupId.convertToTConsensusGroupId(getConsensusManager().getConsensusGroupId()));
+        getConsensusManager().getConsensusGroupId().convertToTConsensusGroupId());
 
     // Return online ConfigNodes
     resp.setConfigNodeList(nodeInfo.getOnlineConfigNodes());
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
index 89f9cdfefa..ad95524b9e 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
@@ -85,7 +85,8 @@ public class ConfigRequestSerDeTest {
     dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
     dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
     dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
-    dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 7777));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
     RegisterDataNodeReq req0 = new RegisterDataNodeReq(dataNodeLocation);
     req0.serialize(buffer);
     buffer.flip();
@@ -204,7 +205,8 @@ public class ConfigRequestSerDeTest {
     dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
     dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
     dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
-    dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
 
     CreateRegionsReq req0 = new CreateRegionsReq();
     TRegionReplicaSet dataRegionSet = new TRegionReplicaSet();
@@ -242,7 +244,8 @@ public class ConfigRequestSerDeTest {
     dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
     dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
     dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
-    dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
 
     String storageGroup = "root.sg0";
     TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(10);
@@ -303,7 +306,8 @@ public class ConfigRequestSerDeTest {
     dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
     dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
     dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
-    dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
 
     String storageGroup = "root.sg0";
     TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(10);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
index 2caf683a1b..438a539167 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
@@ -97,6 +97,7 @@ public class NodeInfoTest {
         new TEndPoint("127.0.0.1", 6600 + flag),
         new TEndPoint("127.0.0.1", 7700 + flag),
         new TEndPoint("127.0.0.1", 8800 + flag),
-        new TEndPoint("127.0.0.1", 9900 + flag));
+        new TEndPoint("127.0.0.1", 9900 + flag),
+        new TEndPoint("127.0.0.1", 1000 + flag));
   }
 }
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
index 6e120afd86..e4286e72dc 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
@@ -144,7 +144,8 @@ public class PartitionInfoTest {
       tDataNodeLocation.setExternalEndPoint(new TEndPoint("127.0.0.1", 6000 + i));
       tDataNodeLocation.setInternalEndPoint(new TEndPoint("127.0.0.1", 7000 + i));
       tDataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("127.0.0.1", 8000 + i));
-      tDataNodeLocation.setConsensusEndPoint(new TEndPoint("127.0.0.1", 9000 + i));
+      tDataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("127.0.0.1", 9000 + i));
+      tDataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("127.0.0.1", 10000 + i));
       dataNodeLocations.add(tDataNodeLocation);
     }
     tRegionReplicaSet.setDataNodeLocations(dataNodeLocations);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
index bc172462ab..f052fdca20 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -119,8 +119,11 @@ public class ConfigNodeRPCServiceProcessorTest {
 
   private void checkGlobalConfig(TGlobalConfig globalConfig) {
     Assert.assertEquals(
-        ConfigNodeDescriptor.getInstance().getConf().getDataNodeConsensusProtocolClass(),
-        globalConfig.getDataNodeConsensusProtocolClass());
+        ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass(),
+        globalConfig.getDataRegionConsensusProtocolClass());
+    Assert.assertEquals(
+        ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionConsensusProtocolClass(),
+        globalConfig.getSchemaRegionConsensusProtocolClass());
     Assert.assertEquals(
         ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum(),
         globalConfig.getSeriesPartitionSlotNum());
@@ -135,7 +138,8 @@ public class ConfigNodeRPCServiceProcessorTest {
       dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667 + i));
       dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003 + i));
       dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777 + i));
-      dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
+      dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
+      dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010 + i));
 
       TDataNodeRegisterReq req = new TDataNodeRegisterReq(dataNodeLocation);
       TDataNodeRegisterResp resp = processor.registerDataNode(req);
@@ -155,7 +159,8 @@ public class ConfigNodeRPCServiceProcessorTest {
     dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6668));
     dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9004));
     dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8778));
-    dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40011));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40011));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50011));
 
     TDataNodeRegisterReq req = new TDataNodeRegisterReq(dataNodeLocation);
     TDataNodeRegisterResp resp = processor.registerDataNode(req);
@@ -178,7 +183,8 @@ public class ConfigNodeRPCServiceProcessorTest {
       dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667 + i));
       dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003 + i));
       dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777 + i));
-      dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
+      dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
+      dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010 + i));
       Assert.assertEquals(dataNodeLocation, locationList.get(i).getValue());
     }
 
@@ -192,7 +198,8 @@ public class ConfigNodeRPCServiceProcessorTest {
     dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6668));
     dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9004));
     dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8778));
-    dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40011));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40011));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50011));
     Assert.assertEquals(dataNodeLocation, locationMap.get(1));
   }
 
diff --git a/consensus/pom.xml b/consensus/pom.xml
index 7284a5fee7..0d6022ea10 100644
--- a/consensus/pom.xml
+++ b/consensus/pom.xml
@@ -52,6 +52,11 @@
             <version>${project.version}</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>thrift-multi-leader-consensus</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
     <properties>
         <maven.compiler.source>8</maven.compiler.source>
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
index 99379156fb..b3bba6dace 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.consensus.common;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 
+import java.nio.ByteBuffer;
 import java.util.Objects;
 
 // TODO Use a mature IDL framework such as Protobuf to manage this structure
@@ -43,6 +45,19 @@ public class Peer {
     return endpoint;
   }
 
+  public void serialize(ByteBuffer buffer) {
+    ThriftCommonsSerDeUtils.serializeTConsensusGroupId(
+        groupId.convertToTConsensusGroupId(), buffer);
+    ThriftCommonsSerDeUtils.serializeTEndPoint(endpoint, buffer);
+  }
+
+  public static Peer Deserialize(ByteBuffer buffer) {
+    return new Peer(
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(
+            ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer)),
+        ThriftCommonsSerDeUtils.deserializeTEndPoint(buffer));
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
new file mode 100644
index 0000000000..a1dccbfb60
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.common.request;
+
+import java.nio.ByteBuffer;
+
+/** Pairs a request with a min-sync index and a current index; multi-leader consensus only. */
+public class IndexedConsensusRequest implements IConsensusRequest {
+
+  private final long minSyncIndex;
+  private final long currentIndex;
+  private final IConsensusRequest request;
+
+  public IndexedConsensusRequest(long minSyncIndex, long currentIndex, IConsensusRequest request) {
+    this.minSyncIndex = minSyncIndex;
+    this.currentIndex = currentIndex;
+    this.request = request;
+  }
+
+  @Override
+  public void serializeRequest(ByteBuffer buffer) {
+    buffer.putLong(minSyncIndex);
+    buffer.putLong(currentIndex);
+    request.serializeRequest(buffer);
+  }
+
+  public IConsensusRequest getRequest() {
+    return request;
+  }
+
+  public long getMinSyncIndex() {
+    return minSyncIndex;
+  }
+
+  public long getCurrentIndex() {
+    return currentIndex;
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/IndexController.java
new file mode 100644
index 0000000000..6bad07da41
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/IndexController.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.File;
+import java.io.IOException;
+
+/** Issues increasing indexes and flushes them to disk only once per FLUSH_INTERVAL updates. */
+@ThreadSafe
+public class IndexController {
+
+  private static final Logger logger = LoggerFactory.getLogger(IndexController.class);
+  private static final int FLUSH_INTERVAL = 500;
+
+  private volatile long lastFlushedIndex;
+  private volatile long currentIndex;
+
+  private final String storageDir;
+  private final String prefix;
+  private final boolean incrementIntervalAfterRestart;
+
+  public IndexController(String storageDir, String prefix, boolean incrementIntervalAfterRestart) {
+    this.storageDir = storageDir;
+    this.prefix = prefix + '-';
+    this.incrementIntervalAfterRestart = incrementIntervalAfterRestart;
+    restore();
+  }
+
+  public synchronized long incrementAndGet() {
+    currentIndex++;
+    checkPersist();
+    return currentIndex;
+  }
+
+  /** Raises the current index to {@code index} if that is larger; it never moves backwards. */
+  public synchronized long updateAndGet(long index) {
+    currentIndex = Math.max(currentIndex, index);
+    checkPersist();
+    return currentIndex;
+  }
+
+  public long getCurrentIndex() {
+    return currentIndex;
+  }
+
+  private void checkPersist() {
+    if (currentIndex - lastFlushedIndex >= FLUSH_INTERVAL) {
+      persist();
+    }
+  }
+
+  /** Renames the marker file to carry the current index, recreating it if it is missing. */
+  private void persist() {
+    File oldFile = new File(storageDir, prefix + lastFlushedIndex).getAbsoluteFile();
+    File newFile = new File(storageDir, prefix + currentIndex).getAbsoluteFile();
+    try {
+      if (oldFile.exists()) {
+        FileUtils.moveFile(oldFile, newFile);
+      } else {
+        FileUtils.touch(newFile);
+      }
+      logger.info("Version file updated, previous: {}, current: {}", oldFile, newFile);
+      lastFlushedIndex = currentIndex;
+    } catch (IOException e) {
+      logger.error("Error occurred when flushing next version.", e);
+    }
+  }
+
+  /** Loads the highest index found in the marker file names and deletes stale markers. */
+  private void restore() {
+    File directory = new File(storageDir);
+    File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(prefix));
+    if (versionFiles != null && versionFiles.length > 0) {
+      long maxVersion = 0;
+      File newest = versionFiles[0];
+      for (File file : versionFiles) {
+        // the index is everything after the prefix, which may itself contain '-'
+        long fileVersion = Long.parseLong(file.getName().substring(prefix.length()));
+        if (fileVersion > maxVersion) {
+          maxVersion = fileVersion;
+          newest = file;
+        }
+      }
+      lastFlushedIndex = maxVersion;
+      for (File file : versionFiles) {
+        if (file != newest && !file.delete()) {
+          logger.warn("Cannot delete stale version file {}", file);
+        }
+      }
+    } else {
+      lastFlushedIndex = 0;
+      File versionFile = new File(directory, prefix + "0");
+      try {
+        // touch (unlike createNewFile) also creates a missing storage directory
+        FileUtils.touch(versionFile);
+      } catch (IOException e) {
+        logger.error("Error occurred when creating new file {}.", versionFile.getName(), e);
+      }
+    }
+    currentIndex = lastFlushedIndex;
+    if (incrementIntervalAfterRestart) {
+      // skip one interval so indexes issued but not flushed before a crash are never reused
+      currentIndex = lastFlushedIndex + FLUSH_INTERVAL;
+      persist();
+    }
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
new file mode 100644
index 0000000000..4010e7b75c
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.IStateMachine.Registry;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCService;
+import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCServiceProcessor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * {@link IConsensus} implementation that keeps one {@link MultiLeaderServerImpl} per consensus
+ * group hosted on this node.
+ *
+ * <p>Reads and writes are served by the local {@link MultiLeaderServerImpl} of the group, and this
+ * node always reports itself as leader. Membership changes, leader transfer and snapshot
+ * triggering are not implemented: the corresponding methods return an unsuccessful response.
+ *
+ * <p>Each group owns a directory under {@code storageDir} named {@code type_id_ip_port}; these
+ * directory names are parsed by {@link #start()} to recover the groups after a restart.
+ */
+public class MultiLeaderConsensus implements IConsensus {
+
+  private final Logger logger = LoggerFactory.getLogger(MultiLeaderConsensus.class);
+
+  /** Endpoint of this node; used by the RPC service and as the local peer of every new group. */
+  private final TEndPoint thisNode;
+  /** Root directory holding one sub-directory per consensus group. */
+  private final File storageDir;
+  /** Supplies the state machine for a given consensus group. */
+  private final IStateMachine.Registry registry;
+  /** Groups currently hosted on this node. */
+  private final Map<ConsensusGroupId, MultiLeaderServerImpl> stateMachineMap =
+      new ConcurrentHashMap<>();
+  private final MultiLeaderRPCService service;
+
+  public MultiLeaderConsensus(TEndPoint thisNode, File storageDir, Registry registry) {
+    this.thisNode = thisNode;
+    this.storageDir = storageDir;
+    this.registry = registry;
+    this.service = new MultiLeaderRPCService(thisNode);
+  }
+
+  /**
+   * Recovers the groups found under {@code storageDir} and then hands a new RPC processor to the
+   * RPC service.
+   *
+   * @throws IOException if the storage directory cannot be listed
+   */
+  @Override
+  public void start() throws IOException {
+    initAndRecover();
+    service.initSyncedServiceImpl(new MultiLeaderRPCServiceProcessor(this));
+  }
+
+  /**
+   * Creates the storage directory if it is missing; otherwise rebuilds and starts one {@link
+   * MultiLeaderServerImpl} per entry of the directory, deriving group id and endpoint from the
+   * entry name ({@code type_id_ip_port}).
+   */
+  private void initAndRecover() throws IOException {
+    if (!storageDir.exists()) {
+      if (!storageDir.mkdirs()) {
+        logger.warn("Unable to create consensus dir at {}", storageDir);
+      }
+      return;
+    }
+    try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) {
+      for (Path path : stream) {
+        String[] items = path.getFileName().toString().split("_");
+        ConsensusGroupId consensusGroupId =
+            ConsensusGroupId.Factory.create(
+                TConsensusGroupType.valueOf(items[0]).getValue(), Integer.parseInt(items[1]));
+        TEndPoint endPoint = new TEndPoint(items[2], Integer.parseInt(items[3]));
+        MultiLeaderServerImpl impl =
+            new MultiLeaderServerImpl(
+                path.toString(),
+                new Peer(consensusGroupId, endPoint),
+                registry.apply(consensusGroupId));
+        stateMachineMap.put(consensusGroupId, impl);
+        // addConsensusGroup starts every group it creates; a recovered group has to be started
+        // too, otherwise its state machine is never started after a restart.
+        impl.start();
+      }
+    }
+  }
+
+  @Override
+  public void stop() throws IOException {}
+
+  /**
+   * Applies {@code request} through the local server of {@code groupId}.
+   *
+   * @return a response carrying the write status, or a {@link ConsensusGroupNotExistException} if
+   *     the group is not hosted here
+   */
+  @Override
+  public ConsensusWriteResponse write(ConsensusGroupId groupId, IConsensusRequest request) {
+    MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
+    if (impl == null) {
+      return ConsensusWriteResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
+    }
+    return ConsensusWriteResponse.newBuilder().setStatus(impl.write(request)).build();
+  }
+
+  /**
+   * Executes {@code request} as a read on the local server of {@code groupId}.
+   *
+   * @return a response carrying the data set, or a {@link ConsensusGroupNotExistException} if the
+   *     group is not hosted here
+   */
+  @Override
+  public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest request) {
+    MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
+    if (impl == null) {
+      return ConsensusReadResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
+    }
+    return ConsensusReadResponse.newBuilder().setDataSet(impl.read(request)).build();
+  }
+
+  /**
+   * Creates, starts and registers the local server of a new group.
+   *
+   * @return a successful response, or a {@link ConsensusGroupAlreadyExistException} if the group
+   *     is already hosted here
+   */
+  @Override
+  public ConsensusGenericResponse addConsensusGroup(ConsensusGroupId groupId, List<Peer> peers) {
+    // The mapping function only runs when the group is absent, so it doubles as existence check.
+    AtomicBoolean exist = new AtomicBoolean(true);
+    stateMachineMap.computeIfAbsent(
+        groupId,
+        (k) -> {
+          exist.set(false);
+          String path = buildPeerDir(groupId);
+          File file = new File(path);
+          if (!file.mkdirs()) {
+            logger.warn("Unable to create consensus dir for group {} at {}", groupId, path);
+          }
+          MultiLeaderServerImpl impl =
+              new MultiLeaderServerImpl(
+                  path, new Peer(groupId, thisNode), peers, registry.apply(groupId));
+          impl.start();
+          return impl;
+        });
+    if (exist.get()) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new ConsensusGroupAlreadyExistException(groupId))
+          .build();
+    }
+    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
+  }
+
+  /**
+   * Stops the local server of a group, removes it from this node and deletes its directory.
+   *
+   * @return a successful response, or a {@link ConsensusGroupNotExistException} if the group is
+   *     not hosted here
+   */
+  @Override
+  public ConsensusGenericResponse removeConsensusGroup(ConsensusGroupId groupId) {
+    AtomicBoolean exist = new AtomicBoolean(false);
+    stateMachineMap.computeIfPresent(
+        groupId,
+        (k, v) -> {
+          exist.set(true);
+          v.stop();
+          String path = buildPeerDir(groupId);
+          // The group directory is not empty (it holds the persisted configuration and index
+          // files), and File.delete() refuses to remove a non-empty directory. A directory left
+          // behind would be picked up again by initAndRecover on the next start.
+          if (!deleteRecursively(new File(path))) {
+            logger.warn("Unable to delete consensus dir for group {} at {}", groupId, path);
+          }
+          return null;
+        });
+
+    if (!exist.get()) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
+    }
+    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
+  }
+
+  /** Not implemented: always returns an unsuccessful response. */
+  @Override
+  public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) {
+    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+  }
+
+  /** Not implemented: always returns an unsuccessful response. */
+  @Override
+  public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) {
+    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+  }
+
+  /** Not implemented: always returns an unsuccessful response. */
+  @Override
+  public ConsensusGenericResponse changePeer(ConsensusGroupId groupId, List<Peer> newPeers) {
+    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+  }
+
+  /** Not implemented: always returns an unsuccessful response. */
+  @Override
+  public ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, Peer newLeader) {
+    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+  }
+
+  /** Not implemented: always returns an unsuccessful response. */
+  @Override
+  public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) {
+    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+  }
+
+  /** Always {@code true}, regardless of {@code groupId}. */
+  @Override
+  public boolean isLeader(ConsensusGroupId groupId) {
+    return true;
+  }
+
+  /**
+   * @return the local peer of {@code groupId}, or {@code null} if the group is not hosted here
+   */
+  @Override
+  public Peer getLeader(ConsensusGroupId groupId) {
+    if (!stateMachineMap.containsKey(groupId)) {
+      return null;
+    }
+    return new Peer(groupId, thisNode);
+  }
+
+  /**
+   * @return the local server of {@code groupId}, or {@code null} if the group is not hosted here
+   */
+  public MultiLeaderServerImpl getImpl(ConsensusGroupId groupId) {
+    return stateMachineMap.get(groupId);
+  }
+
+  /** Builds the directory path of a group: {@code storageDir/type_id_ip_port}. */
+  private String buildPeerDir(ConsensusGroupId groupId) {
+    return storageDir
+        + File.separator
+        + groupId.getType()
+        + "_"
+        + groupId.getId()
+        + "_"
+        + thisNode.getIp()
+        + "_"
+        + thisNode.getPort();
+  }
+
+  /**
+   * Deletes {@code file} and, if it is a directory, everything below it. Symbolic links are
+   * removed without being followed.
+   *
+   * @return {@code true} only if every entry, including {@code file} itself, was deleted
+   */
+  private static boolean deleteRecursively(File file) {
+    boolean childrenDeleted = true;
+    if (!Files.isSymbolicLink(file.toPath())) {
+      File[] children = file.listFiles();
+      if (children != null) {
+        for (File child : children) {
+          childrenDeleted &= deleteRecursively(child);
+        }
+      }
+    }
+    boolean selfDeleted = file.delete();
+    return childrenDeleted && selfDeleted;
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
new file mode 100644
index 0000000000..a1f73b525d
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.multileader.asyncLogAppender.AsyncLogAppender;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Per-group server of the multi-leader consensus layer: wraps the group's {@link IStateMachine},
+ * attaches an index to every local write and keeps the peer list of the group on disk.
+ */
+public class MultiLeaderServerImpl {
+
+  /** Size in bytes of the buffer (and therefore of the file) used to persist the peer list. */
+  private static final int DEFAULT_BUFFER_SIZE = 1024 * 2;
+
+  private final Logger logger = LoggerFactory.getLogger(MultiLeaderServerImpl.class);
+  private final String configurationFileName = "configuration.dat";
+  private final Peer thisNode;
+  private final IStateMachine stateMachine;
+  private final String storageDir;
+  private List<Peer> configuration;
+  private IndexController currentNodeController;
+  private List<AsyncLogAppender> asyncLogAppenders;
+
+  /**
+   * Creates the server of a new group and writes the given peer list to {@code storageDir}.
+   *
+   * @param storageDir directory owned by this group
+   * @param thisNode the local peer
+   * @param configuration peers of the group
+   * @param stateMachine state machine that requests are applied to
+   */
+  public MultiLeaderServerImpl(
+      String storageDir, Peer thisNode, List<Peer> configuration, IStateMachine stateMachine) {
+    this.stateMachine = stateMachine;
+    this.thisNode = thisNode;
+    this.storageDir = storageDir;
+    this.configuration = configuration;
+    this.currentNodeController =
+        new IndexController(storageDir, Utils.fromTEndPointToString(thisNode.getEndpoint()), false);
+    persistConfiguration();
+    // NOTE(review): the original carried an unfinished, commented-out statement here that
+    // filtered the peers other than thisNode; presumably per-peer setup is still to be added.
+  }
+
+  /**
+   * Re-creates the server of an existing group, loading the peer list from {@code storageDir}.
+   *
+   * @param storageDir directory owned by this group
+   * @param thisNode the local peer
+   * @param stateMachine state machine that requests are applied to
+   */
+  public MultiLeaderServerImpl(String storageDir, Peer thisNode, IStateMachine stateMachine) {
+    this.stateMachine = stateMachine;
+    this.thisNode = thisNode;
+    this.storageDir = storageDir;
+    this.configuration = new ArrayList<>();
+    this.currentNodeController =
+        new IndexController(storageDir, Utils.fromTEndPointToString(thisNode.getEndpoint()), false);
+    recoverConfiguration();
+  }
+
+  public IStateMachine getStateMachine() {
+    return stateMachine;
+  }
+
+  /** Starts the wrapped state machine. */
+  public void start() {
+    stateMachine.start();
+  }
+
+  /** Stops the wrapped state machine. */
+  public void stop() {
+    stateMachine.stop();
+  }
+
+  /**
+   * Wraps {@code request} together with the next local index and applies it to the state machine.
+   * Index allocation and application happen under the state machine's monitor.
+   *
+   * @return the status returned by the state machine
+   */
+  public TSStatus write(IConsensusRequest request) {
+    synchronized (stateMachine) {
+      return stateMachine.write(
+          new IndexedConsensusRequest(
+              Long.MAX_VALUE, currentNodeController.incrementAndGet(), request));
+    }
+  }
+
+  /** Delegates the read to the state machine. */
+  public DataSet read(IConsensusRequest request) {
+    return stateMachine.read(request);
+  }
+
+  /** Delegates snapshot creation to the state machine. */
+  public boolean takeSnapshot(File snapshotDir) {
+    return stateMachine.takeSnapshot(snapshotDir);
+  }
+
+  /** Delegates snapshot loading to the state machine. */
+  public void loadSnapshot(File latestSnapshotRootDir) {
+    stateMachine.loadSnapshot(latestSnapshotRootDir);
+  }
+
+  /**
+   * Writes the peer list to the configuration file as a peer count followed by each serialized
+   * peer. The whole backing array of the buffer is written, so the file is always {@code
+   * DEFAULT_BUFFER_SIZE} bytes long. I/O failures are logged, not propagated.
+   */
+  public void persistConfiguration() {
+    // NOTE(review): presumably a peer list serializing to more than DEFAULT_BUFFER_SIZE bytes
+    // would overflow this buffer -- confirm against Peer.serialize.
+    ByteBuffer serialized = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+    serialized.putInt(configuration.size());
+    for (Peer member : configuration) {
+      member.serialize(serialized);
+    }
+    try {
+      Files.write(Paths.get(configurationFilePath()), serialized.array());
+    } catch (IOException e) {
+      logger.error("Unexpected error occurs when persisting configuration", e);
+    }
+  }
+
+  /**
+   * Reads the configuration file and appends every peer it contains to the in-memory peer list.
+   * I/O failures are logged, not propagated.
+   */
+  public void recoverConfiguration() {
+    try {
+      ByteBuffer persisted = ByteBuffer.wrap(Files.readAllBytes(Paths.get(configurationFilePath())));
+      int peerCount = persisted.getInt();
+      for (int read = 0; read < peerCount; read++) {
+        configuration.add(Peer.Deserialize(persisted));
+      }
+    } catch (IOException e) {
+      logger.error("Unexpected error occurs when recovering configuration", e);
+    }
+  }
+
+  /** Absolute path of the configuration file inside {@code storageDir}. */
+  private String configurationFilePath() {
+    return new File(storageDir, configurationFileName).getAbsolutePath();
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/Utils.java
new file mode 100644
index 0000000000..a89f47513a
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/Utils.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+
+/** Helpers shared by the multi-leader consensus classes. */
+public class Utils {
+
+  /**
+   * Renders an endpoint as {@code ip-port}.
+   *
+   * <p>Plain concatenation is used instead of {@code String.format("%d", ...)}: the latter
+   * formats with the default locale and may emit non-ASCII digits for the port, while the result
+   * is used as part of file names and therefore has to be identical on every machine.
+   *
+   * @param endpoint endpoint to render
+   * @return the ip and the port of {@code endpoint} joined by {@code '-'}
+   */
+  public static String fromTEndPointToString(TEndPoint endpoint) {
+    return endpoint.getIp() + "-" + endpoint.getPort();
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/asyncLogAppender/AsyncLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/asyncLogAppender/AsyncLogAppender.java
new file mode 100644
index 0000000000..7d53bbb208
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/asyncLogAppender/AsyncLogAppender.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.asyncLogAppender;
+
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.multileader.IndexController;
+import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+public class AsyncLogAppender {
+  // Placeholder: fields only, no constructor or methods yet (field roles below are guesses).
+  private MultiLeaderServerImpl impl; // presumably the local server owning this appender
+  private Peer peer; // presumably the remote peer this appender targets -- TODO confirm
+  private ArrayBlockingQueue<IConsensusRequest> pendingRequest; // presumably requests not yet sent
+  private IndexController controller; // presumably tracks the index reached for peer -- confirm
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
new file mode 100644
index 0000000000..455bd73fe8
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.service;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.service.ThriftService;
+import org.apache.iotdb.commons.service.ThriftServiceThread;
+import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
+
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Thrift service exposing {@link MultiLeaderRPCServiceProcessor} on the endpoint of this node.
+ */
+public class MultiLeaderRPCService extends ThriftService implements MultiLeaderRPCServiceMBean {
+
+  // TODO make it configurable
+  private static final int RPC_MAX_CONCURRENT_CLIENT_NUM = 65535;
+  private static final int THRIFT_SERVER_AWAIT_TIME_FOR_STOP_SERVICE = 60;
+  private static final boolean IS_RPC_THRIFT_COMPRESSION_ENABLED = false;
+
+  /** Endpoint whose ip and port the service binds to. */
+  private final TEndPoint thisNode;
+  private MultiLeaderRPCServiceProcessor multiLeaderRPCServiceProcessor;
+
+  public MultiLeaderRPCService(TEndPoint thisNode) {
+    this.thisNode = thisNode;
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.MULTI_LEADER_CONSENSUS_SERVICE;
+  }
+
+  /**
+   * Stores the processor, sets the MBean name of this service and delegates to the superclass.
+   *
+   * @param multiLeaderRPCServiceProcessor must be a {@link MultiLeaderRPCServiceProcessor}
+   * @throws ClassCastException if the argument has a different type
+   */
+  @Override
+  public void initSyncedServiceImpl(Object multiLeaderRPCServiceProcessor) {
+    this.multiLeaderRPCServiceProcessor =
+        (MultiLeaderRPCServiceProcessor) multiLeaderRPCServiceProcessor;
+    super.mbeanName =
+        String.format(
+            "%s:%s=%s", this.getClass().getPackage(), IoTDBConstant.JMX_TYPE, getID().getJmxName());
+    super.initSyncedServiceImpl(this.multiLeaderRPCServiceProcessor);
+  }
+
+  /** Wraps the stored processor into the generated Thrift processor. */
+  @Override
+  public void initTProcessor()
+      throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+          NoSuchMethodException, InvocationTargetException {
+    processor = new MultiLeaderConsensusIService.Processor<>(multiLeaderRPCServiceProcessor);
+  }
+
+  /**
+   * Creates the thread that runs the Thrift server for this service.
+   *
+   * @throws IllegalAccessException if the server thread cannot be created; the underlying {@link
+   *     RPCServiceException} is attached as cause
+   */
+  @Override
+  public void initThriftServiceThread()
+      throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+    try {
+      thriftServiceThread =
+          new ThriftServiceThread(
+              processor,
+              getID().getName(),
+              ThreadName.MULTI_LEADER_CONSENSUS_RPC_CLIENT.getName(),
+              getBindIP(),
+              getBindPort(),
+              RPC_MAX_CONCURRENT_CLIENT_NUM,
+              THRIFT_SERVER_AWAIT_TIME_FOR_STOP_SERVICE,
+              new MultiLeaderRPCServiceHandler(multiLeaderRPCServiceProcessor),
+              IS_RPC_THRIFT_COMPRESSION_ENABLED);
+    } catch (RPCServiceException e) {
+      // IllegalAccessException has no (message, cause) constructor, so the cause is attached
+      // explicitly; otherwise the stack trace of the real failure would be lost.
+      IllegalAccessException wrapped = new IllegalAccessException(e.getMessage());
+      wrapped.initCause(e);
+      throw wrapped;
+    }
+    thriftServiceThread.setName(ThreadName.MULTI_LEADER_CONSENSUS_RPC_SERVER.getName());
+  }
+
+  @Override
+  public String getBindIP() {
+    return thisNode.getIp();
+  }
+
+  @Override
+  public int getBindPort() {
+    return thisNode.getPort();
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceHandler.java
new file mode 100644
index 0000000000..889ac7d517
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.service;
+
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * Thrift server event handler of the multi-leader consensus RPC service. The only event acted
+ * upon is {@code deleteContext}, which is forwarded to the processor as a client exit.
+ */
+public class MultiLeaderRPCServiceHandler implements TServerEventHandler {
+
+  private final MultiLeaderRPCServiceProcessor processor;
+
+  public MultiLeaderRPCServiceHandler(MultiLeaderRPCServiceProcessor rpcProcessor) {
+    processor = rpcProcessor;
+  }
+
+  /** No-op. */
+  @Override
+  public void preServe() {
+    // nothing to do
+  }
+
+  /** No per-connection context is kept, hence {@code null}. */
+  @Override
+  public ServerContext createContext(TProtocol in, TProtocol out) {
+    return null;
+  }
+
+  /** No-op. */
+  @Override
+  public void processContext(ServerContext context, TTransport inTransport, TTransport outTransport) {
+    // nothing to do
+  }
+
+  /** Notifies the processor that a client is gone. */
+  @Override
+  public void deleteContext(ServerContext context, TProtocol in, TProtocol out) {
+    processor.handleClientExit();
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceMBean.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceMBean.java
new file mode 100644
index 0000000000..9e354ac204
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceMBean.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.service;
+
+public interface MultiLeaderRPCServiceMBean {} // declares no operations; MultiLeaderRPCService implements it
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
new file mode 100644
index 0000000000..8cea5f4007
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.service;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
+import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
+import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
+import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
+import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Server-side implementation of the multi-leader consensus Thrift interface. Incoming log batches
+ * are written to the state machine of the addressed consensus group.
+ */
+public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIService.Iface {
+
+  private final Logger logger = LoggerFactory.getLogger(MultiLeaderRPCServiceProcessor.class);
+
+  private final MultiLeaderConsensus consensus;
+
+  public MultiLeaderRPCServiceProcessor(MultiLeaderConsensus consensus) {
+    this.consensus = consensus;
+  }
+
+  /**
+   * Writes every batch of {@code req}, in order, to the state machine of the requested group while
+   * holding that state machine's monitor.
+   *
+   * @param req request naming the consensus group and carrying the batches
+   * @return one status per batch, or a single {@code INTERNAL_SERVER_ERROR} status if the group is
+   *     not hosted on this node
+   */
+  @Override
+  public TSyncLogRes syncLog(TSyncLogReq req) throws TException {
+    final ConsensusGroupId groupId =
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+    final MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+    if (impl != null) {
+      final List<TSStatus> results = new ArrayList<>();
+      synchronized (impl.getStateMachine()) {
+        for (ByteBuffer batch : req.getBatches()) {
+          ByteBufferConsensusRequest wrapped = new ByteBufferConsensusRequest(batch);
+          results.add(impl.getStateMachine().write(wrapped));
+        }
+      }
+      return new TSyncLogRes(results);
+    }
+    return unknownGroup(groupId, req);
+  }
+
+  /** Logs the unknown group and builds the single-status error response for it. */
+  private TSyncLogRes unknownGroup(ConsensusGroupId groupId, TSyncLogReq req) {
+    final String message =
+        String.format("Unexpected consensusGroupId %s for TSyncLogReq %s", groupId, req);
+    logger.error(message);
+    final TSStatus failure = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+    failure.setMessage(message);
+    return new TSyncLogRes(Collections.singletonList(failure));
+  }
+
+  /** Hook invoked when a client connection goes away; currently a no-op. */
+  public void handleClientExit() {}
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
index 242573e5be..d1af672b85 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
@@ -46,9 +46,8 @@ public class RequestMessage implements Message {
     if (serializedContent == null) {
       synchronized (this) {
         if (serializedContent == null) {
-          ByteBufferConsensusRequest req;
           if (actualRequest instanceof ByteBufferConsensusRequest) {
-            req = (ByteBufferConsensusRequest) actualRequest;
+            ByteBufferConsensusRequest req = (ByteBufferConsensusRequest) actualRequest;
             serializedContent = ByteString.copyFrom(req.getContent());
             req.getContent().flip();
           } else {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index a9222f36ad..c691022c78 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -74,6 +74,8 @@ public enum ThreadName {
   CLUSTER_MONITOR("ClusterMonitor"),
   CONFIG_NODE_RPC_SERVER("ConfigNodeRpcServer"),
   CONFIG_NODE_RPC_CLIENT("ConfigNodeRPC-Client"),
+  MULTI_LEADER_CONSENSUS_RPC_CLIENT("MultiLeaderConsensusRPC-Client"),
+  MULTI_LEADER_CONSENSUS_RPC_SERVER("MultiLeaderConsensusRPC-Server"),
   DATA_NODE_MANAGEMENT_RPC_SERVER("DataNodeManagementRPC"),
   DATA_NODE_MANAGEMENT_RPC_CLIENT("DataNodeManagementRPC-Client"),
   Cluster_Monitor("ClusterMonitor"),
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
index 0bc5556c15..2a5ab281f2 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
@@ -37,6 +37,10 @@ public abstract class ConsensusGroupId {
   // return specific type
   public abstract TConsensusGroupType getType();
 
+  public TConsensusGroupId convertToTConsensusGroupId() { // wraps getType() and getId()
+    return new TConsensusGroupId(getType(), getId());
+  }
+
   @Override
   public int hashCode() {
     return Objects.hash(getType(), getId());
@@ -82,10 +86,6 @@ public abstract class ConsensusGroupId {
     }
   }
 
-  public static TConsensusGroupId convertToTConsensusGroupId(ConsensusGroupId consensusGroupId) {
-    return new TConsensusGroupId(consensusGroupId.getType(), consensusGroupId.getId());
-  }
-
   public static String formatTConsensusGroupId(TConsensusGroupId groupId) {
     StringBuilder format = new StringBuilder();
 
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 6b068618ce..edece800fe 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -72,7 +72,8 @@ public enum ServiceType {
   FRAGMENT_INSTANCE_MANAGER_SERVICE("Fragment instance manager", "FragmentInstanceManager"),
   DATA_BLOCK_MANAGER_SERVICE("Data block manager", "DataBlockManager"),
   INTERNAL_SERVICE("Internal Service", "InternalService"),
-  PROCEDURE_SERVICE("Procedure  Service", "ProcedureService");
+  PROCEDURE_SERVICE("Procedure Service", "ProcedureService"),
+  MULTI_LEADER_CONSENSUS_SERVICE("Multi Leader consensus Service", "MultiLeaderRPCService");
 
   private final String name;
   private final String jmxName;
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/partition/SerializeTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/partition/SerializeTest.java
index 1e032efc62..1f4f3152d6 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/partition/SerializeTest.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/partition/SerializeTest.java
@@ -68,7 +68,8 @@ public abstract class SerializeTest {
       tDataNodeLocation.setExternalEndPoint(new TEndPoint("127.0.0.1", 6000 + i));
       tDataNodeLocation.setInternalEndPoint(new TEndPoint("127.0.0.1", 7000 + i));
       tDataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("127.0.0.1", 8000 + i));
-      tDataNodeLocation.setConsensusEndPoint(new TEndPoint("127.0.0.1", 9000 + i));
+      tDataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("127.0.0.1", 9000 + i));
+      tDataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("127.0.0.1", 10000 + i));
       dataNodeLocations.add(tDataNodeLocation);
     }
     tRegionReplicaSet.setDataNodeLocations(dataNodeLocations);
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtilsTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtilsTest.java
index 0abde8a0fa..94a2a81427 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtilsTest.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtilsTest.java
@@ -58,7 +58,8 @@ public class ThriftCommonsSerDeUtilsTest {
     dataNodeLocation0.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
     dataNodeLocation0.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
     dataNodeLocation0.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
-    dataNodeLocation0.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation0.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation0.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
     ThriftCommonsSerDeUtils.serializeTDataNodeLocation(dataNodeLocation0, buffer);
     buffer.flip();
     TDataNodeLocation dataNodeLocation1 =
@@ -108,7 +109,8 @@ public class ThriftCommonsSerDeUtilsTest {
       dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667 + i));
       dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003 + i));
       dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777 + i));
-      dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
+      dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
+      dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010 + i));
       regionReplicaSet0.getDataNodeLocations().add(dataNodeLocation);
     }
     ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(regionReplicaSet0, buffer);
diff --git a/pom.xml b/pom.xml
index e9f0829436..4bd7cfe5e2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,7 @@
         <module>thrift</module>
         <module>thrift-commons</module>
         <module>thrift-confignode</module>
+        <module>thrift-multi-leader-consensus</module>
         <module>thrift-cluster</module>
         <module>thrift-sync</module>
         <module>thrift-influxdb</module>
@@ -790,6 +791,7 @@
                         <sourceDirectory>thrift/target/generated-sources/thrift</sourceDirectory>
                         <sourceDirectory>thrift-commons/target/generated-sources/thrift</sourceDirectory>
                         <sourceDirectory>thrift-confignode/target/generated-sources/thrift</sourceDirectory>
+                        <sourceDirectory>thrift-multi-leader-consensus/target/generated-sources/thrift</sourceDirectory>
                         <sourceDirectory>thrift-sync/target/generated-sources/thrift</sourceDirectory>
                         <sourceDirectory>thrift-cluster/target/generated-sources/thrift</sourceDirectory>
                         <sourceDirectory>thrift-influxdb/target/generated-sources/thrift</sourceDirectory>
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 3235dd92e4..9aad88b81b 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -51,10 +51,13 @@ internal_ip=127.0.0.1
 # port for coordinator's communication between cluster nodes.
 internal_port=9003
 
+# Datatype: int
+# Port used for data region consensus communication between cluster nodes.
+data_region_consensus_port=40010
 
 # Datatype: int
-# port for consensus's communication between cluster nodes.
-consensus_port=40010
+# Port used for schema region consensus communication between cluster nodes.
+schema_region_consensus_port=50010
 
 # comma-separated {IP/DOMAIN}:internal_port pairs
 # Data nodes store config nodes ip and port to communicate with config nodes.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 8cf84cf10c..3ff42792bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -263,6 +263,12 @@ public class IoTDBConfig {
   /** Consensus directory. */
   private String consensusDir = DEFAULT_BASE_DIR + File.separator + "consensus";
 
+  private String dataRegionConsensusDir =
+      DEFAULT_BASE_DIR + File.separator + "consensus" + File.separator + "dataRegion";
+
+  private String schemaRegionConsensusDir =
+      DEFAULT_BASE_DIR + File.separator + "consensus" + File.separator + "schemaRegion";
+
   /** Maximum MemTable number. Invalid when enableMemControl is true. */
   private int maxMemtableNumber = 0;
 
@@ -812,8 +818,11 @@ public class IoTDBConfig {
   /** Internal port for coordinator */
   private int internalPort = 9003;
 
-  /** Internal port for consensus protocol */
-  private int consensusPort = 40010;
+  /** Internal port for dataRegion consensus protocol */
+  private int dataRegionConsensusPort = 40010;
+
+  /** Internal port for schemaRegion consensus protocol */
+  private int schemaRegionConsensusPort = 50010;
 
   /** Ip and port of config nodes. */
   private List<TEndPoint> configNodeList =
@@ -823,11 +832,20 @@ public class IoTDBConfig {
   private long joinClusterTimeOutMs = TimeUnit.SECONDS.toMillis(5);
 
   /**
-   * The consensus protocol class. The Datanode should communicate with ConfigNode on startup and
-   * set this variable so that the correct class name can be obtained later when the consensus layer
-   * singleton is initialized
+   * The consensus protocol class for data region. The Datanode should communicate with ConfigNode
+   * on startup and set this variable so that the correct class name can be obtained later when the
+   * data region consensus layer singleton is initialized
+   */
+  private String dataRegionConsensusProtocolClass =
+      "org.apache.iotdb.consensus.ratis.RatisConsensus";
+
+  /**
+   * The consensus protocol class for schema region. The Datanode should communicate with ConfigNode
+   * on startup and set this variable so that the correct class name can be obtained later when the
+   * schema region consensus layer singleton is initialized
    */
-  private String consensusProtocolClass = "org.apache.iotdb.consensus.ratis.RatisConsensus";
+  private String schemaRegionConsensusProtocolClass =
+      "org.apache.iotdb.consensus.ratis.RatisConsensus";
 
   /**
    * The series partition executor class. The Datanode should communicate with ConfigNode on startup
@@ -1161,6 +1179,24 @@ public class IoTDBConfig {
 
   public void setConsensusDir(String consensusDir) {
     this.consensusDir = consensusDir;
+    setDataRegionConsensusDir(consensusDir + File.separator + "dataRegion");
+    setSchemaRegionConsensusDir(consensusDir + File.separator + "schemaRegion");
+  }
+
+  public String getDataRegionConsensusDir() {
+    return dataRegionConsensusDir;
+  }
+
+  public void setDataRegionConsensusDir(String dataRegionConsensusDir) {
+    this.dataRegionConsensusDir = dataRegionConsensusDir;
+  }
+
+  public String getSchemaRegionConsensusDir() {
+    return schemaRegionConsensusDir;
+  }
+
+  public void setSchemaRegionConsensusDir(String schemaRegionConsensusDir) {
+    this.schemaRegionConsensusDir = schemaRegionConsensusDir;
   }
 
   public String getExtDir() {
@@ -2620,12 +2656,20 @@ public class IoTDBConfig {
     this.internalPort = internalPort;
   }
 
-  public int getConsensusPort() {
-    return consensusPort;
+  public int getDataRegionConsensusPort() {
+    return dataRegionConsensusPort;
   }
 
-  public void setConsensusPort(int consensusPort) {
-    this.consensusPort = consensusPort;
+  public void setDataRegionConsensusPort(int dataRegionConsensusPort) {
+    this.dataRegionConsensusPort = dataRegionConsensusPort;
+  }
+
+  public int getSchemaRegionConsensusPort() {
+    return schemaRegionConsensusPort;
+  }
+
+  public void setSchemaRegionConsensusPort(int schemaRegionConsensusPort) {
+    this.schemaRegionConsensusPort = schemaRegionConsensusPort;
   }
 
   public List<TEndPoint> getConfigNodeList() {
@@ -2644,12 +2688,20 @@ public class IoTDBConfig {
     this.joinClusterTimeOutMs = joinClusterTimeOutMs;
   }
 
-  public String getConsensusProtocolClass() {
-    return consensusProtocolClass;
+  public String getDataRegionConsensusProtocolClass() {
+    return dataRegionConsensusProtocolClass;
+  }
+
+  public void setDataRegionConsensusProtocolClass(String dataRegionConsensusProtocolClass) {
+    this.dataRegionConsensusProtocolClass = dataRegionConsensusProtocolClass;
+  }
+
+  public String getSchemaRegionConsensusProtocolClass() {
+    return schemaRegionConsensusProtocolClass;
   }
 
-  public void setConsensusProtocolClass(String consensusProtocolClass) {
-    this.consensusProtocolClass = consensusProtocolClass;
+  public void setSchemaRegionConsensusProtocolClass(String schemaRegionConsensusProtocolClass) {
+    this.schemaRegionConsensusProtocolClass = schemaRegionConsensusProtocolClass;
   }
 
   public String getSeriesPartitionExecutorClass() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 40b680bfed..517dcab53c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1555,9 +1555,17 @@ public class IoTDBDescriptor {
         Integer.parseInt(
             properties.getProperty("internal_port", Integer.toString(conf.getInternalPort()))));
 
-    conf.setConsensusPort(
+    conf.setDataRegionConsensusPort(
         Integer.parseInt(
-            properties.getProperty("consensus_port", Integer.toString(conf.getConsensusPort()))));
+            properties.getProperty(
+                "data_region_consensus_port",
+                Integer.toString(conf.getDataRegionConsensusPort()))));
+
+    conf.setSchemaRegionConsensusPort(
+        Integer.parseInt(
+            properties.getProperty(
+                "schema_region_consensus_port",
+                Integer.toString(conf.getSchemaRegionConsensusPort()))));
   }
 
   public void loadShuffleProps(Properties properties) {
@@ -1608,7 +1616,9 @@ public class IoTDBDescriptor {
   // These configurations are received from config node when registering
   public void loadGlobalConfig(TGlobalConfig globalConfig) {
     conf.setSeriesPartitionExecutorClass(globalConfig.getSeriesPartitionExecutorClass());
-    conf.setConsensusProtocolClass(globalConfig.getDataNodeConsensusProtocolClass());
+    conf.setDataRegionConsensusProtocolClass(globalConfig.getDataRegionConsensusProtocolClass());
+    conf.setSchemaRegionConsensusProtocolClass(
+        globalConfig.getSchemaRegionConsensusProtocolClass());
     conf.setSeriesPartitionSlotNum(globalConfig.getSeriesPartitionSlotNum());
     conf.setPartitionInterval(globalConfig.timePartitionInterval);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
similarity index 60%
copy from server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java
copy to server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 96fd26af89..692e229f9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -21,15 +21,12 @@ package org.apache.iotdb.db.consensus;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.statemachine.DataRegionStateMachine;
-import org.apache.iotdb.db.consensus.statemachine.SchemaRegionStateMachine;
 import org.apache.iotdb.db.engine.StorageEngineV2;
-import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 
 import java.io.File;
 
@@ -37,41 +34,32 @@ import java.io.File;
  * We can use ConsensusImpl.getInstance() to obtain a consensus layer reference for reading and
  * writing
  */
-public class ConsensusImpl {
+public class DataRegionConsensusImpl {
 
-  private ConsensusImpl() {}
+  private DataRegionConsensusImpl() {}
 
   public static IConsensus getInstance() {
-    return ConsensusImplHolder.INSTANCE;
+    return DataRegionConsensusImplHolder.INSTANCE;
   }
 
-  private static class ConsensusImplHolder {
+  private static class DataRegionConsensusImplHolder {
 
     private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
     private static final IConsensus INSTANCE =
         ConsensusFactory.getConsensusImpl(
-                conf.getConsensusProtocolClass(),
-                new TEndPoint(conf.getInternalIp(), conf.getConsensusPort()),
-                new File(conf.getConsensusDir()),
-                gid -> {
-                  switch (gid.getType()) {
-                    case SchemaRegion:
-                      return new SchemaRegionStateMachine(
-                          SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid));
-                    case DataRegion:
-                      return new DataRegionStateMachine(
-                          StorageEngineV2.getInstance().getDataRegion((DataRegionId) gid));
-                  }
-                  throw new IllegalArgumentException(
-                      String.format("Unexpected consensusGroup %s", gid));
-                })
+                conf.getDataRegionConsensusProtocolClass(),
+                new TEndPoint(conf.getInternalIp(), conf.getDataRegionConsensusPort()),
+                new File(conf.getDataRegionConsensusDir()),
+                gid ->
+                    new DataRegionStateMachine(
+                        StorageEngineV2.getInstance().getDataRegion((DataRegionId) gid)))
             .orElseThrow(
                 () ->
                     new IllegalArgumentException(
                         String.format(
                             ConsensusFactory.CONSTRUCT_FAILED_MSG,
-                            conf.getConsensusProtocolClass())));
+                            conf.getDataRegionConsensusProtocolClass())));
 
-    private ConsensusImplHolder() {}
+    private DataRegionConsensusImplHolder() {}
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
similarity index 56%
rename from server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java
rename to server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index 96fd26af89..87e99f98a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -20,58 +20,42 @@
 package org.apache.iotdb.db.consensus;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.consensus.statemachine.DataRegionStateMachine;
 import org.apache.iotdb.db.consensus.statemachine.SchemaRegionStateMachine;
-import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 
 import java.io.File;
 
-/**
- * We can use ConsensusImpl.getInstance() to obtain a consensus layer reference for reading and
- * writing
- */
-public class ConsensusImpl {
+public class SchemaRegionConsensusImpl {
 
-  private ConsensusImpl() {}
+  private SchemaRegionConsensusImpl() {}
 
   public static IConsensus getInstance() {
-    return ConsensusImplHolder.INSTANCE;
+    return SchemaRegionConsensusImplHolder.INSTANCE;
   }
 
-  private static class ConsensusImplHolder {
+  private static class SchemaRegionConsensusImplHolder {
 
     private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
     private static final IConsensus INSTANCE =
         ConsensusFactory.getConsensusImpl(
-                conf.getConsensusProtocolClass(),
-                new TEndPoint(conf.getInternalIp(), conf.getConsensusPort()),
-                new File(conf.getConsensusDir()),
-                gid -> {
-                  switch (gid.getType()) {
-                    case SchemaRegion:
-                      return new SchemaRegionStateMachine(
-                          SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid));
-                    case DataRegion:
-                      return new DataRegionStateMachine(
-                          StorageEngineV2.getInstance().getDataRegion((DataRegionId) gid));
-                  }
-                  throw new IllegalArgumentException(
-                      String.format("Unexpected consensusGroup %s", gid));
-                })
+                conf.getSchemaRegionConsensusProtocolClass(),
+                new TEndPoint(conf.getInternalIp(), conf.getSchemaRegionConsensusPort()),
+                new File(conf.getSchemaRegionConsensusDir()),
+                gid ->
+                    new SchemaRegionStateMachine(
+                        SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid)))
             .orElseThrow(
                 () ->
                     new IllegalArgumentException(
                         String.format(
                             ConsensusFactory.CONSTRUCT_FAILED_MSG,
-                            conf.getConsensusProtocolClass())));
+                            conf.getSchemaRegionConsensusProtocolClass())));
 
-    private ConsensusImplHolder() {}
+    private SchemaRegionConsensusImplHolder() {}
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 51616c30bd..0d22c6df90 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -19,13 +19,11 @@
 
 package org.apache.iotdb.db.consensus.statemachine;
 
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
-import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,18 +32,6 @@ public abstract class BaseStateMachine implements IStateMachine, IStateMachine.E
 
   private static final Logger logger = LoggerFactory.getLogger(BaseStateMachine.class);
 
-  @Override
-  public TSStatus write(IConsensusRequest request) {
-    try {
-      return write(getFragmentInstance(request));
-    } catch (IllegalArgumentException e) {
-      logger.error(e.getMessage(), e);
-      return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-    }
-  }
-
-  protected abstract TSStatus write(FragmentInstance fragmentInstance);
-
   @Override
   public DataSet read(IConsensusRequest request) {
     try {
@@ -58,7 +44,7 @@ public abstract class BaseStateMachine implements IStateMachine, IStateMachine.E
 
   protected abstract DataSet read(FragmentInstance fragmentInstance);
 
-  private FragmentInstance getFragmentInstance(IConsensusRequest request) {
+  protected FragmentInstance getFragmentInstance(IConsensusRequest request) {
     FragmentInstance instance;
     if (request instanceof ByteBufferConsensusRequest) {
       instance =
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 7fe44559f7..f0b03a451f 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.exception.BatchProcessException;
@@ -31,11 +33,13 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.DeleteRegionNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,8 +75,31 @@ public class DataRegionStateMachine extends BaseStateMachine {
   public void loadSnapshot(File latestSnapshotRootDir) {}
 
   @Override
-  protected TSStatus write(FragmentInstance fragmentInstance) {
-    PlanNode planNode = fragmentInstance.getFragment().getRoot();
+  public TSStatus write(IConsensusRequest request) {
+    FragmentInstance fi;
+    long minSyncIndex = Long.MAX_VALUE;
+    long currentIndex = -1;
+    try {
+      if (request instanceof IndexedConsensusRequest) {
+        fi = getFragmentInstance(((IndexedConsensusRequest) request).getRequest());
+        minSyncIndex = ((IndexedConsensusRequest) request).getMinSyncIndex();
+        currentIndex = ((IndexedConsensusRequest) request).getCurrentIndex();
+      } else {
+        fi = getFragmentInstance(request);
+      }
+      PlanNode planNode = fi.getFragment().getRoot();
+      if (planNode instanceof InsertNode) {
+        // SLF4J needs {} placeholders; without them the index arguments are dropped.
+        logger.debug("minSyncIndex: {}, currentIndex: {}", minSyncIndex, currentIndex);
+      }
+      return write(planNode);
+    } catch (IllegalArgumentException e) {
+      logger.error(e.getMessage(), e);
+      return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+    }
+  }
+
+  protected TSStatus write(PlanNode planNode) {
     try {
       if (planNode instanceof InsertRowNode) {
         region.insert((InsertRowNode) planNode);
@@ -95,7 +122,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
     } catch (BatchProcessException e) {
       return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
     } catch (Exception e) {
-      logger.error("Error in executing plan node: {}", planNode);
+      logger.error("Error in executing plan node: {}", planNode, e);
       return StatusUtils.EXECUTE_STATEMENT_ERROR;
     }
     return StatusUtils.OK;
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index 888346907a..dfd6f1ef5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -21,11 +21,13 @@ package org.apache.iotdb.db.consensus.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.metadata.visitor.SchemaExecutionVisitor;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,11 +61,15 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
   public void loadSnapshot(File latestSnapshotRootDir) {}
 
   @Override
-  protected TSStatus write(FragmentInstance fragmentInstance) {
+  public TSStatus write(IConsensusRequest request) {
     logger.info("Execute write plan in SchemaRegionStateMachine");
-    PlanNode planNode = fragmentInstance.getFragment().getRoot();
-    TSStatus status = planNode.accept(new SchemaExecutionVisitor(), schemaRegion);
-    return status;
+    try {
+      PlanNode planNode = getFragmentInstance(request).getFragment().getRoot();
+      return planNode.accept(new SchemaExecutionVisitor(), schemaRegion);
+    } catch (IllegalArgumentException e) {
+      logger.error(e.getMessage(), e);
+      return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+    }
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 8922de59e3..35637f8140 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -24,9 +24,11 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
@@ -167,8 +169,16 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
             instance.getRegionReplicaSet().getRegionId());
     switch (instance.getType()) {
       case READ:
-        FragmentInstanceInfo info =
-            (FragmentInstanceInfo) ConsensusImpl.getInstance().read(groupId, instance).getDataset();
+        FragmentInstanceInfo info;
+        if (groupId instanceof SchemaRegionId) {
+          info =
+              (FragmentInstanceInfo)
+                  SchemaRegionConsensusImpl.getInstance().read(groupId, instance).getDataset();
+        } else {
+          info =
+              (FragmentInstanceInfo)
+                  DataRegionConsensusImpl.getInstance().read(groupId, instance).getDataset();
+        }
         return !info.getState().isFailed();
       case WRITE:
         PlanNode planNode = instance.getFragment().getRoot();
@@ -179,7 +189,12 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
             throw new FragmentInstanceDispatchException(e);
           }
         }
-        ConsensusWriteResponse resp = ConsensusImpl.getInstance().write(groupId, instance);
+        ConsensusWriteResponse resp;
+        if (groupId instanceof SchemaRegionId) {
+          resp = SchemaRegionConsensusImpl.getInstance().write(groupId, instance);
+        } else {
+          resp = DataRegionConsensusImpl.getInstance().write(groupId, instance);
+        }
         return TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode();
     }
     throw new UnsupportedOperationException(
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 8875150162..37079c0355 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -35,7 +35,8 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.IoTDBStartCheck;
 import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
-import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
 import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
@@ -150,8 +151,10 @@ public class DataNode implements DataNodeMBean {
             new TEndPoint(config.getInternalIp(), config.getInternalPort()));
         location.setDataBlockManagerEndPoint(
             new TEndPoint(config.getInternalIp(), config.getDataBlockManagerPort()));
-        location.setConsensusEndPoint(
-            new TEndPoint(config.getInternalIp(), config.getConsensusPort()));
+        location.setDataRegionConsensusEndPoint(
+            new TEndPoint(config.getInternalIp(), config.getDataRegionConsensusPort()));
+        location.setSchemaRegionConsensusEndPoint(
+            new TEndPoint(config.getInternalIp(), config.getSchemaRegionConsensusPort()));
         req.setDataNodeLocation(location);
 
         TDataNodeRegisterResp dataNodeRegisterResp = configNodeClient.registerDataNode(req);
@@ -228,7 +231,8 @@ public class DataNode implements DataNodeMBean {
 
     try {
       // TODO: Start consensus layer in some where else
-      ConsensusImpl.getInstance().start();
+      DataRegionConsensusImpl.getInstance().start();
+      SchemaRegionConsensusImpl.getInstance().start();
     } catch (IOException e) {
       throw new StartupException(e);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index 1a4661d5b4..1144cb807f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -32,13 +32,13 @@ import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
@@ -91,7 +91,6 @@ public class InternalServiceImpl implements InternalService.Iface {
   private static final Logger LOGGER = LoggerFactory.getLogger(InternalServiceImpl.class);
   private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
   private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
-  private final IConsensus consensusImpl = ConsensusImpl.getInstance();
 
   public InternalServiceImpl() {
     super();
@@ -105,9 +104,16 @@ public class InternalServiceImpl implements InternalService.Iface {
         ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
     switch (type) {
       case READ:
-        ConsensusReadResponse readResp =
-            ConsensusImpl.getInstance()
-                .read(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
+        ConsensusReadResponse readResp;
+        if (groupId instanceof SchemaRegionId) {
+          readResp =
+              SchemaRegionConsensusImpl.getInstance()
+                  .read(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
+        } else {
+          readResp =
+              DataRegionConsensusImpl.getInstance()
+                  .read(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
+        }
         FragmentInstanceInfo info = (FragmentInstanceInfo) readResp.getDataset();
         return new TSendFragmentInstanceResp(!info.getState().isFailed());
       case WRITE:
@@ -126,7 +132,11 @@ public class InternalServiceImpl implements InternalService.Iface {
             return response;
           }
         }
-        resp = ConsensusImpl.getInstance().write(groupId, fragmentInstance);
+        if (groupId instanceof SchemaRegionId) {
+          resp = SchemaRegionConsensusImpl.getInstance().write(groupId, fragmentInstance);
+        } else {
+          resp = DataRegionConsensusImpl.getInstance().write(groupId, fragmentInstance);
+        }
         // TODO need consider more status
         response.setAccepted(
             TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode());
@@ -183,12 +193,12 @@ public class InternalServiceImpl implements InternalService.Iface {
       for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
         TEndPoint endpoint =
             new TEndPoint(
-                dataNodeLocation.getConsensusEndPoint().getIp(),
-                dataNodeLocation.getConsensusEndPoint().getPort());
+                dataNodeLocation.getSchemaRegionConsensusEndPoint().getIp(),
+                dataNodeLocation.getSchemaRegionConsensusEndPoint().getPort());
         peers.add(new Peer(schemaRegionId, endpoint));
       }
       ConsensusGenericResponse consensusGenericResponse =
-          consensusImpl.addConsensusGroup(schemaRegionId, peers);
+          SchemaRegionConsensusImpl.getInstance().addConsensusGroup(schemaRegionId, peers);
       if (consensusGenericResponse.isSuccess()) {
         tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       } else {
@@ -221,12 +231,12 @@ public class InternalServiceImpl implements InternalService.Iface {
       for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
         TEndPoint endpoint =
             new TEndPoint(
-                dataNodeLocation.getConsensusEndPoint().getIp(),
-                dataNodeLocation.getConsensusEndPoint().getPort());
+                dataNodeLocation.getDataRegionConsensusEndPoint().getIp(),
+                dataNodeLocation.getDataRegionConsensusEndPoint().getPort());
         peers.add(new Peer(dataRegionId, endpoint));
       }
       ConsensusGenericResponse consensusGenericResponse =
-          consensusImpl.addConsensusGroup(dataRegionId, peers);
+          DataRegionConsensusImpl.getInstance().addConsensusGroup(dataRegionId, peers);
       if (consensusGenericResponse.isSuccess()) {
         tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       } else {
@@ -285,7 +295,15 @@ public class InternalServiceImpl implements InternalService.Iface {
     PlanFragment planFragment = new PlanFragment(planFragmentId, deleteRegionNode);
     FragmentInstance fragmentInstance =
         new FragmentInstance(planFragment, fragmentInstanceId, null, QueryType.WRITE);
-    return consensusImpl.write(consensusGroupId, fragmentInstance).getStatus();
+    if (consensusGroupId instanceof SchemaRegionId) {
+      return SchemaRegionConsensusImpl.getInstance()
+          .write(consensusGroupId, fragmentInstance)
+          .getStatus();
+    } else {
+      return DataRegionConsensusImpl.getInstance()
+          .write(consensusGroupId, fragmentInstance)
+          .getStatus();
+    }
   }
 
   public void handleClientExit() {}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
index 53950b092a..7385b9cbd1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
@@ -57,7 +57,8 @@ public class FragmentInstanceSerdeTest {
     dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
     dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
     dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
-    dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
 
     PlanFragmentId planFragmentId = new PlanFragmentId("test", -1);
     FragmentInstance fragmentInstance =
@@ -86,7 +87,8 @@ public class FragmentInstanceSerdeTest {
     dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
     dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
     dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
-    dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
 
     PlanFragmentId planFragmentId = new PlanFragmentId("test2", 1);
     FragmentInstance fragmentInstance =
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index 3fafea7cfd..c1a00026af 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -31,7 +31,8 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.localconfignode.LocalConfigNode;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
@@ -78,23 +79,24 @@ public class InternalServiceImplTest {
     IoTDB.configManager.init();
     configNode = LocalConfigNode.getInstance();
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln"));
-    ConsensusImpl.getInstance().start();
+    DataRegionConsensusImpl.getInstance().start();
+    SchemaRegionConsensusImpl.getInstance().start();
   }
 
   @Before
   public void setUp() throws Exception {
     TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
-    ConsensusImpl.getInstance()
+    SchemaRegionConsensusImpl.getInstance()
         .addConsensusGroup(
             ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId()),
-            genPeerList(regionReplicaSet));
+            genSchemaRegionPeerList(regionReplicaSet));
     internalServiceImpl = new InternalServiceImpl();
   }
 
   @After
   public void tearDown() throws Exception {
     TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
-    ConsensusImpl.getInstance()
+    SchemaRegionConsensusImpl.getInstance()
         .removeConsensusGroup(
             ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId()));
     FileUtils.deleteFully(new File(conf.getConsensusDir()));
@@ -102,7 +104,8 @@ public class InternalServiceImplTest {
 
   @AfterClass
   public static void tearDownAfterClass() throws IOException, StorageEngineException {
-    ConsensusImpl.getInstance().stop();
+    DataRegionConsensusImpl.getInstance().stop();
+    SchemaRegionConsensusImpl.getInstance().stop();
     IoTDB.configManager.clear();
     EnvironmentUtils.cleanEnv();
   }
@@ -360,20 +363,23 @@ public class InternalServiceImplTest {
             .setInternalEndPoint(new TEndPoint(conf.getInternalIp(), conf.getInternalPort()))
             .setDataBlockManagerEndPoint(
                 new TEndPoint(conf.getInternalIp(), conf.getDataBlockManagerPort()))
-            .setConsensusEndPoint(new TEndPoint(conf.getInternalIp(), conf.getConsensusPort())));
+            .setDataRegionConsensusEndPoint(
+                new TEndPoint(conf.getInternalIp(), conf.getDataRegionConsensusPort()))
+            .setSchemaRegionConsensusEndPoint(
+                new TEndPoint(conf.getInternalIp(), conf.getSchemaRegionConsensusPort())));
 
     // construct fragmentInstance
     return new TRegionReplicaSet(
         new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 0), dataNodeList);
   }
 
-  private List<Peer> genPeerList(TRegionReplicaSet regionReplicaSet) {
+  private List<Peer> genSchemaRegionPeerList(TRegionReplicaSet regionReplicaSet) {
     List<Peer> peerList = new ArrayList<>();
     for (TDataNodeLocation node : regionReplicaSet.getDataNodeLocations()) {
       peerList.add(
           new Peer(
               new SchemaRegionId(regionReplicaSet.getRegionId().getId()),
-              node.getConsensusEndPoint()));
+              node.getSchemaRegionConsensusEndPoint()));
     }
     return peerList;
   }
diff --git a/server/src/test/resources/datanode1conf/iotdb-engine.properties b/server/src/test/resources/datanode1conf/iotdb-engine.properties
index 0f02933bbc..cced42b412 100644
--- a/server/src/test/resources/datanode1conf/iotdb-engine.properties
+++ b/server/src/test/resources/datanode1conf/iotdb-engine.properties
@@ -23,7 +23,8 @@ internal_ip=127.0.0.1
 rpc_port=6667
 data_block_manager_port=8777
 internal_port=9003
-consensus_port=40010
+data_region_consensus_port=40010
+schema_region_consensus_port=50010
 
 config_nodes=0.0.0.0:22277,0.0.0.0:22279,0.0.0.0:22281
 
diff --git a/server/src/test/resources/datanode2conf/iotdb-engine.properties b/server/src/test/resources/datanode2conf/iotdb-engine.properties
index 429c025a6b..c570d996b7 100644
--- a/server/src/test/resources/datanode2conf/iotdb-engine.properties
+++ b/server/src/test/resources/datanode2conf/iotdb-engine.properties
@@ -23,7 +23,8 @@ internal_ip=127.0.0.1
 rpc_port=6669
 data_block_manager_port=8779
 internal_port=9005
-consensus_port=40012
+data_region_consensus_port=40012
+schema_region_consensus_port=50012
 
 config_nodes=0.0.0.0:22277,0.0.0.0:22279,0.0.0.0:22281
 
diff --git a/server/src/test/resources/datanode3conf/iotdb-engine.properties b/server/src/test/resources/datanode3conf/iotdb-engine.properties
index 5a390729a6..956faee018 100644
--- a/server/src/test/resources/datanode3conf/iotdb-engine.properties
+++ b/server/src/test/resources/datanode3conf/iotdb-engine.properties
@@ -23,7 +23,8 @@ internal_ip=127.0.0.1
 rpc_port=6671
 data_block_manager_port=8781
 internal_port=9007
-consensus_port=40014
+data_region_consensus_port=40014
+schema_region_consensus_port=50014
 
 config_nodes=0.0.0.0:22277,0.0.0.0:22279,0.0.0.0:22281
 
diff --git a/thrift-commons/src/main/thrift/common.thrift b/thrift-commons/src/main/thrift/common.thrift
index 8ffdf3ff40..cfd5fc4157 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -74,8 +74,10 @@ struct TDataNodeLocation {
   3: required TEndPoint internalEndPoint
   // TEndPoint for transfering data between DataNodes
   4: required TEndPoint dataBlockManagerEndPoint
-  // TEndPoint for DataNode's ConsensusLayer
-  5: required TEndPoint consensusEndPoint
+  // TEndPoint for DataNode's dataRegion consensus protocol
+  5: required TEndPoint dataRegionConsensusEndPoint
+  // TEndPoint for DataNode's schemaRegion consensus protocol
+  6: required TEndPoint schemaRegionConsensusEndPoint
 }
 
 struct THeartbeatResp {
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 7fa76108e8..6d817f29ce 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -30,10 +30,11 @@ struct TDataNodeRegisterReq {
 }
 
 struct TGlobalConfig {
-  1: required string dataNodeConsensusProtocolClass
-  2: required i32 seriesPartitionSlotNum
-  3: required string seriesPartitionExecutorClass
-  4: required i64 timePartitionInterval
+  1: required string dataRegionConsensusProtocolClass
+  2: required string schemaRegionConsensusProtocolClass
+  3: required i32 seriesPartitionSlotNum
+  4: required string seriesPartitionExecutorClass
+  5: required i64 timePartitionInterval
 }
 
 struct TDataNodeRegisterResp {
@@ -159,13 +160,14 @@ struct TCheckUserPrivilegesReq{
 // ConfigNode
 struct TConfigNodeRegisterReq {
   1: required common.TConfigNodeLocation configNodeLocation
-  2: required string dataNodeConsensusProtocolClass
-  3: required i32 seriesPartitionSlotNum
-  4: required string seriesPartitionExecutorClass
-  5: required i64 defaultTTL
-  6: required i64 timePartitionInterval
-  7: required i32 schemaReplicationFactor
-  8: required i32 dataReplicationFactor
+  2: required string dataRegionConsensusProtocolClass
+  3: required string schemaRegionConsensusProtocolClass
+  4: required i32 seriesPartitionSlotNum
+  5: required string seriesPartitionExecutorClass
+  6: required i64 defaultTTL
+  7: required i64 timePartitionInterval
+  8: required i32 schemaReplicationFactor
+  9: required i32 dataReplicationFactor
 }
 
 struct TConfigNodeRegisterResp {
diff --git a/thrift-multi-leader-consensus/pom.xml b/thrift-multi-leader-consensus/pom.xml
new file mode 100644
index 0000000000..7504f96722
--- /dev/null
+++ b/thrift-multi-leader-consensus/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.iotdb</groupId>
+        <artifactId>iotdb-parent</artifactId>
+        <version>0.14.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <artifactId>thrift-multi-leader-consensus</artifactId>
+    <name>rpc-thrift-multi-leader-consensus</name>
+    <description>RPC modules for the multi-leader consensus algorithm</description>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-thrift-commons</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>3.2.0</version>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>${project.build.directory}/generated-sources/thrift</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
new file mode 100644
index 0000000000..10c0195506
--- /dev/null
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+include "common.thrift"
+namespace java org.apache.iotdb.consensus.multileader.thrift
+
+struct TSyncLogReq {
+  1: required common.TConsensusGroupId consensusGroupId
+  2: required list<binary> batches
+}
+
+struct TSyncLogRes {
+  1: required list<common.TSStatus> status
+}
+
+service MultiLeaderConsensusIService {
+  TSyncLogRes syncLog(TSyncLogReq req)
+}
\ No newline at end of file