You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/05/17 14:34:24 UTC

[iotdb] branch jira3188 created (now 3b5fb85164)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch jira3188
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 3b5fb85164 split dataRegion/schemaRegion consensus protocol && multiLeaderConsensus init

This branch includes the following new commits:

     new 3b5fb85164 split dataRegion/schemaRegion consensus protocol && multiLeaderConsensus init

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: split dataRegion/schemaRegion consensus protocol && multiLeaderConsensus init

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira3188
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3b5fb851641f8c90a9d0711b1c9669a4c5237b23
Author: LebronAl <TX...@gmail.com>
AuthorDate: Tue May 17 22:34:03 2022 +0800

    split dataRegion/schemaRegion consensus protocol && multiLeaderConsensus init
---
 .../resources/conf/iotdb-confignode.properties     |  12 +-
 .../iotdb/confignode/conf/ConfigNodeConf.java      |  25 ++-
 .../confignode/conf/ConfigNodeDescriptor.java      |   9 +-
 .../confignode/conf/ConfigNodeStartupCheck.java    |  36 +++-
 .../iotdb/confignode/manager/ConfigManager.java    |  13 +-
 .../iotdb/confignode/manager/NodeManager.java      |   9 +-
 .../consensus/request/ConfigRequestSerDeTest.java  |  12 +-
 .../iotdb/confignode/persistence/NodeInfoTest.java |   3 +-
 .../confignode/persistence/PartitionInfoTest.java  |   3 +-
 .../thrift/ConfigNodeRPCServiceProcessorTest.java  |  19 +-
 consensus/pom.xml                                  |   5 +
 .../org/apache/iotdb/consensus/common/Peer.java    |  15 ++
 .../common/request/IndexedConsensusRequest.java    |  55 +++++
 .../consensus/multileader/IndexController.java     | 130 ++++++++++++
 .../multileader/MultiLeaderConsensus.java          | 227 +++++++++++++++++++++
 .../multileader/MultiLeaderServerImpl.java         | 139 +++++++++++++
 .../apache/iotdb/consensus/multileader/Utils.java  |  29 +++
 .../asyncLogAppender/AsyncLogAppender.java         |  35 ++++
 .../multileader/service/MultiLeaderRPCService.java |  99 +++++++++
 .../service/MultiLeaderRPCServiceHandler.java      |  51 +++++
 .../service/MultiLeaderRPCServiceMBean.java        |  22 ++
 .../service/MultiLeaderRPCServiceProcessor.java    |  74 +++++++
 .../iotdb/consensus/ratis/RequestMessage.java      |   3 +-
 .../iotdb/commons/concurrent/ThreadName.java       |   2 +
 .../iotdb/commons/consensus/ConsensusGroupId.java  |   8 +-
 .../apache/iotdb/commons/service/ServiceType.java  |   3 +-
 .../iotdb/commons/partition/SerializeTest.java     |   3 +-
 .../commons/utils/ThriftCommonsSerDeUtilsTest.java |   6 +-
 pom.xml                                            |   2 +
 .../resources/conf/iotdb-engine.properties         |   7 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  80 ++++++--
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  15 +-
 ...ensusImpl.java => DataRegionConsensusImpl.java} |  36 ++--
 ...susImpl.java => SchemaRegionConsensusImpl.java} |  40 ++--
 .../consensus/statemachine/BaseStateMachine.java   |  16 +-
 .../statemachine/DataRegionStateMachine.java       |  33 ++-
 .../statemachine/SchemaRegionStateMachine.java     |  14 +-
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  23 ++-
 .../java/org/apache/iotdb/db/service/DataNode.java |  12 +-
 .../service/thrift/impl/InternalServiceImpl.java   |  46 +++--
 .../mpp/plan/plan/FragmentInstanceSerdeTest.java   |   6 +-
 .../iotdb/db/service/InternalServiceImplTest.java  |  24 ++-
 .../datanode1conf/iotdb-engine.properties          |   3 +-
 .../datanode2conf/iotdb-engine.properties          |   3 +-
 .../datanode3conf/iotdb-engine.properties          |   3 +-
 thrift-commons/src/main/thrift/common.thrift       |   6 +-
 .../src/main/thrift/confignode.thrift              |  24 ++-
 thrift-multi-leader-consensus/pom.xml              |  47 +++++
 .../src/main/thrift/mutlileader.thrift             |  34 +++
 49 files changed, 1330 insertions(+), 191 deletions(-)

diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 2ca585b2d1..310fed9224 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -54,14 +54,20 @@ target_confignode=0.0.0.0:22277
 # All parameters in Consensus protocol configuration is unmodifiable after ConfigNode starts for the first time.
 # And these parameters should be consistent within the ConfigNodeGroup.
 
-
-# DataNode consensus protocol type
+# DataRegion consensus protocol type
 # These consensus protocols are currently supported:
 # 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(No protocol, only supports stand-alone machine)
 # 2. org.apache.iotdb.consensus.ratis.RatisConsensus(Raft protocol)
+# 3. org.apache.iotdb.consensus.multileader.MultiLeaderConsensus(Multi-leader protocol)
 # Datatype: String
-# data_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
+# data_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
 
+# SchemaRegion consensus protocol type
+# These consensus protocols are currently supported:
+# 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(No protocol, only supports stand-alone machine)
+# 2. org.apache.iotdb.consensus.ratis.RatisConsensus(Raft protocol)
+# Datatype: String
+# schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
 
 ####################
 ### PartitionSlot configuration
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index 923fe5c700..5bc9c9aefb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -58,8 +58,13 @@ public class ConfigNodeConf {
   private final String configNodeConsensusProtocolClass =
       "org.apache.iotdb.consensus.ratis.RatisConsensus";
 
-  /** DataNode Regions consensus protocol */
-  private String dataNodeConsensusProtocolClass = "org.apache.iotdb.consensus.ratis.RatisConsensus";
+  /** DataNode data region consensus protocol */
+  private String dataRegionConsensusProtocolClass =
+      "org.apache.iotdb.consensus.ratis.RatisConsensus";
+
+  /** DataNode schema region consensus protocol */
+  private String schemaRegionConsensusProtocolClass =
+      "org.apache.iotdb.consensus.ratis.RatisConsensus";
 
   /**
    * ClientManager will have so many selector threads (TAsyncClientManager) to distribute to its
@@ -308,12 +313,20 @@ public class ConfigNodeConf {
     return configNodeConsensusProtocolClass;
   }
 
-  public String getDataNodeConsensusProtocolClass() {
-    return dataNodeConsensusProtocolClass;
+  public String getDataRegionConsensusProtocolClass() {
+    return dataRegionConsensusProtocolClass;
+  }
+
+  public void setDataRegionConsensusProtocolClass(String dataRegionConsensusProtocolClass) {
+    this.dataRegionConsensusProtocolClass = dataRegionConsensusProtocolClass;
+  }
+
+  public String getSchemaRegionConsensusProtocolClass() {
+    return schemaRegionConsensusProtocolClass;
   }
 
-  public void setDataNodeConsensusProtocolClass(String dataNodeConsensusProtocolClass) {
-    this.dataNodeConsensusProtocolClass = dataNodeConsensusProtocolClass;
+  public void setSchemaRegionConsensusProtocolClass(String schemaRegionConsensusProtocolClass) {
+    this.schemaRegionConsensusProtocolClass = schemaRegionConsensusProtocolClass;
   }
 
   public int getThriftServerAwaitTimeForStopService() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 240021ae40..4952a335d3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -129,9 +129,14 @@ public class ConfigNodeDescriptor {
           properties.getProperty(
               "series_partition_executor_class", conf.getSeriesPartitionExecutorClass()));
 
-      conf.setDataNodeConsensusProtocolClass(
+      conf.setDataRegionConsensusProtocolClass(
           properties.getProperty(
-              "data_node_consensus_protocol_class", conf.getDataNodeConsensusProtocolClass()));
+              "data_region_consensus_protocol_class", conf.getDataRegionConsensusProtocolClass()));
+
+      conf.setSchemaRegionConsensusProtocolClass(
+          properties.getProperty(
+              "schema_region_consensus_protocol_class",
+              conf.getSchemaRegionConsensusProtocolClass()));
 
       conf.setRpcAdvancedCompressionEnable(
           Boolean.parseBoolean(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index 7480d78e0c..7a98ca4912 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -95,7 +95,7 @@ public class ConfigNodeStartupCheck {
 
     // When the DataNode consensus protocol is set to StandAlone,
     // the replication factor must be 1
-    if (conf.getDataNodeConsensusProtocolClass()
+    if (conf.getDataRegionConsensusProtocolClass()
         .equals("org.apache.iotdb.consensus.standalone.StandAloneConsensus")) {
       if (conf.getSchemaReplicationFactor() != 1) {
         throw new ConfigurationException(
@@ -183,7 +183,8 @@ public class ConfigNodeStartupCheck {
             new TConfigNodeLocation(
                 new TEndPoint(conf.getRpcAddress(), conf.getRpcPort()),
                 new TEndPoint(conf.getRpcAddress(), conf.getConsensusPort())),
-            conf.getDataNodeConsensusProtocolClass(),
+            conf.getDataRegionConsensusProtocolClass(),
+            conf.getSchemaRegionConsensusProtocolClass(),
             conf.getSeriesPartitionSlotNum(),
             conf.getSeriesPartitionExecutorClass(),
             CommonDescriptor.getInstance().getConfig().getDefaultTTL(),
@@ -215,7 +216,9 @@ public class ConfigNodeStartupCheck {
     systemProperties.setProperty(
         "config_node_consensus_protocol_class", conf.getConfigNodeConsensusProtocolClass());
     systemProperties.setProperty(
-        "data_node_consensus_protocol_class", conf.getDataNodeConsensusProtocolClass());
+        "data_region_consensus_protocol_class", conf.getDataRegionConsensusProtocolClass());
+    systemProperties.setProperty(
+        "schema_region_consensus_protocol_class", conf.getSchemaRegionConsensusProtocolClass());
 
     // PartitionSlot configuration
     systemProperties.setProperty(
@@ -282,15 +285,28 @@ public class ConfigNodeStartupCheck {
           configNodeConsensusProtocolClass);
     }
 
-    String dataNodeConsensusProtocolClass =
-        systemProperties.getProperty("data_node_consensus_protocol_class", null);
-    if (dataNodeConsensusProtocolClass == null) {
+    String dataRegionConsensusProtocolClass =
+        systemProperties.getProperty("data_region_consensus_protocol_class", null);
+    if (dataRegionConsensusProtocolClass == null) {
+      needReWrite = true;
+    } else if (!dataRegionConsensusProtocolClass.equals(
+        conf.getDataRegionConsensusProtocolClass())) {
+      throw new ConfigurationException(
+          "data_region_consensus_protocol_class",
+          conf.getDataRegionConsensusProtocolClass(),
+          dataRegionConsensusProtocolClass);
+    }
+
+    String schemaRegionConsensusProtocolClass =
+        systemProperties.getProperty("schema_region_consensus_protocol_class", null);
+    if (schemaRegionConsensusProtocolClass == null) {
       needReWrite = true;
-    } else if (!dataNodeConsensusProtocolClass.equals(conf.getDataNodeConsensusProtocolClass())) {
+    } else if (!schemaRegionConsensusProtocolClass.equals(
+        conf.getSchemaRegionConsensusProtocolClass())) {
       throw new ConfigurationException(
-          "data_node_consensus_protocol_class",
-          conf.getDataNodeConsensusProtocolClass(),
-          dataNodeConsensusProtocolClass);
+          "schema_region_consensus_protocol_class",
+          conf.getSchemaRegionConsensusProtocolClass(),
+          schemaRegionConsensusProtocolClass);
     }
 
     // PartitionSlot configuration
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 278859b9c4..d2ee15fc8b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -480,11 +480,20 @@ public class ConfigManager implements Manager {
     ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
     TConfigNodeRegisterResp errorResp = new TConfigNodeRegisterResp();
     errorResp.setStatus(new TSStatus(TSStatusCode.ERROR_GLOBAL_CONFIG.getStatusCode()));
-    if (!req.getDataNodeConsensusProtocolClass().equals(conf.getDataNodeConsensusProtocolClass())) {
+    if (!req.getDataRegionConsensusProtocolClass()
+        .equals(conf.getDataRegionConsensusProtocolClass())) {
       errorResp
           .getStatus()
           .setMessage(
-              "Reject register, please ensure that the data_node_consensus_protocol_class are consistent.");
+              "Reject register, please ensure that the data_region_consensus_protocol_class are consistent.");
+      return errorResp;
+    }
+    if (!req.getSchemaRegionConsensusProtocolClass()
+        .equals(conf.getSchemaRegionConsensusProtocolClass())) {
+      errorResp
+          .getStatus()
+          .setMessage(
+              "Reject register, please ensure that the schema_region_consensus_protocol_class are consistent.");
       return errorResp;
     }
     if (req.getSeriesPartitionSlotNum() != conf.getSeriesPartitionSlotNum()) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index b5d778e8b5..10c3698a9f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.confignode.manager;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoReq;
@@ -61,8 +60,10 @@ public class NodeManager {
   private void setGlobalConfig(DataNodeConfigurationResp dataSet) {
     // Set TGlobalConfig
     TGlobalConfig globalConfig = new TGlobalConfig();
-    globalConfig.setDataNodeConsensusProtocolClass(
-        ConfigNodeDescriptor.getInstance().getConf().getDataNodeConsensusProtocolClass());
+    globalConfig.setDataRegionConsensusProtocolClass(
+        ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass());
+    globalConfig.setSchemaRegionConsensusProtocolClass(
+        ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionConsensusProtocolClass());
     globalConfig.setSeriesPartitionSlotNum(
         ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum());
     globalConfig.setSeriesPartitionExecutorClass(
@@ -149,7 +150,7 @@ public class NodeManager {
 
     // Return PartitionRegionId
     resp.setPartitionRegionId(
-        ConsensusGroupId.convertToTConsensusGroupId(getConsensusManager().getConsensusGroupId()));
+        getConsensusManager().getConsensusGroupId().convertToTConsensusGroupId());
 
     // Return online ConfigNodes
     resp.setConfigNodeList(nodeInfo.getOnlineConfigNodes());
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
index 89f9cdfefa..ad95524b9e 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
@@ -85,7 +85,8 @@ public class ConfigRequestSerDeTest {
     dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
     dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
     dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
-    dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 7777));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
     RegisterDataNodeReq req0 = new RegisterDataNodeReq(dataNodeLocation);
     req0.serialize(buffer);
     buffer.flip();
@@ -204,7 +205,8 @@ public class ConfigRequestSerDeTest {
     dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
     dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
     dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
-    dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
 
     CreateRegionsReq req0 = new CreateRegionsReq();
     TRegionReplicaSet dataRegionSet = new TRegionReplicaSet();
@@ -242,7 +244,8 @@ public class ConfigRequestSerDeTest {
     dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
     dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
     dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
-    dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
 
     String storageGroup = "root.sg0";
     TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(10);
@@ -303,7 +306,8 @@ public class ConfigRequestSerDeTest {
     dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
     dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
     dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
-    dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
 
     String storageGroup = "root.sg0";
     TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(10);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
index 2caf683a1b..438a539167 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
@@ -97,6 +97,7 @@ public class NodeInfoTest {
         new TEndPoint("127.0.0.1", 6600 + flag),
         new TEndPoint("127.0.0.1", 7700 + flag),
         new TEndPoint("127.0.0.1", 8800 + flag),
-        new TEndPoint("127.0.0.1", 9900 + flag));
+        new TEndPoint("127.0.0.1", 9900 + flag),
+        new TEndPoint("127.0.0.1", 1000 + flag));
   }
 }
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
index 6e120afd86..e4286e72dc 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
@@ -144,7 +144,8 @@ public class PartitionInfoTest {
       tDataNodeLocation.setExternalEndPoint(new TEndPoint("127.0.0.1", 6000 + i));
       tDataNodeLocation.setInternalEndPoint(new TEndPoint("127.0.0.1", 7000 + i));
       tDataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("127.0.0.1", 8000 + i));
-      tDataNodeLocation.setConsensusEndPoint(new TEndPoint("127.0.0.1", 9000 + i));
+      tDataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("127.0.0.1", 9000 + i));
+      tDataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("127.0.0.1", 10000 + i));
       dataNodeLocations.add(tDataNodeLocation);
     }
     tRegionReplicaSet.setDataNodeLocations(dataNodeLocations);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
index bc172462ab..f052fdca20 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -119,8 +119,11 @@ public class ConfigNodeRPCServiceProcessorTest {
 
   private void checkGlobalConfig(TGlobalConfig globalConfig) {
     Assert.assertEquals(
-        ConfigNodeDescriptor.getInstance().getConf().getDataNodeConsensusProtocolClass(),
-        globalConfig.getDataNodeConsensusProtocolClass());
+        ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass(),
+        globalConfig.getDataRegionConsensusProtocolClass());
+    Assert.assertEquals(
+        ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionConsensusProtocolClass(),
+        globalConfig.getSchemaRegionConsensusProtocolClass());
     Assert.assertEquals(
         ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum(),
         globalConfig.getSeriesPartitionSlotNum());
@@ -135,7 +138,8 @@ public class ConfigNodeRPCServiceProcessorTest {
       dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667 + i));
       dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003 + i));
       dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777 + i));
-      dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
+      dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
+      dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010 + i));
 
       TDataNodeRegisterReq req = new TDataNodeRegisterReq(dataNodeLocation);
       TDataNodeRegisterResp resp = processor.registerDataNode(req);
@@ -155,7 +159,8 @@ public class ConfigNodeRPCServiceProcessorTest {
     dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6668));
     dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9004));
     dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8778));
-    dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40011));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40011));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50011));
 
     TDataNodeRegisterReq req = new TDataNodeRegisterReq(dataNodeLocation);
     TDataNodeRegisterResp resp = processor.registerDataNode(req);
@@ -178,7 +183,8 @@ public class ConfigNodeRPCServiceProcessorTest {
       dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667 + i));
       dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003 + i));
       dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777 + i));
-      dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
+      dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
+      dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010 + i));
       Assert.assertEquals(dataNodeLocation, locationList.get(i).getValue());
     }
 
@@ -192,7 +198,8 @@ public class ConfigNodeRPCServiceProcessorTest {
     dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6668));
     dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9004));
     dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8778));
-    dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40011));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40011));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50011));
     Assert.assertEquals(dataNodeLocation, locationMap.get(1));
   }
 
diff --git a/consensus/pom.xml b/consensus/pom.xml
index 7284a5fee7..0d6022ea10 100644
--- a/consensus/pom.xml
+++ b/consensus/pom.xml
@@ -52,6 +52,11 @@
             <version>${project.version}</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>thrift-multi-leader-consensus</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
     <properties>
         <maven.compiler.source>8</maven.compiler.source>
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
index 99379156fb..b3bba6dace 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.consensus.common;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 
+import java.nio.ByteBuffer;
 import java.util.Objects;
 
 // TODO Use a mature IDL framework such as Protobuf to manage this structure
@@ -43,6 +45,19 @@ public class Peer {
     return endpoint;
   }
 
+  public void serialize(ByteBuffer buffer) {
+    ThriftCommonsSerDeUtils.serializeTConsensusGroupId(
+        groupId.convertToTConsensusGroupId(), buffer);
+    ThriftCommonsSerDeUtils.serializeTEndPoint(endpoint, buffer);
+  }
+
+  public static Peer Deserialize(ByteBuffer buffer) {
+    return new Peer(
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(
+            ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer)),
+        ThriftCommonsSerDeUtils.deserializeTEndPoint(buffer));
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
new file mode 100644
index 0000000000..a1dccbfb60
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.common.request;
+
+import java.nio.ByteBuffer;
+
+/** only used for multi-leader consensus */
+public class IndexedConsensusRequest implements IConsensusRequest {
+
+  private final long minSyncIndex;
+  private final long currentIndex;
+  private final IConsensusRequest request;
+
+  public IndexedConsensusRequest(long minSyncIndex, long currentIndex, IConsensusRequest request) {
+    this.minSyncIndex = minSyncIndex;
+    this.currentIndex = currentIndex;
+    this.request = request;
+  }
+
+  @Override
+  public void serializeRequest(ByteBuffer buffer) {
+    buffer.putLong(minSyncIndex);
+    buffer.putLong(currentIndex);
+    request.serializeRequest(buffer);
+  }
+
+  public IConsensusRequest getRequest() {
+    return request;
+  }
+
+  public long getMinSyncIndex() {
+    return minSyncIndex;
+  }
+
+  public long getCurrentIndex() {
+    return currentIndex;
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/IndexController.java
new file mode 100644
index 0000000000..6bad07da41
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/IndexController.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * An index controller class to balance the performance degradation of frequent disk I/O.
+ *
+ * <p>The index lives in memory and is persisted only once it has advanced by at least {@code
+ * FLUSH_INTERVAL} since the last flush. The persisted value is encoded in the name of a marker
+ * file, {@code prefix + '-' + index}, located in the storage directory.
+ */
+@ThreadSafe
+public class IndexController {
+
+  private static final Logger logger = LoggerFactory.getLogger(IndexController.class);
+  private static final int FLUSH_INTERVAL = 500;
+
+  private volatile long lastFlushedIndex;
+  private volatile long currentIndex;
+
+  private final String storageDir;
+  private final String prefix;
+  private final boolean incrementIntervalAfterRestart;
+
+  /**
+   * Creates the controller and restores the last persisted index from {@code storageDir}.
+   *
+   * @param storageDir directory holding the marker file
+   * @param prefix marker file name prefix; a trailing {@code '-'} is appended internally
+   * @param incrementIntervalAfterRestart whether to jump ahead by {@code FLUSH_INTERVAL} on restore
+   */
+  public IndexController(String storageDir, String prefix, boolean incrementIntervalAfterRestart) {
+    this.storageDir = storageDir;
+    this.prefix = prefix + '-';
+    this.incrementIntervalAfterRestart = incrementIntervalAfterRestart;
+    restore();
+  }
+
+  /** Advances the index by one, flushing it if the flush interval has been reached. */
+  public synchronized long incrementAndGet() {
+    currentIndex++;
+    checkPersist();
+    return currentIndex;
+  }
+
+  /**
+   * Raises the index to {@code index} if that is larger than the current value.
+   *
+   * @param index candidate index; accepted as {@code long} so that large indexes are not truncated
+   * @return the index after the update
+   */
+  public synchronized long updateAndGet(long index) {
+    currentIndex = Math.max(currentIndex, index);
+    checkPersist();
+    return currentIndex;
+  }
+
+  public long getCurrentIndex() {
+    return currentIndex;
+  }
+
+  private void checkPersist() {
+    if (currentIndex - lastFlushedIndex >= FLUSH_INTERVAL) {
+      persist();
+    }
+  }
+
+  /** Renames the marker file so that its name carries the current index. */
+  private void persist() {
+    File oldFile = new File(storageDir, prefix + lastFlushedIndex);
+    File newFile = new File(storageDir, prefix + currentIndex);
+    try {
+      if (oldFile.exists()) {
+        FileUtils.moveFile(oldFile, newFile);
+      } else if (newFile.createNewFile()) {
+        // The previous marker is gone; recreate it so the index still survives a restart.
+        logger.warn(
+            "Version file {} is missing, created {} instead",
+            oldFile.getAbsolutePath(),
+            newFile.getAbsolutePath());
+      }
+      logger.info(
+          "Version file updated, previous: {}, current: {}",
+          oldFile.getAbsolutePath(),
+          newFile.getAbsolutePath());
+      lastFlushedIndex = currentIndex;
+    } catch (IOException e) {
+      logger.error("Error occurred when flushing next version.", e);
+    }
+  }
+
+  /**
+   * Loads the highest persisted index from the marker files, removes stale markers and creates the
+   * initial marker when none exists yet.
+   */
+  private void restore() {
+    File directory = new File(storageDir);
+    File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(prefix));
+    File latestFile = null;
+    long maxVersion = -1;
+    if (versionFiles != null) {
+      for (File candidate : versionFiles) {
+        long fileVersion = parseVersion(candidate.getName());
+        if (fileVersion > maxVersion) {
+          maxVersion = fileVersion;
+          latestFile = candidate;
+        }
+      }
+    }
+    if (latestFile != null) {
+      lastFlushedIndex = maxVersion;
+      for (File candidate : versionFiles) {
+        // Only stale markers of this controller are removed; files whose suffix is not a version
+        // (for example markers of a longer prefix) are left untouched.
+        if (!candidate.equals(latestFile)
+            && parseVersion(candidate.getName()) >= 0
+            && !candidate.delete()) {
+          logger.warn("Cannot delete stale version file {}", candidate);
+        }
+      }
+    } else {
+      File versionFile = new File(directory, prefix + "0");
+      lastFlushedIndex = 0;
+      try {
+        if (!versionFile.createNewFile()) {
+          logger.warn("Cannot create new version file {}", versionFile);
+        }
+      } catch (IOException e) {
+        logger.error("Error occurred when creating new file {}.", versionFile.getName(), e);
+      }
+    }
+    if (incrementIntervalAfterRestart) {
+      // prevent overlapping in case of failure
+      currentIndex = lastFlushedIndex + FLUSH_INTERVAL;
+      persist();
+    } else {
+      currentIndex = lastFlushedIndex;
+    }
+  }
+
+  /**
+   * Extracts the version encoded after the prefix of a marker file name.
+   *
+   * <p>The suffix is taken by position rather than by splitting on {@code '-'}, because the prefix
+   * itself may contain dashes.
+   *
+   * @return the version, or {@code -1} if the suffix is not a non-negative number
+   */
+  private long parseVersion(String fileName) {
+    try {
+      return Math.max(Long.parseLong(fileName.substring(prefix.length())), -1);
+    } catch (NumberFormatException e) {
+      return -1;
+    }
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
new file mode 100644
index 0000000000..4010e7b75c
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.IStateMachine.Registry;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCService;
+import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCServiceProcessor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class MultiLeaderConsensus implements IConsensus {
+
+  private final Logger logger = LoggerFactory.getLogger(MultiLeaderConsensus.class);
+
+  private final TEndPoint thisNode;
+  private final File storageDir;
+  private final IStateMachine.Registry registry;
+  private final Map<ConsensusGroupId, MultiLeaderServerImpl> stateMachineMap =
+      new ConcurrentHashMap<>();
+  private final MultiLeaderRPCService service;
+
+  public MultiLeaderConsensus(TEndPoint thisNode, File storageDir, Registry registry) {
+    this.thisNode = thisNode;
+    this.storageDir = storageDir;
+    this.registry = registry;
+    this.service = new MultiLeaderRPCService(thisNode);
+  }
+
+  @Override
+  public void start() throws IOException {
+    initAndRecover();
+    service.initSyncedServiceImpl(new MultiLeaderRPCServiceProcessor(this));
+  }
+
+  private void initAndRecover() throws IOException {
+    if (!storageDir.exists()) {
+      if (!storageDir.mkdirs()) {
+        logger.warn("Unable to create consensus dir at {}", storageDir);
+      }
+    } else {
+      try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) {
+        for (Path path : stream) {
+          String[] items = path.getFileName().toString().split("_");
+          ConsensusGroupId consensusGroupId =
+              ConsensusGroupId.Factory.create(
+                  TConsensusGroupType.valueOf(items[0]).getValue(), Integer.parseInt(items[1]));
+          TEndPoint endPoint = new TEndPoint(items[2], Integer.parseInt(items[3]));
+          stateMachineMap.put(
+              consensusGroupId,
+              new MultiLeaderServerImpl(
+                  path.toString(),
+                  new Peer(consensusGroupId, endPoint),
+                  registry.apply(consensusGroupId)));
+        }
+      }
+    }
+  }
+
+  @Override
+  public void stop() throws IOException {}
+
+  @Override
+  public ConsensusWriteResponse write(ConsensusGroupId groupId, IConsensusRequest request) {
+    MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
+    if (impl == null) {
+      return ConsensusWriteResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
+    }
+    return ConsensusWriteResponse.newBuilder().setStatus(impl.write(request)).build();
+  }
+
+  @Override
+  public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest request) {
+    MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
+    if (impl == null) {
+      return ConsensusReadResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
+    }
+    return ConsensusReadResponse.newBuilder().setDataSet(impl.read(request)).build();
+  }
+
+  @Override
+  public ConsensusGenericResponse addConsensusGroup(ConsensusGroupId groupId, List<Peer> peers) {
+    AtomicBoolean exist = new AtomicBoolean(true);
+    stateMachineMap.computeIfAbsent(
+        groupId,
+        (k) -> {
+          exist.set(false);
+          String path = buildPeerDir(groupId);
+          File file = new File(path);
+          if (!file.mkdirs()) {
+            logger.warn("Unable to create consensus dir for group {} at {}", groupId, path);
+          }
+          MultiLeaderServerImpl impl =
+              new MultiLeaderServerImpl(
+                  path, new Peer(groupId, thisNode), peers, registry.apply(groupId));
+          impl.start();
+          return impl;
+        });
+    if (exist.get()) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new ConsensusGroupAlreadyExistException(groupId))
+          .build();
+    }
+    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
+  }
+
+  @Override
+  public ConsensusGenericResponse removeConsensusGroup(ConsensusGroupId groupId) {
+    AtomicBoolean exist = new AtomicBoolean(false);
+    stateMachineMap.computeIfPresent(
+        groupId,
+        (k, v) -> {
+          exist.set(true);
+          v.stop();
+          String path = buildPeerDir(groupId);
+          File file = new File(path);
+          if (!file.delete()) {
+            logger.warn("Unable to delete consensus dir for group {} at {}", groupId, path);
+          }
+          return null;
+        });
+
+    if (!exist.get()) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
+    }
+    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
+  }
+
+  @Override
+  public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) {
+    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+  }
+
+  @Override
+  public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) {
+    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+  }
+
+  @Override
+  public ConsensusGenericResponse changePeer(ConsensusGroupId groupId, List<Peer> newPeers) {
+    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+  }
+
+  @Override
+  public ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, Peer newLeader) {
+    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+  }
+
+  @Override
+  public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) {
+    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+  }
+
+  @Override
+  public boolean isLeader(ConsensusGroupId groupId) {
+    return true;
+  }
+
+  @Override
+  public Peer getLeader(ConsensusGroupId groupId) {
+    if (!stateMachineMap.containsKey(groupId)) {
+      return null;
+    }
+    return new Peer(groupId, thisNode);
+  }
+
+  public MultiLeaderServerImpl getImpl(ConsensusGroupId groupId) {
+    return stateMachineMap.get(groupId);
+  }
+
+  private String buildPeerDir(ConsensusGroupId groupId) {
+    return storageDir
+        + File.separator
+        + groupId.getType()
+        + "_"
+        + groupId.getId()
+        + "_"
+        + thisNode.getIp()
+        + "_"
+        + thisNode.getPort();
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
new file mode 100644
index 0000000000..a1f73b525d
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.multileader.asyncLogAppender.AsyncLogAppender;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Local server of one multi-leader consensus group.
+ *
+ * <p>It assigns an index to every local write before handing it to the state machine, and keeps
+ * the peer list of the group in a configuration file inside its storage directory.
+ */
+public class MultiLeaderServerImpl {
+
+  private static final Logger logger = LoggerFactory.getLogger(MultiLeaderServerImpl.class);
+  private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
+  private static final int DEFAULT_BUFFER_SIZE = 1024 * 2;
+
+  private final Peer thisNode;
+  private final IStateMachine stateMachine;
+  private final String storageDir;
+  private final List<Peer> configuration;
+  private final IndexController currentNodeController;
+  private List<AsyncLogAppender> asyncLogAppenders;
+
+  /** Creates the server of a new group and persists the given peer list. */
+  public MultiLeaderServerImpl(
+      String storageDir, Peer thisNode, List<Peer> configuration, IStateMachine stateMachine) {
+    this.storageDir = storageDir;
+    this.thisNode = thisNode;
+    this.stateMachine = stateMachine;
+    this.currentNodeController =
+        new IndexController(storageDir, Utils.fromTEndPointToString(thisNode.getEndpoint()), false);
+    this.configuration = configuration;
+    persistConfiguration();
+  }
+
+  /** Creates the server of an existing group and reloads its peer list from disk. */
+  public MultiLeaderServerImpl(String storageDir, Peer thisNode, IStateMachine stateMachine) {
+    this.storageDir = storageDir;
+    this.thisNode = thisNode;
+    this.stateMachine = stateMachine;
+    this.currentNodeController =
+        new IndexController(storageDir, Utils.fromTEndPointToString(thisNode.getEndpoint()), false);
+    this.configuration = new ArrayList<>();
+    recoverConfiguration();
+  }
+
+  public IStateMachine getStateMachine() {
+    return stateMachine;
+  }
+
+  public void start() {
+    stateMachine.start();
+  }
+
+  public void stop() {
+    stateMachine.stop();
+  }
+
+  /**
+   * Wraps the request with the next local index and applies it to the state machine.
+   *
+   * <p>The state machine is used as the lock so that index assignment and application happen as
+   * one step.
+   */
+  public TSStatus write(IConsensusRequest request) {
+    synchronized (stateMachine) {
+      IndexedConsensusRequest newRequest =
+          new IndexedConsensusRequest(
+              Long.MAX_VALUE, currentNodeController.incrementAndGet(), request);
+      return stateMachine.write(newRequest);
+    }
+  }
+
+  public DataSet read(IConsensusRequest request) {
+    return stateMachine.read(request);
+  }
+
+  public boolean takeSnapshot(File snapshotDir) {
+    return stateMachine.takeSnapshot(snapshotDir);
+  }
+
+  public void loadSnapshot(File latestSnapshotRootDir) {
+    stateMachine.loadSnapshot(latestSnapshotRootDir);
+  }
+
+  /**
+   * Writes the peer list to the configuration file as a peer count followed by the serialized
+   * peers. I/O failures are logged, not propagated.
+   */
+  public void persistConfiguration() {
+    ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+    buffer.putInt(configuration.size());
+    for (Peer peer : configuration) {
+      peer.serialize(buffer);
+    }
+    // Persist only the bytes that were actually written, not the whole backing array.
+    buffer.flip();
+    byte[] content = new byte[buffer.remaining()];
+    buffer.get(content);
+    try {
+      Files.write(
+          Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath()), content);
+    } catch (IOException e) {
+      logger.error("Unexpected error occurs when persisting configuration", e);
+    }
+  }
+
+  /**
+   * Appends the peers stored in the configuration file to the in-memory peer list. I/O failures
+   * are logged, not propagated.
+   */
+  public void recoverConfiguration() {
+    ByteBuffer buffer;
+    try {
+      buffer =
+          ByteBuffer.wrap(
+              Files.readAllBytes(
+                  Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath())));
+      int size = buffer.getInt();
+      for (int i = 0; i < size; i++) {
+        configuration.add(Peer.Deserialize(buffer));
+      }
+    } catch (IOException e) {
+      logger.error("Unexpected error occurs when recovering configuration", e);
+    }
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/Utils.java
new file mode 100644
index 0000000000..a89f47513a
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/Utils.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+
+/** Static helpers shared by the multi-leader consensus classes. */
+public final class Utils {
+
+  private Utils() {
+    // utility class, not meant to be instantiated
+  }
+
+  /**
+   * Renders an endpoint as {@code <ip>-<port>}.
+   *
+   * <p>Plain concatenation is used instead of {@code String.format} so that the port is always
+   * written with ASCII digits, independent of the JVM default locale.
+   *
+   * @param endpoint endpoint to render
+   * @return the textual form of the endpoint
+   */
+  public static String fromTEndPointToString(TEndPoint endpoint) {
+    return endpoint.getIp() + "-" + endpoint.getPort();
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/asyncLogAppender/AsyncLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/asyncLogAppender/AsyncLogAppender.java
new file mode 100644
index 0000000000..7d53bbb208
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/asyncLogAppender/AsyncLogAppender.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.asyncLogAppender;
+
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.multileader.IndexController;
+import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * Skeleton of the asynchronous log appender; it only declares state so far.
+ *
+ * <p>NOTE(review): none of the fields is assigned or read yet, so the notes below are inferred
+ * from names and types only and need to be confirmed once the logic is added.
+ */
+public class AsyncLogAppender {
+
+  // presumably the local server this appender belongs to -- TODO confirm
+  private MultiLeaderServerImpl impl;
+  // presumably the remote peer that receives the entries -- TODO confirm
+  private Peer peer;
+  // bounded queue of requests; presumably those not yet sent to the peer -- TODO confirm
+  private ArrayBlockingQueue<IConsensusRequest> pendingRequest;
+  // presumably tracks the index already synchronized to the peer -- TODO confirm
+  private IndexController controller;
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
new file mode 100644
index 0000000000..455bd73fe8
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.service;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.service.ThriftService;
+import org.apache.iotdb.commons.service.ThriftServiceThread;
+import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
+
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Thrift service that serves the multi-leader consensus RPC interface, bound to the IP and port
+ * of this node's endpoint.
+ */
+public class MultiLeaderRPCService extends ThriftService implements MultiLeaderRPCServiceMBean {
+
+  // TODO make it configurable
+  private static final int RPC_MAX_CONCURRENT_CLIENT_NUM = 65535;
+  private static final int THRIFT_SERVER_AWAIT_TIME_FOR_STOP_SERVICE = 60;
+  private static final boolean IS_RPC_THRIFT_COMPRESSION_ENABLED = false;
+
+  private final TEndPoint thisNode;
+  private MultiLeaderRPCServiceProcessor multiLeaderRPCServiceProcessor;
+
+  public MultiLeaderRPCService(TEndPoint thisNode) {
+    this.thisNode = thisNode;
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.MULTI_LEADER_CONSENSUS_SERVICE;
+  }
+
+  /**
+   * Remembers the processor, derives the MBean name and delegates to the parent implementation.
+   *
+   * @param multiLeaderRPCServiceProcessor must be a {@link MultiLeaderRPCServiceProcessor}
+   */
+  @Override
+  public void initSyncedServiceImpl(Object multiLeaderRPCServiceProcessor) {
+    this.multiLeaderRPCServiceProcessor =
+        (MultiLeaderRPCServiceProcessor) multiLeaderRPCServiceProcessor;
+    super.mbeanName =
+        String.format(
+            "%s:%s=%s", this.getClass().getPackage(), IoTDBConstant.JMX_TYPE, getID().getJmxName());
+    super.initSyncedServiceImpl(this.multiLeaderRPCServiceProcessor);
+  }
+
+  @Override
+  public void initTProcessor()
+      throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+          NoSuchMethodException, InvocationTargetException {
+    processor = new MultiLeaderConsensusIService.Processor<>(multiLeaderRPCServiceProcessor);
+  }
+
+  /**
+   * Builds the thread that runs the Thrift server.
+   *
+   * @throws IllegalAccessException if the service thread cannot be created; the original {@link
+   *     RPCServiceException} is attached as cause
+   */
+  @Override
+  public void initThriftServiceThread()
+      throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+    try {
+      thriftServiceThread =
+          new ThriftServiceThread(
+              processor,
+              getID().getName(),
+              ThreadName.MULTI_LEADER_CONSENSUS_RPC_CLIENT.getName(),
+              getBindIP(),
+              getBindPort(),
+              RPC_MAX_CONCURRENT_CLIENT_NUM,
+              THRIFT_SERVER_AWAIT_TIME_FOR_STOP_SERVICE,
+              new MultiLeaderRPCServiceHandler(multiLeaderRPCServiceProcessor),
+              IS_RPC_THRIFT_COMPRESSION_ENABLED);
+    } catch (RPCServiceException e) {
+      // IllegalAccessException has no cause-accepting constructor, so attach the cause explicitly
+      IllegalAccessException wrapped = new IllegalAccessException(e.getMessage());
+      wrapped.initCause(e);
+      throw wrapped;
+    }
+    thriftServiceThread.setName(ThreadName.MULTI_LEADER_CONSENSUS_RPC_SERVER.getName());
+  }
+
+  @Override
+  public String getBindIP() {
+    return thisNode.getIp();
+  }
+
+  @Override
+  public int getBindPort() {
+    return thisNode.getPort();
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceHandler.java
new file mode 100644
index 0000000000..889ac7d517
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.service;
+
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * Thrift server event handler of the multi-leader RPC service. The only event it reacts to is the
+ * removal of a connection context, which is forwarded to the service processor.
+ */
+public class MultiLeaderRPCServiceHandler implements TServerEventHandler {
+
+  private final MultiLeaderRPCServiceProcessor serviceProcessor;
+
+  public MultiLeaderRPCServiceHandler(MultiLeaderRPCServiceProcessor processor) {
+    serviceProcessor = processor;
+  }
+
+  @Override
+  public ServerContext createContext(TProtocol input, TProtocol output) {
+    // no per-connection context is kept
+    return null;
+  }
+
+  @Override
+  public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
+    serviceProcessor.handleClientExit();
+  }
+
+  @Override
+  public void preServe() {
+    // nothing to prepare
+  }
+
+  @Override
+  public void processContext(
+      ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) {
+    // nothing to do per request
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceMBean.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceMBean.java
new file mode 100644
index 0000000000..9e354ac204
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceMBean.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.service;
+
+/**
+ * Marker interface implemented by {@link MultiLeaderRPCService}; it declares no operations.
+ *
+ * <p>NOTE(review): presumably the JMX management interface of the service -- confirm.
+ */
+public interface MultiLeaderRPCServiceMBean {}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
new file mode 100644
index 0000000000..8cea5f4007
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.service;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
+import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
+import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
+import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
+import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** Server-side implementation of the multi-leader consensus Thrift interface. */
+public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIService.Iface {
+
+  private final Logger logger = LoggerFactory.getLogger(MultiLeaderRPCServiceProcessor.class);
+
+  private final MultiLeaderConsensus consensus;
+
+  public MultiLeaderRPCServiceProcessor(MultiLeaderConsensus consensus) {
+    this.consensus = consensus;
+  }
+
+  /**
+   * Applies every batch of the request, in order, to the state machine of the addressed group.
+   *
+   * @return one status per batch, or a single {@code INTERNAL_SERVER_ERROR} status if the group is
+   *     not known to this node
+   */
+  @Override
+  public TSyncLogRes syncLog(TSyncLogReq req) throws TException {
+    ConsensusGroupId groupId =
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+    MultiLeaderServerImpl server = consensus.getImpl(groupId);
+    if (server != null) {
+      List<TSStatus> results = new ArrayList<>();
+      // the state machine is the lock, so batches are applied without interleaving other writers
+      synchronized (server.getStateMachine()) {
+        for (ByteBuffer batch : req.getBatches()) {
+          ByteBufferConsensusRequest request = new ByteBufferConsensusRequest(batch);
+          results.add(server.getStateMachine().write(request));
+        }
+      }
+      return new TSyncLogRes(results);
+    }
+
+    String message =
+        String.format("Unexpected consensusGroupId %s for TSyncLogReq %s", groupId, req);
+    logger.error(message);
+    TSStatus failure = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+    failure.setMessage(message);
+    return new TSyncLogRes(Collections.singletonList(failure));
+  }
+
+  /** Called when a client connection goes away; currently a no-op. */
+  public void handleClientExit() {}
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
index 242573e5be..d1af672b85 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
@@ -46,9 +46,8 @@ public class RequestMessage implements Message {
     if (serializedContent == null) {
       synchronized (this) {
         if (serializedContent == null) {
-          ByteBufferConsensusRequest req;
           if (actualRequest instanceof ByteBufferConsensusRequest) {
-            req = (ByteBufferConsensusRequest) actualRequest;
+            ByteBufferConsensusRequest req = (ByteBufferConsensusRequest) actualRequest;
             serializedContent = ByteString.copyFrom(req.getContent());
             req.getContent().flip();
           } else {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index a9222f36ad..c691022c78 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -74,6 +74,8 @@ public enum ThreadName {
   CLUSTER_MONITOR("ClusterMonitor"),
   CONFIG_NODE_RPC_SERVER("ConfigNodeRpcServer"),
   CONFIG_NODE_RPC_CLIENT("ConfigNodeRPC-Client"),
+  MULTI_LEADER_CONSENSUS_RPC_CLIENT("MultiLeaderConsensusRPC-Client"),
+  MULTI_LEADER_CONSENSUS_RPC_SERVER("MultiLeaderConsensusRPC-Server"),
   DATA_NODE_MANAGEMENT_RPC_SERVER("DataNodeManagementRPC"),
   DATA_NODE_MANAGEMENT_RPC_CLIENT("DataNodeManagementRPC-Client"),
   Cluster_Monitor("ClusterMonitor"),
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
index 0bc5556c15..2a5ab281f2 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
@@ -37,6 +37,10 @@ public abstract class ConsensusGroupId {
   // return specific type
   public abstract TConsensusGroupType getType();
 
+  public TConsensusGroupId convertToTConsensusGroupId() {
+    return new TConsensusGroupId(getType(), getId());
+  }
+
   @Override
   public int hashCode() {
     return Objects.hash(getType(), getId());
@@ -82,10 +86,6 @@ public abstract class ConsensusGroupId {
     }
   }
 
-  public static TConsensusGroupId convertToTConsensusGroupId(ConsensusGroupId consensusGroupId) {
-    return new TConsensusGroupId(consensusGroupId.getType(), consensusGroupId.getId());
-  }
-
   public static String formatTConsensusGroupId(TConsensusGroupId groupId) {
     StringBuilder format = new StringBuilder();
 
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 6b068618ce..edece800fe 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -72,7 +72,8 @@ public enum ServiceType {
   FRAGMENT_INSTANCE_MANAGER_SERVICE("Fragment instance manager", "FragmentInstanceManager"),
   DATA_BLOCK_MANAGER_SERVICE("Data block manager", "DataBlockManager"),
   INTERNAL_SERVICE("Internal Service", "InternalService"),
-  PROCEDURE_SERVICE("Procedure  Service", "ProcedureService");
+  PROCEDURE_SERVICE("Procedure Service", "ProcedureService"),
+  MULTI_LEADER_CONSENSUS_SERVICE("Multi Leader consensus Service", "MultiLeaderRPCService");
 
   private final String name;
   private final String jmxName;
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/partition/SerializeTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/partition/SerializeTest.java
index 1e032efc62..1f4f3152d6 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/partition/SerializeTest.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/partition/SerializeTest.java
@@ -68,7 +68,8 @@ public abstract class SerializeTest {
       tDataNodeLocation.setExternalEndPoint(new TEndPoint("127.0.0.1", 6000 + i));
       tDataNodeLocation.setInternalEndPoint(new TEndPoint("127.0.0.1", 7000 + i));
       tDataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("127.0.0.1", 8000 + i));
-      tDataNodeLocation.setConsensusEndPoint(new TEndPoint("127.0.0.1", 9000 + i));
+      tDataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("127.0.0.1", 9000 + i));
+      tDataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("127.0.0.1", 10000 + i));
       dataNodeLocations.add(tDataNodeLocation);
     }
     tRegionReplicaSet.setDataNodeLocations(dataNodeLocations);
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtilsTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtilsTest.java
index 0abde8a0fa..94a2a81427 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtilsTest.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtilsTest.java
@@ -58,7 +58,8 @@ public class ThriftCommonsSerDeUtilsTest {
     dataNodeLocation0.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
     dataNodeLocation0.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
     dataNodeLocation0.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
-    dataNodeLocation0.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation0.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation0.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
     ThriftCommonsSerDeUtils.serializeTDataNodeLocation(dataNodeLocation0, buffer);
     buffer.flip();
     TDataNodeLocation dataNodeLocation1 =
@@ -108,7 +109,8 @@ public class ThriftCommonsSerDeUtilsTest {
       dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667 + i));
       dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003 + i));
       dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777 + i));
-      dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
+      dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
+      dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010 + i));
       regionReplicaSet0.getDataNodeLocations().add(dataNodeLocation);
     }
     ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(regionReplicaSet0, buffer);
diff --git a/pom.xml b/pom.xml
index e9f0829436..4bd7cfe5e2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,7 @@
         <module>thrift</module>
         <module>thrift-commons</module>
         <module>thrift-confignode</module>
+        <module>thrift-multi-leader-consensus</module>
         <module>thrift-cluster</module>
         <module>thrift-sync</module>
         <module>thrift-influxdb</module>
@@ -790,6 +791,7 @@
                         <sourceDirectory>thrift/target/generated-sources/thrift</sourceDirectory>
                         <sourceDirectory>thrift-commons/target/generated-sources/thrift</sourceDirectory>
                         <sourceDirectory>thrift-confignode/target/generated-sources/thrift</sourceDirectory>
+                        <sourceDirectory>thrift-multi-leader-consensus/target/generated-sources/thrift</sourceDirectory>
                         <sourceDirectory>thrift-sync/target/generated-sources/thrift</sourceDirectory>
                         <sourceDirectory>thrift-cluster/target/generated-sources/thrift</sourceDirectory>
                         <sourceDirectory>thrift-influxdb/target/generated-sources/thrift</sourceDirectory>
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 3235dd92e4..9aad88b81b 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -51,10 +51,13 @@ internal_ip=127.0.0.1
 # port for coordinator's communication between cluster nodes.
 internal_port=9003
 
+# Datatype: int
+# port for consensus's communication for data region between cluster nodes.
+data_region_consensus_port=40010
 
 # Datatype: int
-# port for consensus's communication between cluster nodes.
-consensus_port=40010
+# port for consensus's communication for schema region between cluster nodes.
+schema_region_consensus_port=50010
 
 # comma-separated {IP/DOMAIN}:internal_port pairs
 # Data nodes store config nodes ip and port to communicate with config nodes.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 8cf84cf10c..3ff42792bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -263,6 +263,12 @@ public class IoTDBConfig {
   /** Consensus directory. */
   private String consensusDir = DEFAULT_BASE_DIR + File.separator + "consensus";
 
+  private String dataRegionConsensusDir =
+      DEFAULT_BASE_DIR + File.separator + "consensus" + File.separator + "dataRegion";
+
+  private String schemaRegionConsensusDir =
+      DEFAULT_BASE_DIR + File.separator + "consensus" + File.separator + "schemaRegion";
+
   /** Maximum MemTable number. Invalid when enableMemControl is true. */
   private int maxMemtableNumber = 0;
 
@@ -812,8 +818,11 @@ public class IoTDBConfig {
   /** Internal port for coordinator */
   private int internalPort = 9003;
 
-  /** Internal port for consensus protocol */
-  private int consensusPort = 40010;
+  /** Internal port for dataRegion consensus protocol */
+  private int dataRegionConsensusPort = 40010;
+
+  /** Internal port for schemaRegion consensus protocol */
+  private int schemaRegionConsensusPort = 50010;
 
   /** Ip and port of config nodes. */
   private List<TEndPoint> configNodeList =
@@ -823,11 +832,20 @@ public class IoTDBConfig {
   private long joinClusterTimeOutMs = TimeUnit.SECONDS.toMillis(5);
 
   /**
-   * The consensus protocol class. The Datanode should communicate with ConfigNode on startup and
-   * set this variable so that the correct class name can be obtained later when the consensus layer
-   * singleton is initialized
+   * The consensus protocol class for data region. The Datanode should communicate with ConfigNode
+   * on startup and set this variable so that the correct class name can be obtained later when the
+   * data region consensus layer singleton is initialized
+   */
+  private String dataRegionConsensusProtocolClass =
+      "org.apache.iotdb.consensus.ratis.RatisConsensus";
+
+  /**
+   * The consensus protocol class for schema region. The Datanode should communicate with ConfigNode
+   * on startup and set this variable so that the correct class name can be obtained later when the
+   * schema region consensus layer singleton is initialized
    */
-  private String consensusProtocolClass = "org.apache.iotdb.consensus.ratis.RatisConsensus";
+  private String schemaRegionConsensusProtocolClass =
+      "org.apache.iotdb.consensus.ratis.RatisConsensus";
 
   /**
    * The series partition executor class. The Datanode should communicate with ConfigNode on startup
@@ -1161,6 +1179,24 @@ public class IoTDBConfig {
 
   public void setConsensusDir(String consensusDir) {
     this.consensusDir = consensusDir;
+    setDataRegionConsensusDir(consensusDir + File.separator + "dataRegion");
+    setSchemaRegionConsensusDir(consensusDir + File.separator + "schemaRegion");
+  }
+
+  public String getDataRegionConsensusDir() {
+    return dataRegionConsensusDir;
+  }
+
+  public void setDataRegionConsensusDir(String dataRegionConsensusDir) {
+    this.dataRegionConsensusDir = dataRegionConsensusDir;
+  }
+
+  public String getSchemaRegionConsensusDir() {
+    return schemaRegionConsensusDir;
+  }
+
+  public void setSchemaRegionConsensusDir(String schemaRegionConsensusDir) {
+    this.schemaRegionConsensusDir = schemaRegionConsensusDir;
   }
 
   public String getExtDir() {
@@ -2620,12 +2656,20 @@ public class IoTDBConfig {
     this.internalPort = internalPort;
   }
 
-  public int getConsensusPort() {
-    return consensusPort;
+  public int getDataRegionConsensusPort() {
+    return dataRegionConsensusPort;
   }
 
-  public void setConsensusPort(int consensusPort) {
-    this.consensusPort = consensusPort;
+  public void setDataRegionConsensusPort(int dataRegionConsensusPort) {
+    this.dataRegionConsensusPort = dataRegionConsensusPort;
+  }
+
+  public int getSchemaRegionConsensusPort() {
+    return schemaRegionConsensusPort;
+  }
+
+  public void setSchemaRegionConsensusPort(int schemaRegionConsensusPort) {
+    this.schemaRegionConsensusPort = schemaRegionConsensusPort;
   }
 
   public List<TEndPoint> getConfigNodeList() {
@@ -2644,12 +2688,20 @@ public class IoTDBConfig {
     this.joinClusterTimeOutMs = joinClusterTimeOutMs;
   }
 
-  public String getConsensusProtocolClass() {
-    return consensusProtocolClass;
+  public String getDataRegionConsensusProtocolClass() {
+    return dataRegionConsensusProtocolClass;
+  }
+
+  public void setDataRegionConsensusProtocolClass(String dataRegionConsensusProtocolClass) {
+    this.dataRegionConsensusProtocolClass = dataRegionConsensusProtocolClass;
+  }
+
+  public String getSchemaRegionConsensusProtocolClass() {
+    return schemaRegionConsensusProtocolClass;
   }
 
-  public void setConsensusProtocolClass(String consensusProtocolClass) {
-    this.consensusProtocolClass = consensusProtocolClass;
+  public void setSchemaRegionConsensusProtocolClass(String schemaRegionConsensusProtocolClass) {
+    this.schemaRegionConsensusProtocolClass = schemaRegionConsensusProtocolClass;
   }
 
   public String getSeriesPartitionExecutorClass() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 40b680bfed..517dcab53c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1555,9 +1555,17 @@ public class IoTDBDescriptor {
         Integer.parseInt(
             properties.getProperty("internal_port", Integer.toString(conf.getInternalPort()))));
 
-    conf.setConsensusPort(
+    conf.setDataRegionConsensusPort(
         Integer.parseInt(
-            properties.getProperty("consensus_port", Integer.toString(conf.getConsensusPort()))));
+            properties.getProperty(
+                "data_region_consensus_port",
+                Integer.toString(conf.getDataRegionConsensusPort()))));
+
+    conf.setSchemaRegionConsensusPort(
+        Integer.parseInt(
+            properties.getProperty(
+                "schema_region_consensus_port",
+                Integer.toString(conf.getSchemaRegionConsensusPort()))));
   }
 
   public void loadShuffleProps(Properties properties) {
@@ -1608,7 +1616,9 @@ public class IoTDBDescriptor {
   // These configurations are received from config node when registering
   public void loadGlobalConfig(TGlobalConfig globalConfig) {
     conf.setSeriesPartitionExecutorClass(globalConfig.getSeriesPartitionExecutorClass());
-    conf.setConsensusProtocolClass(globalConfig.getDataNodeConsensusProtocolClass());
+    conf.setDataRegionConsensusProtocolClass(globalConfig.getDataRegionConsensusProtocolClass());
+    conf.setSchemaRegionConsensusProtocolClass(
+        globalConfig.getSchemaRegionConsensusProtocolClass());
     conf.setSeriesPartitionSlotNum(globalConfig.getSeriesPartitionSlotNum());
     conf.setPartitionInterval(globalConfig.timePartitionInterval);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
similarity index 60%
copy from server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java
copy to server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 96fd26af89..692e229f9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -21,15 +21,12 @@ package org.apache.iotdb.db.consensus;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.statemachine.DataRegionStateMachine;
-import org.apache.iotdb.db.consensus.statemachine.SchemaRegionStateMachine;
 import org.apache.iotdb.db.engine.StorageEngineV2;
-import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 
 import java.io.File;
 
@@ -37,41 +34,32 @@ import java.io.File;
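- * We can use ConsensusImpl.getInstance() to obtain a consensus layer reference for reading and
- * writing
+ * We can use DataRegionConsensusImpl.getInstance() to obtain the data region consensus layer
+ * reference for reading and writing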
  */
-public class ConsensusImpl {
+public class DataRegionConsensusImpl {
 
-  private ConsensusImpl() {}
+  private DataRegionConsensusImpl() {}
 
   public static IConsensus getInstance() {
-    return ConsensusImplHolder.INSTANCE;
+    return DataRegionConsensusImplHolder.INSTANCE;
   }
 
-  private static class ConsensusImplHolder {
+  private static class DataRegionConsensusImplHolder {
 
     private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
     private static final IConsensus INSTANCE =
         ConsensusFactory.getConsensusImpl(
-                conf.getConsensusProtocolClass(),
-                new TEndPoint(conf.getInternalIp(), conf.getConsensusPort()),
-                new File(conf.getConsensusDir()),
-                gid -> {
-                  switch (gid.getType()) {
-                    case SchemaRegion:
-                      return new SchemaRegionStateMachine(
-                          SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid));
-                    case DataRegion:
-                      return new DataRegionStateMachine(
-                          StorageEngineV2.getInstance().getDataRegion((DataRegionId) gid));
-                  }
-                  throw new IllegalArgumentException(
-                      String.format("Unexpected consensusGroup %s", gid));
-                })
+                conf.getDataRegionConsensusProtocolClass(),
+                new TEndPoint(conf.getInternalIp(), conf.getDataRegionConsensusPort()),
+                new File(conf.getDataRegionConsensusDir()),
+                gid ->
+                    new DataRegionStateMachine(
+                        StorageEngineV2.getInstance().getDataRegion((DataRegionId) gid)))
             .orElseThrow(
                 () ->
                     new IllegalArgumentException(
                         String.format(
                             ConsensusFactory.CONSTRUCT_FAILED_MSG,
-                            conf.getConsensusProtocolClass())));
+                            conf.getDataRegionConsensusProtocolClass())));
 
-    private ConsensusImplHolder() {}
+    private DataRegionConsensusImplHolder() {}
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
similarity index 56%
rename from server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java
rename to server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index 96fd26af89..87e99f98a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -20,58 +20,42 @@
 package org.apache.iotdb.db.consensus;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.consensus.statemachine.DataRegionStateMachine;
 import org.apache.iotdb.db.consensus.statemachine.SchemaRegionStateMachine;
-import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 
 import java.io.File;
 
-/**
- * We can use ConsensusImpl.getInstance() to obtain a consensus layer reference for reading and
- * writing
- */
-public class ConsensusImpl {
+public class SchemaRegionConsensusImpl {
 
-  private ConsensusImpl() {}
+  private SchemaRegionConsensusImpl() {}
 
   public static IConsensus getInstance() {
-    return ConsensusImplHolder.INSTANCE;
+    return SchemaRegionConsensusImplHolder.INSTANCE;
   }
 
-  private static class ConsensusImplHolder {
+  private static class SchemaRegionConsensusImplHolder {
 
     private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
     private static final IConsensus INSTANCE =
         ConsensusFactory.getConsensusImpl(
-                conf.getConsensusProtocolClass(),
-                new TEndPoint(conf.getInternalIp(), conf.getConsensusPort()),
-                new File(conf.getConsensusDir()),
-                gid -> {
-                  switch (gid.getType()) {
-                    case SchemaRegion:
-                      return new SchemaRegionStateMachine(
-                          SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid));
-                    case DataRegion:
-                      return new DataRegionStateMachine(
-                          StorageEngineV2.getInstance().getDataRegion((DataRegionId) gid));
-                  }
-                  throw new IllegalArgumentException(
-                      String.format("Unexpected consensusGroup %s", gid));
-                })
+                conf.getSchemaRegionConsensusProtocolClass(),
+                new TEndPoint(conf.getInternalIp(), conf.getSchemaRegionConsensusPort()),
+                new File(conf.getSchemaRegionConsensusDir()),
+                gid ->
+                    new SchemaRegionStateMachine(
+                        SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid)))
             .orElseThrow(
                 () ->
                     new IllegalArgumentException(
                         String.format(
                             ConsensusFactory.CONSTRUCT_FAILED_MSG,
-                            conf.getConsensusProtocolClass())));
+                            conf.getSchemaRegionConsensusProtocolClass())));
 
-    private ConsensusImplHolder() {}
+    private SchemaRegionConsensusImplHolder() {}
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 51616c30bd..0d22c6df90 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -19,13 +19,11 @@
 
 package org.apache.iotdb.db.consensus.statemachine;
 
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
-import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,18 +32,6 @@ public abstract class BaseStateMachine implements IStateMachine, IStateMachine.E
 
   private static final Logger logger = LoggerFactory.getLogger(BaseStateMachine.class);
 
-  @Override
-  public TSStatus write(IConsensusRequest request) {
-    try {
-      return write(getFragmentInstance(request));
-    } catch (IllegalArgumentException e) {
-      logger.error(e.getMessage(), e);
-      return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-    }
-  }
-
-  protected abstract TSStatus write(FragmentInstance fragmentInstance);
-
   @Override
   public DataSet read(IConsensusRequest request) {
     try {
@@ -58,7 +44,7 @@ public abstract class BaseStateMachine implements IStateMachine, IStateMachine.E
 
   protected abstract DataSet read(FragmentInstance fragmentInstance);
 
-  private FragmentInstance getFragmentInstance(IConsensusRequest request) {
+  protected FragmentInstance getFragmentInstance(IConsensusRequest request) {
     FragmentInstance instance;
     if (request instanceof ByteBufferConsensusRequest) {
       instance =
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 7fe44559f7..f0b03a451f 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.exception.BatchProcessException;
@@ -31,11 +33,13 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.DeleteRegionNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,8 +75,31 @@ public class DataRegionStateMachine extends BaseStateMachine {
   public void loadSnapshot(File latestSnapshotRootDir) {}
 
   @Override
-  protected TSStatus write(FragmentInstance fragmentInstance) {
-    PlanNode planNode = fragmentInstance.getFragment().getRoot();
+  public TSStatus write(IConsensusRequest request) {
+    FragmentInstance fi;
+    long minSyncIndex = Long.MAX_VALUE;
+    long currentIndex = -1;
+    try {
+      if (request instanceof IndexedConsensusRequest) {
+        fi = getFragmentInstance(((IndexedConsensusRequest) request).getRequest());
+        minSyncIndex = ((IndexedConsensusRequest) request).getMinSyncIndex();
+        currentIndex = ((IndexedConsensusRequest) request).getCurrentIndex();
+      } else {
+        fi = getFragmentInstance(request);
+      }
+      PlanNode planNode = fi.getFragment().getRoot();
+      if (planNode instanceof InsertNode) {
+        logger.debug("minSyncIndex: {}", minSyncIndex);
+        logger.debug("currentIndex: {}", currentIndex);
+      }
+      return write(planNode);
+    } catch (IllegalArgumentException e) {
+      logger.error(e.getMessage(), e);
+      return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+    }
+  }
+
+  protected TSStatus write(PlanNode planNode) {
     try {
       if (planNode instanceof InsertRowNode) {
         region.insert((InsertRowNode) planNode);
@@ -95,7 +122,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
     } catch (BatchProcessException e) {
       return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
     } catch (Exception e) {
-      logger.error("Error in executing plan node: {}", planNode);
+      logger.error("Error in executing plan node: {}", planNode, e);
       return StatusUtils.EXECUTE_STATEMENT_ERROR;
     }
     return StatusUtils.OK;
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index 888346907a..dfd6f1ef5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -21,11 +21,13 @@ package org.apache.iotdb.db.consensus.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.metadata.visitor.SchemaExecutionVisitor;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,11 +61,15 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
   public void loadSnapshot(File latestSnapshotRootDir) {}
 
   @Override
-  protected TSStatus write(FragmentInstance fragmentInstance) {
+  public TSStatus write(IConsensusRequest request) {
     logger.info("Execute write plan in SchemaRegionStateMachine");
-    PlanNode planNode = fragmentInstance.getFragment().getRoot();
-    TSStatus status = planNode.accept(new SchemaExecutionVisitor(), schemaRegion);
-    return status;
+    try {
+      PlanNode planNode = getFragmentInstance(request).getFragment().getRoot();
+      return planNode.accept(new SchemaExecutionVisitor(), schemaRegion);
+    } catch (IllegalArgumentException e) {
+      logger.error(e.getMessage(), e);
+      return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+    }
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 8922de59e3..35637f8140 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -24,9 +24,11 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
@@ -167,8 +169,16 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
             instance.getRegionReplicaSet().getRegionId());
     switch (instance.getType()) {
       case READ:
-        FragmentInstanceInfo info =
-            (FragmentInstanceInfo) ConsensusImpl.getInstance().read(groupId, instance).getDataset();
+        FragmentInstanceInfo info;
+        if (groupId instanceof SchemaRegionId) {
+          info =
+              (FragmentInstanceInfo)
+                  SchemaRegionConsensusImpl.getInstance().read(groupId, instance).getDataset();
+        } else {
+          info =
+              (FragmentInstanceInfo)
+                  DataRegionConsensusImpl.getInstance().read(groupId, instance).getDataset();
+        }
         return !info.getState().isFailed();
       case WRITE:
         PlanNode planNode = instance.getFragment().getRoot();
@@ -179,7 +189,12 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
             throw new FragmentInstanceDispatchException(e);
           }
         }
-        ConsensusWriteResponse resp = ConsensusImpl.getInstance().write(groupId, instance);
+        ConsensusWriteResponse resp;
+        if (groupId instanceof SchemaRegionId) {
+          resp = SchemaRegionConsensusImpl.getInstance().write(groupId, instance);
+        } else {
+          resp = DataRegionConsensusImpl.getInstance().write(groupId, instance);
+        }
         return TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode();
     }
     throw new UnsupportedOperationException(
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 8875150162..37079c0355 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -35,7 +35,8 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.IoTDBStartCheck;
 import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
-import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
 import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
@@ -150,8 +151,10 @@ public class DataNode implements DataNodeMBean {
             new TEndPoint(config.getInternalIp(), config.getInternalPort()));
         location.setDataBlockManagerEndPoint(
             new TEndPoint(config.getInternalIp(), config.getDataBlockManagerPort()));
-        location.setConsensusEndPoint(
-            new TEndPoint(config.getInternalIp(), config.getConsensusPort()));
+        location.setDataRegionConsensusEndPoint(
+            new TEndPoint(config.getInternalIp(), config.getDataRegionConsensusPort()));
+        location.setSchemaRegionConsensusEndPoint(
+            new TEndPoint(config.getInternalIp(), config.getSchemaRegionConsensusPort()));
         req.setDataNodeLocation(location);
 
         TDataNodeRegisterResp dataNodeRegisterResp = configNodeClient.registerDataNode(req);
@@ -228,7 +231,8 @@ public class DataNode implements DataNodeMBean {
 
     try {
       // TODO: Start consensus layer in some where else
-      ConsensusImpl.getInstance().start();
+      DataRegionConsensusImpl.getInstance().start();
+      SchemaRegionConsensusImpl.getInstance().start();
     } catch (IOException e) {
       throw new StartupException(e);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index 1a4661d5b4..1144cb807f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -32,13 +32,13 @@ import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
@@ -91,7 +91,6 @@ public class InternalServiceImpl implements InternalService.Iface {
   private static final Logger LOGGER = LoggerFactory.getLogger(InternalServiceImpl.class);
   private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
   private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
-  private final IConsensus consensusImpl = ConsensusImpl.getInstance();
 
   public InternalServiceImpl() {
     super();
@@ -105,9 +104,16 @@ public class InternalServiceImpl implements InternalService.Iface {
         ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
     switch (type) {
       case READ:
-        ConsensusReadResponse readResp =
-            ConsensusImpl.getInstance()
-                .read(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
+        ConsensusReadResponse readResp;
+        if (groupId instanceof SchemaRegionId) {
+          readResp =
+              SchemaRegionConsensusImpl.getInstance()
+                  .read(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
+        } else {
+          readResp =
+              DataRegionConsensusImpl.getInstance()
+                  .read(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
+        }
         FragmentInstanceInfo info = (FragmentInstanceInfo) readResp.getDataset();
         return new TSendFragmentInstanceResp(!info.getState().isFailed());
       case WRITE:
@@ -126,7 +132,11 @@ public class InternalServiceImpl implements InternalService.Iface {
             return response;
           }
         }
-        resp = ConsensusImpl.getInstance().write(groupId, fragmentInstance);
+        if (groupId instanceof SchemaRegionId) {
+          resp = SchemaRegionConsensusImpl.getInstance().write(groupId, fragmentInstance);
+        } else {
+          resp = DataRegionConsensusImpl.getInstance().write(groupId, fragmentInstance);
+        }
         // TODO need consider more status
         response.setAccepted(
             TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode());
@@ -183,12 +193,12 @@ public class InternalServiceImpl implements InternalService.Iface {
       for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
         TEndPoint endpoint =
             new TEndPoint(
-                dataNodeLocation.getConsensusEndPoint().getIp(),
-                dataNodeLocation.getConsensusEndPoint().getPort());
+                dataNodeLocation.getSchemaRegionConsensusEndPoint().getIp(),
+                dataNodeLocation.getSchemaRegionConsensusEndPoint().getPort());
         peers.add(new Peer(schemaRegionId, endpoint));
       }
       ConsensusGenericResponse consensusGenericResponse =
-          consensusImpl.addConsensusGroup(schemaRegionId, peers);
+          SchemaRegionConsensusImpl.getInstance().addConsensusGroup(schemaRegionId, peers);
       if (consensusGenericResponse.isSuccess()) {
         tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       } else {
@@ -221,12 +231,12 @@ public class InternalServiceImpl implements InternalService.Iface {
       for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
         TEndPoint endpoint =
             new TEndPoint(
-                dataNodeLocation.getConsensusEndPoint().getIp(),
-                dataNodeLocation.getConsensusEndPoint().getPort());
+                dataNodeLocation.getDataRegionConsensusEndPoint().getIp(),
+                dataNodeLocation.getDataRegionConsensusEndPoint().getPort());
         peers.add(new Peer(dataRegionId, endpoint));
       }
       ConsensusGenericResponse consensusGenericResponse =
-          consensusImpl.addConsensusGroup(dataRegionId, peers);
+          DataRegionConsensusImpl.getInstance().addConsensusGroup(dataRegionId, peers);
       if (consensusGenericResponse.isSuccess()) {
         tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       } else {
@@ -285,7 +295,15 @@ public class InternalServiceImpl implements InternalService.Iface {
     PlanFragment planFragment = new PlanFragment(planFragmentId, deleteRegionNode);
     FragmentInstance fragmentInstance =
         new FragmentInstance(planFragment, fragmentInstanceId, null, QueryType.WRITE);
-    return consensusImpl.write(consensusGroupId, fragmentInstance).getStatus();
+    if (consensusGroupId instanceof SchemaRegionId) {
+      return SchemaRegionConsensusImpl.getInstance()
+          .write(consensusGroupId, fragmentInstance)
+          .getStatus();
+    } else {
+      return DataRegionConsensusImpl.getInstance()
+          .write(consensusGroupId, fragmentInstance)
+          .getStatus();
+    }
   }
 
   public void handleClientExit() {}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
index 53950b092a..7385b9cbd1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
@@ -57,7 +57,8 @@ public class FragmentInstanceSerdeTest {
     dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
     dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
     dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
-    dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
 
     PlanFragmentId planFragmentId = new PlanFragmentId("test", -1);
     FragmentInstance fragmentInstance =
@@ -86,7 +87,8 @@ public class FragmentInstanceSerdeTest {
     dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
     dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
     dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
-    dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
 
     PlanFragmentId planFragmentId = new PlanFragmentId("test2", 1);
     FragmentInstance fragmentInstance =
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index 3fafea7cfd..c1a00026af 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -31,7 +31,8 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.localconfignode.LocalConfigNode;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
@@ -78,23 +79,24 @@ public class InternalServiceImplTest {
     IoTDB.configManager.init();
     configNode = LocalConfigNode.getInstance();
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln"));
-    ConsensusImpl.getInstance().start();
+    DataRegionConsensusImpl.getInstance().start();
+    SchemaRegionConsensusImpl.getInstance().start();
   }
 
   @Before
   public void setUp() throws Exception {
     TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
-    ConsensusImpl.getInstance()
+    SchemaRegionConsensusImpl.getInstance()
         .addConsensusGroup(
             ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId()),
-            genPeerList(regionReplicaSet));
+            genSchemaRegionPeerList(regionReplicaSet));
     internalServiceImpl = new InternalServiceImpl();
   }
 
   @After
   public void tearDown() throws Exception {
     TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
-    ConsensusImpl.getInstance()
+    SchemaRegionConsensusImpl.getInstance()
         .removeConsensusGroup(
             ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId()));
     FileUtils.deleteFully(new File(conf.getConsensusDir()));
@@ -102,7 +104,8 @@ public class InternalServiceImplTest {
 
   @AfterClass
   public static void tearDownAfterClass() throws IOException, StorageEngineException {
-    ConsensusImpl.getInstance().stop();
+    DataRegionConsensusImpl.getInstance().stop();
+    SchemaRegionConsensusImpl.getInstance().stop();
     IoTDB.configManager.clear();
     EnvironmentUtils.cleanEnv();
   }
@@ -360,20 +363,23 @@ public class InternalServiceImplTest {
             .setInternalEndPoint(new TEndPoint(conf.getInternalIp(), conf.getInternalPort()))
             .setDataBlockManagerEndPoint(
                 new TEndPoint(conf.getInternalIp(), conf.getDataBlockManagerPort()))
-            .setConsensusEndPoint(new TEndPoint(conf.getInternalIp(), conf.getConsensusPort())));
+            .setDataRegionConsensusEndPoint(
+                new TEndPoint(conf.getInternalIp(), conf.getDataRegionConsensusPort()))
+            .setSchemaRegionConsensusEndPoint(
+                new TEndPoint(conf.getInternalIp(), conf.getSchemaRegionConsensusPort())));
 
     // construct fragmentInstance
     return new TRegionReplicaSet(
         new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 0), dataNodeList);
   }
 
-  private List<Peer> genPeerList(TRegionReplicaSet regionReplicaSet) {
+  private List<Peer> genSchemaRegionPeerList(TRegionReplicaSet regionReplicaSet) {
     List<Peer> peerList = new ArrayList<>();
     for (TDataNodeLocation node : regionReplicaSet.getDataNodeLocations()) {
       peerList.add(
           new Peer(
               new SchemaRegionId(regionReplicaSet.getRegionId().getId()),
-              node.getConsensusEndPoint()));
+              node.getSchemaRegionConsensusEndPoint()));
     }
     return peerList;
   }
diff --git a/server/src/test/resources/datanode1conf/iotdb-engine.properties b/server/src/test/resources/datanode1conf/iotdb-engine.properties
index 0f02933bbc..cced42b412 100644
--- a/server/src/test/resources/datanode1conf/iotdb-engine.properties
+++ b/server/src/test/resources/datanode1conf/iotdb-engine.properties
@@ -23,7 +23,8 @@ internal_ip=127.0.0.1
 rpc_port=6667
 data_block_manager_port=8777
 internal_port=9003
-consensus_port=40010
+data_region_consensus_port=40010
+schema_region_consensus_port=50010
 
 config_nodes=0.0.0.0:22277,0.0.0.0:22279,0.0.0.0:22281
 
diff --git a/server/src/test/resources/datanode2conf/iotdb-engine.properties b/server/src/test/resources/datanode2conf/iotdb-engine.properties
index 429c025a6b..c570d996b7 100644
--- a/server/src/test/resources/datanode2conf/iotdb-engine.properties
+++ b/server/src/test/resources/datanode2conf/iotdb-engine.properties
@@ -23,7 +23,8 @@ internal_ip=127.0.0.1
 rpc_port=6669
 data_block_manager_port=8779
 internal_port=9005
-consensus_port=40012
+data_region_consensus_port=40012
+schema_region_consensus_port=50012
 
 config_nodes=0.0.0.0:22277,0.0.0.0:22279,0.0.0.0:22281
 
diff --git a/server/src/test/resources/datanode3conf/iotdb-engine.properties b/server/src/test/resources/datanode3conf/iotdb-engine.properties
index 5a390729a6..956faee018 100644
--- a/server/src/test/resources/datanode3conf/iotdb-engine.properties
+++ b/server/src/test/resources/datanode3conf/iotdb-engine.properties
@@ -23,7 +23,8 @@ internal_ip=127.0.0.1
 rpc_port=6671
 data_block_manager_port=8781
 internal_port=9007
-consensus_port=40014
+data_region_consensus_port=40014
+schema_region_consensus_port=50014
 
 config_nodes=0.0.0.0:22277,0.0.0.0:22279,0.0.0.0:22281
 
diff --git a/thrift-commons/src/main/thrift/common.thrift b/thrift-commons/src/main/thrift/common.thrift
index 8ffdf3ff40..cfd5fc4157 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -74,8 +74,10 @@ struct TDataNodeLocation {
   3: required TEndPoint internalEndPoint
   // TEndPoint for transfering data between DataNodes
   4: required TEndPoint dataBlockManagerEndPoint
-  // TEndPoint for DataNode's ConsensusLayer
-  5: required TEndPoint consensusEndPoint
+  // TEndPoint for DataNode's dataRegion consensus protocol
+  5: required TEndPoint dataRegionConsensusEndPoint
+  // TEndPoint for DataNode's schemaRegion consensus protocol
+  6: required TEndPoint schemaRegionConsensusEndPoint
 }
 
 struct THeartbeatResp {
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 7fa76108e8..6d817f29ce 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -30,10 +30,11 @@ struct TDataNodeRegisterReq {
 }
 
 struct TGlobalConfig {
-  1: required string dataNodeConsensusProtocolClass
-  2: required i32 seriesPartitionSlotNum
-  3: required string seriesPartitionExecutorClass
-  4: required i64 timePartitionInterval
+  1: required string dataRegionConsensusProtocolClass
+  2: required string schemaRegionConsensusProtocolClass
+  3: required i32 seriesPartitionSlotNum
+  4: required string seriesPartitionExecutorClass
+  5: required i64 timePartitionInterval
 }
 
 struct TDataNodeRegisterResp {
@@ -159,13 +160,14 @@ struct TCheckUserPrivilegesReq{
 // ConfigNode
 struct TConfigNodeRegisterReq {
   1: required common.TConfigNodeLocation configNodeLocation
-  2: required string dataNodeConsensusProtocolClass
-  3: required i32 seriesPartitionSlotNum
-  4: required string seriesPartitionExecutorClass
-  5: required i64 defaultTTL
-  6: required i64 timePartitionInterval
-  7: required i32 schemaReplicationFactor
-  8: required i32 dataReplicationFactor
+  2: required string dataRegionConsensusProtocolClass
+  3: required string schemaRegionConsensusProtocolClass
+  4: required i32 seriesPartitionSlotNum
+  5: required string seriesPartitionExecutorClass
+  6: required i64 defaultTTL
+  7: required i64 timePartitionInterval
+  8: required i32 schemaReplicationFactor
+  9: required i32 dataReplicationFactor
 }
 
 struct TConfigNodeRegisterResp {
diff --git a/thrift-multi-leader-consensus/pom.xml b/thrift-multi-leader-consensus/pom.xml
new file mode 100644
index 0000000000..7504f96722
--- /dev/null
+++ b/thrift-multi-leader-consensus/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.iotdb</groupId>
+        <artifactId>iotdb-parent</artifactId>
+        <version>0.14.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <artifactId>thrift-multi-leader-consensus</artifactId> <!-- Maven module for the Thrift RPC definitions of multi-leader consensus -->
+    <name>rpc-thrift-multi-leader-consensus</name>
+    <description>Rpc modules for multi leader consensus algorithm</description>
+    <dependencies> <!-- libthrift declares no version here; NOTE(review): presumably managed by iotdb-parent, confirm -->
+        <dependency>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-thrift-commons</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin> <!-- registers the Thrift generated-sources directory as an additional source root -->
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>3.2.0</version>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase> <!-- add-source is bound to the generate-sources phase -->
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>${project.build.directory}/generated-sources/thrift</source> <!-- NOTE(review): no Thrift compiler execution is configured in this POM; presumably inherited from iotdb-parent, confirm -->
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
new file mode 100644
index 0000000000..10c0195506
--- /dev/null
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+include "common.thrift"
+namespace java org.apache.iotdb.consensus.multileader.thrift
+
+struct TSyncLogReq {  // Request payload of MultiLeaderConsensusIService.syncLog
+  1: required common.TConsensusGroupId consensusGroupId  // presumably the group the batches belong to; confirm against the handler
+  2: required list<binary> batches  // opaque binary entries; their encoding is not defined in this file
+}
+
+struct TSyncLogRes {  // Response payload of MultiLeaderConsensusIService.syncLog
+  1: required list<common.TSStatus> status  // NOTE(review): presumably one TSStatus per entry of TSyncLogReq.batches; confirm
+}
+
+service MultiLeaderConsensusIService {  // RPC interface of the multi-leader consensus module; declares a single call
+  TSyncLogRes syncLog(TSyncLogReq req)  // takes a TSyncLogReq and returns a TSyncLogRes
+}
\ No newline at end of file