You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/04/05 10:38:53 UTC

[iotdb] 01/01: finish

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch datanode_consensus_init
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a4204fd360866526b164f2844a6555a80e631255
Author: LebronAl <TX...@gmail.com>
AuthorDate: Tue Apr 5 18:38:26 2022 +0800

    finish
---
 .../resources/conf/iotdb-confignode.properties     |  21 ++-
 .../iotdb/confignode/conf/ConfigNodeConf.java      |  28 ++--
 .../confignode/conf/ConfigNodeDescriptor.java      |  39 +++---
 .../iotdb/confignode/manager/ConsensusManager.java |  49 ++-----
 consensus/README.md                                | 126 +++++++++++------
 consensus/pom.xml                                  |   2 +-
 .../apache/iotdb/consensus/IConsensusFactory.java  |  57 ++++++++
 .../iotdb/consensus/ratis/RatisConsensus.java      | 152 ++++++++-------------
 .../consensus/standalone/StandAloneConsensus.java  |  20 ++-
 .../iotdb/consensus/ratis/RatisConsensusTest.java  |  20 ++-
 .../standalone/StandAloneConsensusTest.java        |  33 +++--
 .../org/apache/iotdb/commons/cluster/Endpoint.java |  10 ++
 .../apache/iotdb/commons/utils/CommonUtils.java    |  10 +-
 .../resources/conf/iotdb-engine.properties         |  19 ++-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  40 +++++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   6 +
 .../iotdb/db/consensus/ConsensusExample.java       |  30 ++--
 .../apache/iotdb/db/consensus/ConsensusImpl.java   |  66 +++++++++
 .../statemachine/DataRegionStateMachine.java       |   4 +-
 .../statemachine/SchemaRegionStateMachine.java     |   4 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  16 +++
 .../java/org/apache/iotdb/db/service/DataNode.java |  14 +-
 22 files changed, 505 insertions(+), 261 deletions(-)

diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 801bf17b44..3aaf61eddc 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -57,18 +57,25 @@ config_node_rpc_port=22277
 ### consensus protocol configuration
 ####################
 
-# ConfigNodeGroup consensus protocol type
-# These consensus protocol are currently supported:
-# 1. standalone(No protocol, only supports stand-alone machine)
-# 2. ratis(Raft protocol)
+# ConfigNode consensus protocol type
+# These consensus protocols are currently supported:
+# 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(No protocol, only supports stand-alone machine)
+# 2. org.apache.iotdb.consensus.ratis.RatisConsensus(Raft protocol)
 # Datatype: String
-# consensus_type=standalone
+# config_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
 
 # Used for building the ConfigNode consensus group
 # all config node address and internal port, use comma to distinguish
 # every node should have the same config_node_address_lists
 # Datatype: String
-# config_node_group_address_list=0.0.0.0:22278,0.0.0.0:22280,0.0.0.0:22282
+# config_node_group_address_list=0.0.0.0:22278
+
+# DataNode consensus protocol type
+# These consensus protocols are currently supported:
+# 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(No protocol, only supports stand-alone machine)
+# 2. org.apache.iotdb.consensus.ratis.RatisConsensus(Raft protocol)
+# Datatype: String
+# data_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
 
 ####################
 ### DeviceGroup Configuration
@@ -79,7 +86,6 @@ config_node_rpc_port=22277
 # device_group_count=10000
 
 # DeviceGroup hash algorithm
-# Datatype: String
 # These hashing algorithms are currently supported:
 # 1. org.apache.iotdb.commons.hash.BKDRHashExecutor(Default)
 # 2. org.apache.iotdb.commons.hash.APHashExecutor
@@ -87,6 +93,7 @@ config_node_rpc_port=22277
 # 4. org.apache.iotdb.commons.hash.SDBMHashExecutor
 # Also, if you want to implement your own hash algorithm, you can inherit the DeviceGroupHashExecutor class and
 # modify this parameter to correspond to your Java class
+# Datatype: String
 # device_group_hash_executor_class=org.apache.iotdb.commons.hash.BKDRHashExecutor
 
 ####################
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index c94ebe05a6..f21bb768d8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.confignode.conf;
 
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.consensus.common.ConsensusType;
 import org.apache.iotdb.rpc.RpcUtils;
 
 import java.io.File;
@@ -33,14 +32,17 @@ public class ConfigNodeConf {
   /** used for communication between data node and config node */
   private int rpcPort = 22277;
 
-  /** used for communication between data node and data node */
+  /** used for communication between config node and config node */
   private int internalPort = 22278;
 
-  /** ConfigNodeGroup consensus protocol */
-  private ConsensusType consensusType = ConsensusType.STANDALONE;
+  /** ConfigNode consensus protocol */
+  private String configNodeConsensusProtocolClass =
+      "org.apache.iotdb.consensus.ratis.RatisConsensus";
+
+  private String dataNodeConsensusProtocolClass = "org.apache.iotdb.consensus.ratis.RatisConsensus";
 
   /** Used for building the ConfigNode consensus group */
-  private Endpoint[] configNodeGroupAddressList = null;
+  private Endpoint[] configNodeGroupAddressList;
 
   /** Number of DeviceGroups per StorageGroup */
   private int deviceGroupCount = 10000;
@@ -199,12 +201,20 @@ public class ConfigNodeConf {
     this.consensusDir = consensusDir;
   }
 
-  public ConsensusType getConsensusType() {
-    return consensusType;
+  public String getConfigNodeConsensusProtocolClass() {
+    return configNodeConsensusProtocolClass;
+  }
+
+  public void setConfigNodeConsensusProtocolClass(String configNodeConsensusProtocolClass) {
+    this.configNodeConsensusProtocolClass = configNodeConsensusProtocolClass;
+  }
+
+  public String getDataNodeConsensusProtocolClass() {
+    return dataNodeConsensusProtocolClass;
   }
 
-  public void setConsensusType(ConsensusType consensusType) {
-    this.consensusType = consensusType;
+  public void setDataNodeConsensusProtocolClass(String dataNodeConsensusProtocolClass) {
+    this.dataNodeConsensusProtocolClass = dataNodeConsensusProtocolClass;
   }
 
   public Endpoint[] getConfigNodeGroupAddressList() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index f6f035d987..4f2590192d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -19,7 +19,8 @@
 package org.apache.iotdb.confignode.conf;
 
 import org.apache.iotdb.commons.cluster.Endpoint;
-import org.apache.iotdb.consensus.common.ConsensusType;
+import org.apache.iotdb.commons.exception.BadNodeUrlException;
+import org.apache.iotdb.commons.utils.CommonUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -122,9 +123,13 @@ public class ConfigNodeDescriptor {
               properties.getProperty(
                   "config_node_internal_port", String.valueOf(conf.getInternalPort()))));
 
-      conf.setConsensusType(
-          ConsensusType.getConsensusType(
-              properties.getProperty("consensus_type", String.valueOf(conf.getConsensusType()))));
+      conf.setConfigNodeConsensusProtocolClass(
+          properties.getProperty(
+              "config_node_consensus_protocol_class", conf.getConfigNodeConsensusProtocolClass()));
+
+      conf.setDataNodeConsensusProtocolClass(
+          properties.getProperty(
+              "data_node_consensus_protocol_class", conf.getDataNodeConsensusProtocolClass()));
 
       conf.setRpcAdvancedCompressionEnable(
           Boolean.parseBoolean(
@@ -175,25 +180,15 @@ public class ConfigNodeDescriptor {
               properties.getProperty(
                   "data_region_count", String.valueOf(conf.getDataRegionCount()))));
 
-      String addresses = properties.getProperty("config_node_group_address_list", null);
-      if (addresses != null) {
-        String[] addressList = addresses.split(",");
-        Endpoint[] endpointList = new Endpoint[addressList.length];
-        for (int i = 0; i < addressList.length; i++) {
-          String[] ipPort = addressList[i].split(":");
-          if (ipPort.length != 2) {
-            throw new IOException(
-                String.format(
-                    "Parsing parameter config_node_group_address_list error. "
-                        + "The %d-th address must format to ip:port, but currently is %s",
-                    i, addressList[i]));
-          }
-          endpointList[i] = new Endpoint(ipPort[0], Integer.parseInt(ipPort[1]));
-        }
-        conf.setConfigNodeGroupAddressList(endpointList);
-      }
+      String addresses = properties.getProperty("config_node_group_address_list", "0.0.0.0:22278");
 
-    } catch (IOException e) {
+      String[] addressList = addresses.split(",");
+      Endpoint[] endpointList = new Endpoint[addressList.length];
+      for (int i = 0; i < addressList.length; i++) {
+        endpointList[i] = CommonUtils.parseNodeUrl(addressList[i]);
+      }
+      conf.setConfigNodeGroupAddressList(endpointList);
+    } catch (IOException | BadNodeUrlException e) {
       LOGGER.warn("Couldn't load ConfigNode conf file, use default config", e);
     } finally {
       conf.updatePath();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 3261c6dde0..d48c264790 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -27,11 +27,10 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.statemachine.PartitionRegionStateMachine;
 import org.apache.iotdb.confignode.physical.PhysicalPlan;
 import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IConsensusFactory;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.consensus.ratis.RatisConsensus;
-import org.apache.iotdb.consensus.standalone.StandAloneConsensus;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +41,6 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 
 /** ConsensusManager maintains consensus class, request will redirect to consensus layer */
@@ -90,42 +88,19 @@ public class ConsensusManager {
     // There is only one ConfigNodeGroup
     consensusGroupId = new ConsensusGroupId(GroupType.PartitionRegion, 0);
 
-    // Implement specific consensus
-    switch (conf.getConsensusType()) {
-      case STANDALONE:
-        constructStandAloneConsensus();
-        break;
-      case RATIS:
-        constructRatisConsensus();
-        break;
-      default:
-        throw new IllegalArgumentException(
-            "Start ConfigNode failed, unrecognized ConsensusType: "
-                + conf.getConsensusType().getTypeName());
-    }
-  }
-
-  private void constructStandAloneConsensus() throws IOException {
-    // Standalone consensus
-    consensusImpl = new StandAloneConsensus(id -> new PartitionRegionStateMachine());
-    consensusImpl.start();
-
-    // Standalone ConsensusGroup
-    consensusImpl.addConsensusGroup(
-        consensusGroupId,
-        Collections.singletonList(
-            new Peer(
-                consensusGroupId, new Endpoint(conf.getRpcAddress(), conf.getInternalPort()))));
-  }
-
-  private void constructRatisConsensus() throws IOException {
     // Ratis consensus local implement
     consensusImpl =
-        RatisConsensus.newBuilder()
-            .setEndpoint(new Endpoint(conf.getRpcAddress(), conf.getInternalPort()))
-            .setStateMachineRegistry(id -> new PartitionRegionStateMachine())
-            .setStorageDir(new File(conf.getConsensusDir()))
-            .build();
+        IConsensusFactory.getConsensusImpl(
+                conf.getConfigNodeConsensusProtocolClass(),
+                new Endpoint(conf.getRpcAddress(), conf.getInternalPort()),
+                new File(conf.getConsensusDir()),
+                gid -> new PartitionRegionStateMachine())
+            .orElseThrow(
+                () ->
+                    new IllegalArgumentException(
+                        String.format(
+                            IConsensusFactory.CONSTRUCT_FAILED_MSG,
+                            conf.getConfigNodeConsensusProtocolClass())));
     consensusImpl.start();
 
     // Build ratis group from user properties
diff --git a/consensus/README.md b/consensus/README.md
index 7fc51e0d8d..4c731a6322 100644
--- a/consensus/README.md
+++ b/consensus/README.md
@@ -20,104 +20,144 @@
 -->
 
 # Overview
+
 The Consensus Package provides the Consensus Layer's service definitions and its implementations.
- 
+
 ### What is Consensus Layer used for?
-Generally, We maintain multiple copies of application data for the purpose of fault-tolerance and data-integrity.
-There are variety of consensus algorithms to manage multiple copies of data, which differs in levels of data consistency and performance.
-Each consensus algorithm may have multiple industrial implementations.
-The Consensus Layer aims to hide all complications behind different consensus algorithms and implementations, providing higher level of abstraction to the user.
+
+Generally, We maintain multiple copies of application data for the purpose of fault-tolerance and
+data-integrity. There are variety of consensus algorithms to manage multiple copies of data, which
+differs in levels of data consistency and performance. Each consensus algorithm may have multiple
+industrial implementations. The Consensus Layer aims to hide all complications behind different
+consensus algorithms and implementations, providing higher level of abstraction to the user.
 
 # Consensus Layer
+
 ### Basic Concepts
 
 * `IStateMachine` is the user application that manages a local copy of data.
-* `Peer` is the smallest consensus unit (inside a process), which holds a `IStateMachine` internally.
+* `Peer` is the smallest consensus unit (inside a process), which holds a `IStateMachine`
+  internally.
 * `ConsensusGroup` is a group of `Peer` all managing the same copy of data.
-* `IConsensus` interface defines the basic functionality provided by Consensus Layer. 
+* `IConsensus` interface defines the basic functionality provided by Consensus Layer.
+* `IConsessusFactory` is the only factory class exposed to other modules to create a consensus layer
+  implementation.
 
-User application can create a `ConsensusGroup` with k `Peer` to store data, 
-i.e. that there will be k copies of data.
+User application can create a `ConsensusGroup` with k `Peer` to store data, i.e. that there will be
+k copies of data.
 
-When write data into a `ConsensusGroup` using `IConsensus::write`, it will be sent to the group leader's `IStateMachine::write` . 
-The leader makes decision about this write operation first, 
-then applies write-operation to the local statemachine using `IStateMachine::write`,
-and forward this operation to other members' `IStateMachine::write` in the same group
+When write data into a `ConsensusGroup` using `IConsensus::write`, it will be sent to the group
+leader's `IStateMachine::write` . The leader makes decision about this write operation first, then
+applies write-operation to the local statemachine using `IStateMachine::write`, and forward this
+operation to other members' `IStateMachine::write` in the same group
 
 ### How to use
+
 1. Define the  `IStateMachine` to manage local copy of data.
 2. Define the `IConsensusRequest` to customize request format
 3. Define the `DataSet` to customize the response format
-4. Pick a concrete `IConsensus` implementation as the backbone consensus algorithm
-
-
+4. Select a specific "IConsensus" class name and call `IConsensusFactory.getConsensusImpl()` to
+   instantiate the corresponding consensus protocol
 
 # Ratis Consensus Implementation
 
-RatisConsensus is a multi-raft implementation of `IConsensus` protocol. It is based on [Apache Ratis](https://ratis.apache.org/).
+RatisConsensus is a multi-raft implementation of `IConsensus` protocol. It is based
+on [Apache Ratis](https://ratis.apache.org/).
+
+### 1. build and start a RatisConsensusImpl
 
-### 1. build and start a RatisConsensusService
 ```java
-IConsensus consensusService = RatisConsensus.newBuilder()
-              .setEndpoint(peers.get(i).getEndpoint())
-              .setStateMachineRegistry(groupId -> new IntegerCounter())
-              .setStorageDir(peersStorage.get(i))
-              .build();
-consensusService.start();
+IConsensus consensusImpl =
+    IConsensusFactory.getConsensusImpl(
+    "org.apache.iotdb.consensus.ratis.RatisConsensus",
+    new Endpoint(conf.getRpcAddress(), conf.getInternalPort()),
+    new File(conf.getConsensusDir()),
+    gid -> new PartitionRegionStateMachine())
+    .orElseThrow(() ->
+    new IllegalArgumentException(
+    String.format(
+    IConsensusFactory.CONSTRUCT_FAILED_MSG,
+    "org.apache.iotdb.consensus.ratis.RatisConsensus")));
+
+consensusImpl.start();
 ```
-* `endpoint` is the communication endpoint for this consensus service.
-* `StateMachineRegistery` 
-* `StoreageDir` specifies the location to store RaftLog. Assign a fix location so that the RatisConsensus knows where to recover when crashes and restarts.
+
+* `endpoint` is the communication endpoint for this consensusImpl.
+* `StateMachineRegistery`
+* `StoreageDir` specifies the location to store RaftLog. Assign a fix location so that the
+  RatisConsensus knows where to recover when crashes and restarts.
 
 ### 2. assign local RatisConsensus a new Group
+
 ```java
 ConsensusGroup group = new ConsensusGroup(...);
-response = service.addConsensusGroup(group.getGroupId(), group.getPeers());
+response = consensusImpl.addConsensusGroup(group.getGroupId(),group.getPeers());
 ```
-The underling Raft Service will initialize its states, and reaching out to other peers to elect the raft leader.
+
+The underling consensusImpl will initialize its states, and reaching out to other peers to elect the
+raft leader.
 
 **Notice**: this request may fail. It's caller's responsibility to retry / rollback.
 
 ### 3. change group configuration
+
 #### (1) remove a member
+
 suppose now the group contains peer[0,1,2], and we want to remove[1,2] from this group
+
 ```java
 // the following code should be called in peer both 1 & 2
 // first  use removePeer to inform the group leader of configuration change 
-service.removePeer(gid, myself);
+consensusImpl.removePeer(gid,myself);
 // then use removeConsensusGroup to clean up local states and data
-service.removeConsensusGroup(gid);
+consensusImpl.removeConsensusGroup(gid);
 ```
-**Notice**: either of `removePeer` or `removeConsensusGroup` may fail. 
-It's caller's responsibility to retry and make these two calls atomic.
+
+**Notice**: either of `removePeer` or `removeConsensusGroup` may fail. It's caller's responsibility
+to retry and make these two calls atomic.
+
 #### (2) add a member
-adding a new member is similar to removing a member except that you should call `addConsensusGroup` first and then `addPeer`
+
+adding a new member is similar to removing a member except that you should call `addConsensusGroup`
+first and then `addPeer`
+
 ```java
 // the following code should be called in peer both 1 & 2
 // first addConsensusGroup to initialize local states
-service.addConsensusGroup(gid);
+consensusImpl.addConsensusGroup(gid);
 // then use addPeer to inform the previous group members of joining a new member
-service.addPeer(gid, myself);
+consensusImpl.addPeer(gid,myself);
 ```
+
 #### (3) add/remove multiple members
+
 ```java
 // pre. For each member newly added, call addConsensusGroup locally to initialize
-serivice.changePeer(group.getGroupId(), newGroupmember);
+consensusImpl.changePeer(group.getGroupId(),newGroupmember);
 // after. For each member removed, call removeConsensusGroup locally to clean up
 ```
+
 **Notice**: the old group and the new group must overlap in at least one member.
+
 ### 4. write data
+
 ```java
-ConsensusWriteResponse response = service.write(gid, request)
-if (response.isSuccess() && response.getStates().code() == 200) {...}
+ConsensusWriteResponse response = consensusImpl.write(gid,request)
+if(response.isSuccess() && response.getStates().code() == 200){
+    ...
+}
 ```
+
 ### 5. read data
+
 ```java
-ConsensusReadResponse response = consensus.read(gid, request);
-if (response.isSuccess()) {
-    MyDataSet result = (MyDataSet) response.getDataset();
+ConsensusReadResponse response = consensusImpl.read(gid,request);
+if(response.isSuccess()){
+    MyDataSet result=(MyDataSet)response.getDataset();
 }
 ```
-**NOTICE**: currently in RatisConsensus, read will direct read the local copy. Thus, the result may be stale and not linearizable!
+
+**NOTICE**: currently in RatisConsensus, read will direct read the local copy. Thus, the result may
+be stale and not linearizable!
 
 
diff --git a/consensus/pom.xml b/consensus/pom.xml
index 8f6646eae6..f668d8b4bd 100644
--- a/consensus/pom.xml
+++ b/consensus/pom.xml
@@ -54,7 +54,7 @@
         <dependency>
             <groupId>org.apache.iotdb</groupId>
             <artifactId>node-commons</artifactId>
-            <version>0.14.0-SNAPSHOT</version>
+            <version>${project.version}</version>
             <scope>compile</scope>
         </dependency>
     </dependencies>
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IConsensusFactory.java b/consensus/src/main/java/org/apache/iotdb/consensus/IConsensusFactory.java
new file mode 100644
index 0000000000..9f36de4396
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IConsensusFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus;
+
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Optional;
+
+public class IConsensusFactory {
+  public static final String CONSTRUCT_FAILED_MSG =
+      "Construct consensusImpl failed, Please check your consensus className %s";
+
+  private static final Logger logger = LoggerFactory.getLogger(IConsensusFactory.class);
+
+  public static Optional<IConsensus> getConsensusImpl(
+      String className, Endpoint endpoint, File storageDir, IStateMachine.Registry registry) {
+    try {
+      Class<?> executor = Class.forName(className);
+      Constructor<?> executorConstructor =
+          executor.getDeclaredConstructor(Endpoint.class, File.class, IStateMachine.Registry.class);
+      executorConstructor.setAccessible(true);
+      return Optional.of(
+          (IConsensus) executorConstructor.newInstance(endpoint, storageDir, registry));
+    } catch (ClassNotFoundException
+        | NoSuchMethodException
+        | InstantiationException
+        | IllegalAccessException
+        | InvocationTargetException e) {
+      logger.error("Couldn't Construct IConsensus class: {}", className, e);
+    }
+    return Optional.empty();
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 8e70f9653a..68094643cc 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -73,7 +73,7 @@ import java.util.stream.Collectors;
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
-public class RatisConsensus implements IConsensus {
+class RatisConsensus implements IConsensus {
   // the unique net communication endpoint
   private final RaftPeer myself;
 
@@ -89,41 +89,40 @@ public class RatisConsensus implements IConsensus {
   private static final int LEADER_PRIORITY = 1;
 
   private Logger logger = LoggerFactory.getLogger(RatisConsensus.class);
-  /**
-   * This function will use the previous client for groupId to query the latest group info It will
-   * update the new group info into the groupMap and rebuild its client
-   *
-   * @throws ConsensusGroupNotExistException when cannot get the group info
-   */
-  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
-      throws ConsensusGroupNotExistException {
-    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
-    RaftClient current = clientMap.get(raftGroupId);
-    try {
-      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
 
-      if (!reply.isSuccess()) {
-        throw new ConsensusGroupNotExistException(groupId);
-      }
+  public RatisConsensus(Endpoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
+      throws IOException {
 
-      raftGroupMap.put(raftGroupId, reply.getGroup());
-      buildClientAndCache(raftGroupMap.get(raftGroupId));
-    } catch (IOException e) {
-      throw new ConsensusGroupNotExistException(groupId);
-    }
-  }
+    this.clientMap = new ConcurrentHashMap<>();
+    this.raftGroupMap = new ConcurrentHashMap<>();
+    this.localFakeId = ClientId.randomId();
+    this.localFakeCallId = new AtomicLong(0);
 
-  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
-      throws RatisRequestFailedException {
-    RaftClient client = clientMap.get(raftGroupId);
-    // notify the group leader of configuration change
-    RaftClientReply reply = null;
-    try {
-      reply = client.admin().setConfiguration(peers);
-    } catch (IOException e) {
-      throw new RatisRequestFailedException(e);
+    // create a RaftPeer as endpoint of comm
+    String address = Utils.IPAddress(endpoint);
+    myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY);
+
+    RaftProperties properties = new RaftProperties();
+
+    // set the storage directory (different for each peer) in RaftProperty object
+    if (ratisStorageDir == null || !ratisStorageDir.isDirectory()) {
+      ratisStorageDir = new File("./" + myself.getId().toString());
     }
-    return reply;
+    RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(ratisStorageDir));
+
+    // set the port which server listen to in RaftProperty object
+    final int port = NetUtils.createSocketAddr(address).getPort();
+    GrpcConfigKeys.Server.setPort(properties, port);
+
+    server =
+        RaftServer.newBuilder()
+            .setServerId(myself.getId())
+            .setProperties(properties)
+            .setStateMachineRegistry(
+                raftGroupId ->
+                    new ApplicationStateMachineProxy(
+                        registry.apply(Utils.toConsensusGroupId(raftGroupId))))
+            .build();
   }
 
   @Override
@@ -517,71 +516,40 @@ public class RatisConsensus implements IConsensus {
     }
   }
 
-  private RatisConsensus(Endpoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
-      throws IOException {
-
-    this.clientMap = new ConcurrentHashMap<>();
-    this.raftGroupMap = new ConcurrentHashMap<>();
-    this.localFakeId = ClientId.randomId();
-    this.localFakeCallId = new AtomicLong(0);
-
-    // create a RaftPeer as endpoint of comm
-    String address = Utils.IPAddress(endpoint);
-    myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY);
+  /**
+   * This function will use the previous client for groupId to query the latest group info It will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
 
-    RaftProperties properties = new RaftProperties();
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
 
-    // set the storage directory (different for each peer) in RaftProperty object
-    if (ratisStorageDir == null || !ratisStorageDir.isDirectory()) {
-      ratisStorageDir = new File("./" + myself.getId().toString());
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
     }
-    RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(ratisStorageDir));
-
-    // set the port which server listen to in RaftProperty object
-    final int port = NetUtils.createSocketAddr(address).getPort();
-    GrpcConfigKeys.Server.setPort(properties, port);
-
-    server =
-        RaftServer.newBuilder()
-            .setServerId(myself.getId())
-            .setProperties(properties)
-            .setStateMachineRegistry(
-                raftGroupId ->
-                    new ApplicationStateMachineProxy(
-                        registry.apply(Utils.toConsensusGroupId(raftGroupId))))
-            .build();
-  }
-
-  public static Builder newBuilder() {
-    return new Builder();
   }
 
-  public static class Builder {
-    private Endpoint endpoint;
-    private IStateMachine.Registry registry;
-    private File storageDir;
-
-    public Builder() {
-      storageDir = null;
-    }
-
-    public Builder setEndpoint(Endpoint endpoint) {
-      this.endpoint = endpoint;
-      return this;
-    }
-
-    public Builder setStateMachineRegistry(IStateMachine.Registry registry) {
-      this.registry = registry;
-      return this;
-    }
-
-    public Builder setStorageDir(File dir) {
-      this.storageDir = dir;
-      return this;
-    }
-
-    public RatisConsensus build() throws IOException {
-      return new RatisConsensus(endpoint, storageDir, registry);
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException(e);
     }
+    return reply;
   }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
index 6f1582a268..91a767055b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.standalone;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.DataSet;
@@ -31,8 +32,10 @@ import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
 import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.consensus.statemachine.IStateMachine.Registry;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -41,19 +44,25 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * A simple single replica consensus implementation.
+ * A simple consensus implementation, which can be used when replicaNum is 1.
+ *
+ * <p>Notice: The stateMachine needs to implement WAL itself to ensure recovery after a restart
  *
  * <p>any module can use `IConsensus consensusImpl = new StandAloneConsensus(id -> new
  * EmptyStateMachine());` to perform an initialization implementation.
  */
-public class StandAloneConsensus implements IConsensus {
+class StandAloneConsensus implements IConsensus {
 
+  private final Endpoint thisNode;
+  private final File storageDir;
   private final IStateMachine.Registry registry;
-  private final Map<ConsensusGroupId, StandAloneServerImpl> stateMachineMap;
+  private final Map<ConsensusGroupId, StandAloneServerImpl> stateMachineMap =
+      new ConcurrentHashMap<>();
 
-  public StandAloneConsensus(IStateMachine.Registry registry) {
+  public StandAloneConsensus(Endpoint thisNode, File storageDir, Registry registry) {
+    this.thisNode = thisNode;
+    this.storageDir = storageDir;
     this.registry = registry;
-    this.stateMachineMap = new ConcurrentHashMap<>();
   }
 
   @Override
@@ -136,6 +145,7 @@ public class StandAloneConsensus implements IConsensus {
           v.stop();
           return null;
         });
+
     if (!exist.get()) {
       return ConsensusGenericResponse.newBuilder()
           .setException(new ConsensusGroupNotExistException(groupId))
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index 2558ad2579..581fdd55c5 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.GroupType;
 import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IConsensusFactory;
 import org.apache.iotdb.consensus.common.ConsensusGroup;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
@@ -52,6 +53,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class RatisConsensusTest {
 
+  private static final String RATIS_CLASS_NAME = "org.apache.iotdb.consensus.ratis.RatisConsensus";
+
   private static class TestDataSet implements DataSet {
     private int number;
 
@@ -65,7 +68,7 @@ public class RatisConsensusTest {
   }
 
   private static class TestRequest {
-    private int cmd;
+    private final int cmd;
 
     public TestRequest(ByteBuffer buffer) {
       cmd = buffer.getInt();
@@ -136,11 +139,16 @@ public class RatisConsensusTest {
     servers = new ArrayList<>();
     for (int i = 0; i < 3; i++) {
       servers.add(
-          RatisConsensus.newBuilder()
-              .setEndpoint(peers.get(i).getEndpoint())
-              .setStateMachineRegistry(groupId -> new IntegerCounter())
-              .setStorageDir(peersStorage.get(i))
-              .build());
+          IConsensusFactory.getConsensusImpl(
+                  RATIS_CLASS_NAME,
+                  peers.get(i).getEndpoint(),
+                  peersStorage.get(i),
+                  groupId -> new IntegerCounter())
+              .orElseThrow(
+                  () ->
+                      new IllegalArgumentException(
+                          String.format(
+                              IConsensusFactory.CONSTRUCT_FAILED_MSG, RATIS_CLASS_NAME))));
       servers.get(i).start();
     }
   }
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index ded008960d..ddb86f817a 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.GroupType;
 import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IConsensusFactory;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
@@ -40,6 +41,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
@@ -52,6 +54,8 @@ import static org.junit.Assert.assertTrue;
 
 public class StandAloneConsensusTest {
 
+  private static final String STANDALONE_CONSENSUS_CLASS_NAME =
+      "org.apache.iotdb.consensus.standalone.StandAloneConsensus";
   private IConsensus consensusImpl;
   private final TestEntry entry1 = new TestEntry(0);
   private final ByteBufferConsensusRequest entry2 =
@@ -108,16 +112,25 @@ public class StandAloneConsensusTest {
   @Before
   public void setUp() throws Exception {
     consensusImpl =
-        new StandAloneConsensus(
-            gid -> {
-              switch (gid.getType()) {
-                case SchemaRegion:
-                  return new TestStateMachine(true);
-                case DataRegion:
-                  return new TestStateMachine(false);
-              }
-              return new EmptyStateMachine();
-            });
+        IConsensusFactory.getConsensusImpl(
+                STANDALONE_CONSENSUS_CLASS_NAME,
+                new Endpoint("localhost", 6667),
+                new File("./"),
+                gid -> {
+                  switch (gid.getType()) {
+                    case SchemaRegion:
+                      return new TestStateMachine(true);
+                    case DataRegion:
+                      return new TestStateMachine(false);
+                  }
+                  return new EmptyStateMachine();
+                })
+            .orElseThrow(
+                () ->
+                    new IllegalArgumentException(
+                        String.format(
+                            IConsensusFactory.CONSTRUCT_FAILED_MSG,
+                            STANDALONE_CONSENSUS_CLASS_NAME)));
     consensusImpl.start();
   }
 
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/Endpoint.java b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/Endpoint.java
index ff6e8b118b..25d7b84d0f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/Endpoint.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/Endpoint.java
@@ -39,10 +39,20 @@ public class Endpoint {
     return ip;
   }
 
+  public Endpoint setIp(String ip) {
+    this.ip = ip;
+    return this;
+  }
+
   public int getPort() {
     return port;
   }
 
+  public Endpoint setPort(int port) {
+    this.port = port;
+    return this;
+  }
+
   public void serializeImpl(ByteBuffer buffer) {
     byte[] bytes = ip.getBytes();
     buffer.putInt(bytes.length);
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java
index 3d9995edbc..e9e170232c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java
@@ -19,8 +19,8 @@
 
 package org.apache.iotdb.commons.utils;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,8 +31,8 @@ import java.util.List;
 public class CommonUtils {
   private static final Logger logger = LoggerFactory.getLogger(CommonUtils.class);
 
-  public static EndPoint parseNodeUrl(String nodeUrl) throws BadNodeUrlException {
-    EndPoint result = new EndPoint();
+  public static Endpoint parseNodeUrl(String nodeUrl) throws BadNodeUrlException {
+    Endpoint result = new Endpoint();
     String[] split = nodeUrl.split(":");
     if (split.length != 2) {
       logger.warn("Bad node url: {}", nodeUrl);
@@ -49,8 +49,8 @@ public class CommonUtils {
     return result;
   }
 
-  public static List<EndPoint> parseNodeUrls(List<String> nodeUrls) throws BadNodeUrlException {
-    List<EndPoint> result = new ArrayList<>();
+  public static List<Endpoint> parseNodeUrls(List<String> nodeUrls) throws BadNodeUrlException {
+    List<Endpoint> result = new ArrayList<>();
     for (String url : nodeUrls) {
       result.add(parseNodeUrl(url));
     }
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index d6598a5d14..ae84c4f2f1 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -33,9 +33,14 @@ rpc_port=6667
 internal_ip=127.0.0.1
 
 # Datatype: int
-# port for communication between cluster nodes.
+# port for coordinator's communication between cluster nodes.
 internal_port=9003
 
+
+# Datatype: int
+# port for consensus's communication between cluster nodes.
+consensus_port=40010
+
 # comma-separated {IP/DOMAIN}:internal_port pairs
 # Data nodes store config nodes ip and port to communicate with config nodes.
 # Several nodes will be picked randomly to send the request, the number of nodes
@@ -140,6 +145,18 @@ config_nodes=127.0.0.1:22277
 # If its prefix is "/", then the path is absolute. Otherwise, it is relative.
 # wal_dir=data/wal
 
+# consensus dir
+# If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/data).
+# If it is absolute, system will save the data in the exact location it points to.
+# If it is relative, system will save the data in the relative path directory it indicates under the IoTDB folder.
+# Note: If consensus_dir is assigned an empty string(i.e.,zero-size), it will be handled as a relative path.
+# For windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative.
+# consensus_dir=data\\consensus
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+# consensus_dir=data/consensus
+
 
 # TSFile storage file system. Currently, Tsfiles are supported to be stored in LOCAL file system or HDFS.
 # Datatype: FSType
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 580d8f7552..a78d38baa3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -260,6 +260,9 @@ public class IoTDBConfig {
   /** Wal directory. */
   private String walDir = DEFAULT_BASE_DIR + File.separator + "wal";
 
+  /** Consensus directory. */
+  private String consensusDir = DEFAULT_BASE_DIR + File.separator + "consensus";
+
   /** Maximum MemTable number. Invalid when enableMemControl is true. */
   private int maxMemtableNumber = 0;
 
@@ -820,12 +823,22 @@ public class IoTDBConfig {
   /** Internal ip for data node */
   private String internalIp;
 
-  /** Internal port of data node */
+  /** Internal port for coordinator */
   private int internalPort = 9003;
 
+  /** Internal port for consensus protocol */
+  private int consensusPort = 40010;
+
   /** The max time of data node waiting to join into the cluster */
   private long joinClusterTimeOutMs = TimeUnit.SECONDS.toMillis(60);
 
+  /**
+   * The consensus protocol class. The Datanode should communicate with ConfigNode on startup and
+   * set this variable so that the correct class name can be obtained later when the consensus layer
+   * singleton is initialized
+   */
+  private String consensusProtocolClass = "org.apache.iotdb.consensus.ratis.RatisConsensus";
+
   public IoTDBConfig() {
     try {
       internalIp = InetAddress.getLocalHost().getHostAddress();
@@ -937,6 +950,7 @@ public class IoTDBConfig {
     syncDir = addHomeDir(syncDir);
     tracingDir = addHomeDir(tracingDir);
     walDir = addHomeDir(walDir);
+    consensusDir = addHomeDir(consensusDir);
     indexRootFolder = addHomeDir(indexRootFolder);
     extDir = addHomeDir(extDir);
     udfDir = addHomeDir(udfDir);
@@ -1142,6 +1156,14 @@ public class IoTDBConfig {
     this.walDir = walDir;
   }
 
+  public String getConsensusDir() {
+    return consensusDir;
+  }
+
+  public void setConsensusDir(String consensusDir) {
+    this.consensusDir = consensusDir;
+  }
+
   public String getExtDir() {
     return extDir;
   }
@@ -2579,6 +2601,14 @@ public class IoTDBConfig {
     this.internalPort = internalPort;
   }
 
+  public int getConsensusPort() {
+    return consensusPort;
+  }
+
+  public void setConsensusPort(int consensusPort) {
+    this.consensusPort = consensusPort;
+  }
+
   public long getJoinClusterTimeOutMs() {
     return joinClusterTimeOutMs;
   }
@@ -2586,4 +2616,12 @@ public class IoTDBConfig {
   public void setJoinClusterTimeOutMs(long joinClusterTimeOutMs) {
     this.joinClusterTimeOutMs = joinClusterTimeOutMs;
   }
+
+  public String getConsensusProtocolClass() {
+    return consensusProtocolClass;
+  }
+
+  public void setConsensusProtocolClass(String consensusProtocolClass) {
+    this.consensusProtocolClass = consensusProtocolClass;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index e384b5c2b1..0e2934ebf6 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -243,6 +243,8 @@ public class IoTDBDescriptor {
 
       conf.setWalDir(properties.getProperty("wal_dir", conf.getWalDir()));
 
+      conf.setConsensusDir(properties.getProperty("consensus_dir", conf.getConsensusDir()));
+
       int mlogBufferSize =
           Integer.parseInt(
               properties.getProperty(
@@ -1472,6 +1474,10 @@ public class IoTDBDescriptor {
     conf.setInternalPort(
         Integer.parseInt(
             properties.getProperty("internal_port", Integer.toString(conf.getInternalPort()))));
+
+    conf.setConsensusPort(
+        Integer.parseInt(
+            properties.getProperty("consensus_port", Integer.toString(conf.getConsensusPort()))));
   }
 
   /** Get default encode algorithm by data type */
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java
index 9b1a2ed498..e9ecb69131 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java
@@ -23,10 +23,9 @@ import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.GroupType;
 import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IConsensusFactory;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
-import org.apache.iotdb.consensus.standalone.StandAloneConsensus;
-import org.apache.iotdb.consensus.statemachine.EmptyStateMachine;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.statemachine.DataRegionStateMachine;
 import org.apache.iotdb.db.consensus.statemachine.SchemaRegionStateMachine;
@@ -35,6 +34,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
@@ -43,16 +43,22 @@ public class ConsensusExample {
 
   public static void main(String[] args) throws IllegalPathException, IOException {
     IConsensus consensusImpl =
-        new StandAloneConsensus(
-            id -> {
-              switch (id.getType()) {
-                case SchemaRegion:
-                  return new SchemaRegionStateMachine();
-                case DataRegion:
-                  return new DataRegionStateMachine();
-              }
-              return new EmptyStateMachine();
-            });
+        IConsensusFactory.getConsensusImpl(
+                "org.apache.iotdb.consensus.standalone.StandAloneConsensus",
+                new Endpoint("localhost", 6667),
+                new File("./"),
+                gid -> {
+                  switch (gid.getType()) {
+                    case SchemaRegion:
+                      return new SchemaRegionStateMachine();
+                    case DataRegion:
+                      return new DataRegionStateMachine();
+                  }
+                  throw new IllegalArgumentException(
+                      String.format("Unexpected consensusGroup %s", gid));
+                })
+            .orElseThrow(
+                () -> new IllegalArgumentException(IConsensusFactory.CONSTRUCT_FAILED_MSG));
     consensusImpl.start();
     InsertRowPlan plan = getInsertRowPlan();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java
new file mode 100644
index 0000000000..9fd76a7585
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.consensus;
+
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+
+import java.io.File;
+
+public class ConsensusImpl {
+
+  private ConsensusImpl() {}
+
+  public IConsensus getInstance() {
+    return ConsensusImplHolder.INSTANCE;
+  }
+
+  private static class ConsensusImplHolder {
+
+    private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+    private static final IConsensus INSTANCE =
+        IConsensusFactory.getConsensusImpl(
+                conf.getConsensusProtocolClass(),
+                new Endpoint(conf.getInternalIp(), conf.getConsensusPort()),
+                new File(conf.getConsensusDir()),
+                gid -> {
+                  switch (gid.getType()) {
+                    case SchemaRegion:
+                      return StorageEngine.getInstance().getOrCreateSchemaRegionStateMachine(gid);
+                    case DataRegion:
+                      return StorageEngine.getInstance().getOrCreateDataRegionStateMachine(gid);
+                  }
+                  throw new IllegalArgumentException(
+                      String.format("Unexpected consensusGroup %s", gid));
+                })
+            .orElseThrow(
+                () ->
+                    new IllegalArgumentException(
+                        String.format(
+                            IConsensusFactory.CONSTRUCT_FAILED_MSG,
+                            conf.getConsensusProtocolClass())));
+
+    private ConsensusImplHolder() {}
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 4ef3ad8f07..ff59f952c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -39,13 +39,13 @@ public class DataRegionStateMachine extends BaseStateMachine {
 
   @Override
   protected TSStatus write(FragmentInstance fragmentInstance) {
-    logger.info("Execute write plan in DataRegionStateMachine : {}", fragmentInstance);
+    logger.info("Execute write plan in DataRegionStateMachine");
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
   @Override
   protected DataSet read(FragmentInstance fragmentInstance) {
-    logger.info("Execute read plan in DataRegionStateMachine: {}", fragmentInstance);
+    logger.info("Execute read plan in DataRegionStateMachine");
     return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index 628874f1d3..3c3be5d208 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -39,13 +39,13 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
 
   @Override
   protected TSStatus write(FragmentInstance fragmentInstance) {
-    logger.info("Execute write plan in SchemaRegionStateMachine : {}", fragmentInstance);
+    logger.info("Execute write plan in SchemaRegionStateMachine");
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
   @Override
   protected DataSet read(FragmentInstance fragmentInstance) {
-    logger.info("Execute read plan in SchemaRegionStateMachine: {}", fragmentInstance);
+    logger.info("Execute read plan in SchemaRegionStateMachine");
     return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 7c92e4216b..d8b4096b49 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.exception.ShutdownException;
 import org.apache.iotdb.commons.partition.TimePartitionSlot;
 import org.apache.iotdb.commons.service.IService;
@@ -28,6 +29,9 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.ServerConfigConsistent;
+import org.apache.iotdb.db.consensus.statemachine.BaseStateMachine;
+import org.apache.iotdb.db.consensus.statemachine.DataRegionStateMachine;
+import org.apache.iotdb.db.consensus.statemachine.SchemaRegionStateMachine;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.flush.CloseFileListener;
 import org.apache.iotdb.db.engine.flush.FlushListener;
@@ -136,6 +140,8 @@ public class StorageEngine implements IService {
   private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
   private List<FlushListener> customFlushListeners = new ArrayList<>();
 
+  private final Map<ConsensusGroupId, BaseStateMachine> stateMachineMap = new ConcurrentHashMap<>();
+
   private StorageEngine() {}
 
   public static StorageEngine getInstance() {
@@ -1073,6 +1079,16 @@ public class StorageEngine implements IService {
     }
   }
 
+  public SchemaRegionStateMachine getOrCreateSchemaRegionStateMachine(ConsensusGroupId gid) {
+    return (SchemaRegionStateMachine)
+        stateMachineMap.computeIfAbsent(gid, id -> new SchemaRegionStateMachine());
+  }
+
+  public DataRegionStateMachine getOrCreateDataRegionStateMachine(ConsensusGroupId gid) {
+    return (DataRegionStateMachine)
+        stateMachineMap.computeIfAbsent(gid, id -> new DataRegionStateMachine());
+  }
+
   static class InstanceHolder {
 
     private static final StorageEngine INSTANCE = new StorageEngine();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index d5533e6cd7..eaa85865be 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.service;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.commons.exception.ConfigurationException;
@@ -62,7 +63,7 @@ public class DataNode implements DataNodeMBean {
    */
   private static final int DEFAULT_JOIN_RETRY = 10;
 
-  private EndPoint thisNode = new EndPoint();
+  private Endpoint thisNode = new Endpoint();
 
   private int dataNodeID;
 
@@ -122,7 +123,7 @@ public class DataNode implements DataNodeMBean {
   }
 
   public void joinCluster() throws StartupException {
-    List<EndPoint> configNodes;
+    List<Endpoint> configNodes;
     try {
       configNodes =
           CommonUtils.parseNodeUrls(IoTDBDescriptor.getInstance().getConfig().getConfigNodeUrls());
@@ -134,12 +135,13 @@ public class DataNode implements DataNodeMBean {
     while (retry > 0) {
       // randomly pick up a config node to try
       Random random = new Random();
-      EndPoint configNode = configNodes.get(random.nextInt(configNodes.size()));
+      Endpoint configNode = configNodes.get(random.nextInt(configNodes.size()));
       logger.info("start joining the cluster with the help of {}", configNode);
       try {
         ConfigIService.Client client = createClient(configNode);
         DataNodeRegisterResp dataNodeRegisterResp =
-            client.registerDataNode(new DataNodeRegisterReq(thisNode));
+            client.registerDataNode(
+                new DataNodeRegisterReq(new EndPoint(thisNode.getIp(), thisNode.getPort())));
         if (dataNodeRegisterResp.getRegisterResult().getCode()
             == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           dataNodeID = dataNodeRegisterResp.getDataNodeID();
@@ -199,13 +201,13 @@ public class DataNode implements DataNodeMBean {
     private DataNodeHolder() {}
   }
 
-  private ConfigIService.Client createClient(EndPoint endPoint) throws IoTDBConnectionException {
+  private ConfigIService.Client createClient(Endpoint endpoint) throws IoTDBConnectionException {
     TTransport transport;
     try {
       transport =
           RpcTransportFactory.INSTANCE.getTransport(
               // as there is a try-catch already, we do not need to use TSocket.wrap
-              endPoint.getIp(), endPoint.getPort(), 2000);
+              endpoint.getIp(), endpoint.getPort(), 2000);
       transport.open();
     } catch (TTransportException e) {
       throw new IoTDBConnectionException(e);