You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/04/05 10:43:53 UTC

[iotdb] branch datanode_consensus_init updated (a4204fd360 -> eb20560fb9)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch datanode_consensus_init
in repository https://gitbox.apache.org/repos/asf/iotdb.git


 discard a4204fd360 finish
     new eb20560fb9 finish

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (a4204fd360)
            \
             N -- N -- N   refs/heads/datanode_consensus_init (eb20560fb9)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 consensus/README.md | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)


[iotdb] 01/01: finish

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch datanode_consensus_init
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit eb20560fb92618f29b2fed68ef65cd2d9d5aa28e
Author: LebronAl <TX...@gmail.com>
AuthorDate: Tue Apr 5 18:38:26 2022 +0800

    finish
---
 .../resources/conf/iotdb-confignode.properties     |  21 ++-
 .../iotdb/confignode/conf/ConfigNodeConf.java      |  28 ++--
 .../confignode/conf/ConfigNodeDescriptor.java      |  39 +++---
 .../iotdb/confignode/manager/ConsensusManager.java |  49 ++-----
 consensus/README.md                                | 126 +++++++++++------
 consensus/pom.xml                                  |   2 +-
 .../apache/iotdb/consensus/IConsensusFactory.java  |  57 ++++++++
 .../iotdb/consensus/ratis/RatisConsensus.java      | 152 ++++++++-------------
 .../consensus/standalone/StandAloneConsensus.java  |  20 ++-
 .../iotdb/consensus/ratis/RatisConsensusTest.java  |  20 ++-
 .../standalone/StandAloneConsensusTest.java        |  33 +++--
 .../org/apache/iotdb/commons/cluster/Endpoint.java |  10 ++
 .../apache/iotdb/commons/utils/CommonUtils.java    |  10 +-
 .../resources/conf/iotdb-engine.properties         |  19 ++-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  40 +++++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   6 +
 .../iotdb/db/consensus/ConsensusExample.java       |  30 ++--
 .../apache/iotdb/db/consensus/ConsensusImpl.java   |  66 +++++++++
 .../statemachine/DataRegionStateMachine.java       |   4 +-
 .../statemachine/SchemaRegionStateMachine.java     |   4 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  16 +++
 .../java/org/apache/iotdb/db/service/DataNode.java |  14 +-
 22 files changed, 505 insertions(+), 261 deletions(-)

diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 801bf17b44..3aaf61eddc 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -57,18 +57,25 @@ config_node_rpc_port=22277
 ### consensus protocol configuration
 ####################
 
-# ConfigNodeGroup consensus protocol type
-# These consensus protocol are currently supported:
-# 1. standalone(No protocol, only supports stand-alone machine)
-# 2. ratis(Raft protocol)
+# ConfigNode consensus protocol type
+# These consensus protocols are currently supported:
+# 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(No protocol, only supports stand-alone machine)
+# 2. org.apache.iotdb.consensus.ratis.RatisConsensus(Raft protocol)
 # Datatype: String
-# consensus_type=standalone
+# config_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
 
 # Used for building the ConfigNode consensus group
 # all config node address and internal port, use comma to distinguish
 # every node should have the same config_node_address_lists
 # Datatype: String
-# config_node_group_address_list=0.0.0.0:22278,0.0.0.0:22280,0.0.0.0:22282
+# config_node_group_address_list=0.0.0.0:22278
+
+# DataNode consensus protocol type
+# These consensus protocols are currently supported:
+# 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(No protocol, only supports stand-alone machine)
+# 2. org.apache.iotdb.consensus.ratis.RatisConsensus(Raft protocol)
+# Datatype: String
+# data_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
 
 ####################
 ### DeviceGroup Configuration
@@ -79,7 +86,6 @@ config_node_rpc_port=22277
 # device_group_count=10000
 
 # DeviceGroup hash algorithm
-# Datatype: String
 # These hashing algorithms are currently supported:
 # 1. org.apache.iotdb.commons.hash.BKDRHashExecutor(Default)
 # 2. org.apache.iotdb.commons.hash.APHashExecutor
@@ -87,6 +93,7 @@ config_node_rpc_port=22277
 # 4. org.apache.iotdb.commons.hash.SDBMHashExecutor
 # Also, if you want to implement your own hash algorithm, you can inherit the DeviceGroupHashExecutor class and
 # modify this parameter to correspond to your Java class
+# Datatype: String
 # device_group_hash_executor_class=org.apache.iotdb.commons.hash.BKDRHashExecutor
 
 ####################
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index c94ebe05a6..f21bb768d8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.confignode.conf;
 
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.consensus.common.ConsensusType;
 import org.apache.iotdb.rpc.RpcUtils;
 
 import java.io.File;
@@ -33,14 +32,17 @@ public class ConfigNodeConf {
   /** used for communication between data node and config node */
   private int rpcPort = 22277;
 
-  /** used for communication between data node and data node */
+  /** used for communication between config node and config node */
   private int internalPort = 22278;
 
-  /** ConfigNodeGroup consensus protocol */
-  private ConsensusType consensusType = ConsensusType.STANDALONE;
+  /** ConfigNode consensus protocol */
+  private String configNodeConsensusProtocolClass =
+      "org.apache.iotdb.consensus.ratis.RatisConsensus";
+
+  private String dataNodeConsensusProtocolClass = "org.apache.iotdb.consensus.ratis.RatisConsensus";
 
   /** Used for building the ConfigNode consensus group */
-  private Endpoint[] configNodeGroupAddressList = null;
+  private Endpoint[] configNodeGroupAddressList;
 
   /** Number of DeviceGroups per StorageGroup */
   private int deviceGroupCount = 10000;
@@ -199,12 +201,20 @@ public class ConfigNodeConf {
     this.consensusDir = consensusDir;
   }
 
-  public ConsensusType getConsensusType() {
-    return consensusType;
+  public String getConfigNodeConsensusProtocolClass() {
+    return configNodeConsensusProtocolClass;
+  }
+
+  public void setConfigNodeConsensusProtocolClass(String configNodeConsensusProtocolClass) {
+    this.configNodeConsensusProtocolClass = configNodeConsensusProtocolClass;
+  }
+
+  public String getDataNodeConsensusProtocolClass() {
+    return dataNodeConsensusProtocolClass;
   }
 
-  public void setConsensusType(ConsensusType consensusType) {
-    this.consensusType = consensusType;
+  public void setDataNodeConsensusProtocolClass(String dataNodeConsensusProtocolClass) {
+    this.dataNodeConsensusProtocolClass = dataNodeConsensusProtocolClass;
   }
 
   public Endpoint[] getConfigNodeGroupAddressList() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index f6f035d987..4f2590192d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -19,7 +19,8 @@
 package org.apache.iotdb.confignode.conf;
 
 import org.apache.iotdb.commons.cluster.Endpoint;
-import org.apache.iotdb.consensus.common.ConsensusType;
+import org.apache.iotdb.commons.exception.BadNodeUrlException;
+import org.apache.iotdb.commons.utils.CommonUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -122,9 +123,13 @@ public class ConfigNodeDescriptor {
               properties.getProperty(
                   "config_node_internal_port", String.valueOf(conf.getInternalPort()))));
 
-      conf.setConsensusType(
-          ConsensusType.getConsensusType(
-              properties.getProperty("consensus_type", String.valueOf(conf.getConsensusType()))));
+      conf.setConfigNodeConsensusProtocolClass(
+          properties.getProperty(
+              "config_node_consensus_protocol_class", conf.getConfigNodeConsensusProtocolClass()));
+
+      conf.setDataNodeConsensusProtocolClass(
+          properties.getProperty(
+              "data_node_consensus_protocol_class", conf.getDataNodeConsensusProtocolClass()));
 
       conf.setRpcAdvancedCompressionEnable(
           Boolean.parseBoolean(
@@ -175,25 +180,15 @@ public class ConfigNodeDescriptor {
               properties.getProperty(
                   "data_region_count", String.valueOf(conf.getDataRegionCount()))));
 
-      String addresses = properties.getProperty("config_node_group_address_list", null);
-      if (addresses != null) {
-        String[] addressList = addresses.split(",");
-        Endpoint[] endpointList = new Endpoint[addressList.length];
-        for (int i = 0; i < addressList.length; i++) {
-          String[] ipPort = addressList[i].split(":");
-          if (ipPort.length != 2) {
-            throw new IOException(
-                String.format(
-                    "Parsing parameter config_node_group_address_list error. "
-                        + "The %d-th address must format to ip:port, but currently is %s",
-                    i, addressList[i]));
-          }
-          endpointList[i] = new Endpoint(ipPort[0], Integer.parseInt(ipPort[1]));
-        }
-        conf.setConfigNodeGroupAddressList(endpointList);
-      }
+      String addresses = properties.getProperty("config_node_group_address_list", "0.0.0.0:22278");
 
-    } catch (IOException e) {
+      String[] addressList = addresses.split(",");
+      Endpoint[] endpointList = new Endpoint[addressList.length];
+      for (int i = 0; i < addressList.length; i++) {
+        endpointList[i] = CommonUtils.parseNodeUrl(addressList[i]);
+      }
+      conf.setConfigNodeGroupAddressList(endpointList);
+    } catch (IOException | BadNodeUrlException e) {
       LOGGER.warn("Couldn't load ConfigNode conf file, use default config", e);
     } finally {
       conf.updatePath();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 3261c6dde0..d48c264790 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -27,11 +27,10 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.statemachine.PartitionRegionStateMachine;
 import org.apache.iotdb.confignode.physical.PhysicalPlan;
 import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IConsensusFactory;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.consensus.ratis.RatisConsensus;
-import org.apache.iotdb.consensus.standalone.StandAloneConsensus;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +41,6 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 
 /** ConsensusManager maintains consensus class, request will redirect to consensus layer */
@@ -90,42 +88,19 @@ public class ConsensusManager {
     // There is only one ConfigNodeGroup
     consensusGroupId = new ConsensusGroupId(GroupType.PartitionRegion, 0);
 
-    // Implement specific consensus
-    switch (conf.getConsensusType()) {
-      case STANDALONE:
-        constructStandAloneConsensus();
-        break;
-      case RATIS:
-        constructRatisConsensus();
-        break;
-      default:
-        throw new IllegalArgumentException(
-            "Start ConfigNode failed, unrecognized ConsensusType: "
-                + conf.getConsensusType().getTypeName());
-    }
-  }
-
-  private void constructStandAloneConsensus() throws IOException {
-    // Standalone consensus
-    consensusImpl = new StandAloneConsensus(id -> new PartitionRegionStateMachine());
-    consensusImpl.start();
-
-    // Standalone ConsensusGroup
-    consensusImpl.addConsensusGroup(
-        consensusGroupId,
-        Collections.singletonList(
-            new Peer(
-                consensusGroupId, new Endpoint(conf.getRpcAddress(), conf.getInternalPort()))));
-  }
-
-  private void constructRatisConsensus() throws IOException {
     // Ratis consensus local implement
     consensusImpl =
-        RatisConsensus.newBuilder()
-            .setEndpoint(new Endpoint(conf.getRpcAddress(), conf.getInternalPort()))
-            .setStateMachineRegistry(id -> new PartitionRegionStateMachine())
-            .setStorageDir(new File(conf.getConsensusDir()))
-            .build();
+        IConsensusFactory.getConsensusImpl(
+                conf.getConfigNodeConsensusProtocolClass(),
+                new Endpoint(conf.getRpcAddress(), conf.getInternalPort()),
+                new File(conf.getConsensusDir()),
+                gid -> new PartitionRegionStateMachine())
+            .orElseThrow(
+                () ->
+                    new IllegalArgumentException(
+                        String.format(
+                            IConsensusFactory.CONSTRUCT_FAILED_MSG,
+                            conf.getConfigNodeConsensusProtocolClass())));
     consensusImpl.start();
 
     // Build ratis group from user properties
diff --git a/consensus/README.md b/consensus/README.md
index 7fc51e0d8d..027fc358c2 100644
--- a/consensus/README.md
+++ b/consensus/README.md
@@ -20,104 +20,144 @@
 -->
 
 # Overview
+
 The Consensus Package provides the Consensus Layer's service definitions and its implementations.
- 
+
 ### What is Consensus Layer used for?
-Generally, We maintain multiple copies of application data for the purpose of fault-tolerance and data-integrity.
-There are variety of consensus algorithms to manage multiple copies of data, which differs in levels of data consistency and performance.
-Each consensus algorithm may have multiple industrial implementations.
-The Consensus Layer aims to hide all complications behind different consensus algorithms and implementations, providing higher level of abstraction to the user.
+
+Generally, We maintain multiple copies of application data for the purpose of fault-tolerance and
+data-integrity. There are variety of consensus algorithms to manage multiple copies of data, which
+differs in levels of data consistency and performance. Each consensus algorithm may have multiple
+industrial implementations. The Consensus Layer aims to hide all complications behind different
+consensus algorithms and implementations, providing higher level of abstraction to the user.
 
 # Consensus Layer
+
 ### Basic Concepts
 
 * `IStateMachine` is the user application that manages a local copy of data.
-* `Peer` is the smallest consensus unit (inside a process), which holds a `IStateMachine` internally.
+* `Peer` is the smallest consensus unit (inside a process), which holds a `IStateMachine`
+  internally.
 * `ConsensusGroup` is a group of `Peer` all managing the same copy of data.
-* `IConsensus` interface defines the basic functionality provided by Consensus Layer. 
+* `IConsensus` interface defines the basic functionality provided by Consensus Layer.
+* `IConsessusFactory` is the only factory class exposed to other modules to create a consensus layer
+  implementation.
 
-User application can create a `ConsensusGroup` with k `Peer` to store data, 
-i.e. that there will be k copies of data.
+User application can create a `ConsensusGroup` with k `Peer` to store data, i.e. that there will be
+k copies of data.
 
-When write data into a `ConsensusGroup` using `IConsensus::write`, it will be sent to the group leader's `IStateMachine::write` . 
-The leader makes decision about this write operation first, 
-then applies write-operation to the local statemachine using `IStateMachine::write`,
-and forward this operation to other members' `IStateMachine::write` in the same group
+When write data into a `ConsensusGroup` using `IConsensus::write`, it will be sent to the group
+leader's `IStateMachine::write` . The leader makes decision about this write operation first, then
+applies write-operation to the local statemachine using `IStateMachine::write`, and forward this
+operation to other members' `IStateMachine::write` in the same group
 
 ### How to use
+
 1. Define the  `IStateMachine` to manage local copy of data.
 2. Define the `IConsensusRequest` to customize request format
 3. Define the `DataSet` to customize the response format
-4. Pick a concrete `IConsensus` implementation as the backbone consensus algorithm
-
-
+4. Select a specific "IConsensus" class name and call `IConsensusFactory.getConsensusImpl()` to
+   instantiate the corresponding consensus protocol
 
 # Ratis Consensus Implementation
 
-RatisConsensus is a multi-raft implementation of `IConsensus` protocol. It is based on [Apache Ratis](https://ratis.apache.org/).
+RatisConsensus is a multi-raft implementation of `IConsensus` protocol. It is based
+on [Apache Ratis](https://ratis.apache.org/).
+
+### 1. build and start a RatisConsensusImpl
 
-### 1. build and start a RatisConsensusService
 ```java
-IConsensus consensusService = RatisConsensus.newBuilder()
-              .setEndpoint(peers.get(i).getEndpoint())
-              .setStateMachineRegistry(groupId -> new IntegerCounter())
-              .setStorageDir(peersStorage.get(i))
-              .build();
-consensusService.start();
+IConsensus consensusImpl =
+    IConsensusFactory.getConsensusImpl(
+        "org.apache.iotdb.consensus.ratis.RatisConsensus",
+        new Endpoint(conf.getRpcAddress(), conf.getInternalPort()),
+        new File(conf.getConsensusDir()),
+        gid -> new PartitionRegionStateMachine())
+    .orElseThrow(() ->
+        new IllegalArgumentException(
+        String.format(
+        IConsensusFactory.CONSTRUCT_FAILED_MSG,
+        "org.apache.iotdb.consensus.ratis.RatisConsensus")));
+
+consensusImpl.start();
 ```
-* `endpoint` is the communication endpoint for this consensus service.
-* `StateMachineRegistery` 
-* `StoreageDir` specifies the location to store RaftLog. Assign a fix location so that the RatisConsensus knows where to recover when crashes and restarts.
+
+* `endpoint` is the communication endpoint for this consensusImpl.
+* `StateMachineRegistery`
+* `StoreageDir` specifies the location to store RaftLog. Assign a fix location so that the
+  RatisConsensus knows where to recover when crashes and restarts.
 
 ### 2. assign local RatisConsensus a new Group
+
 ```java
 ConsensusGroup group = new ConsensusGroup(...);
-response = service.addConsensusGroup(group.getGroupId(), group.getPeers());
+response = consensusImpl.addConsensusGroup(group.getGroupId(),group.getPeers());
 ```
-The underling Raft Service will initialize its states, and reaching out to other peers to elect the raft leader.
+
+The underling consensusImpl will initialize its states, and reaching out to other peers to elect the
+raft leader.
 
 **Notice**: this request may fail. It's caller's responsibility to retry / rollback.
 
 ### 3. change group configuration
+
 #### (1) remove a member
+
 suppose now the group contains peer[0,1,2], and we want to remove[1,2] from this group
+
 ```java
 // the following code should be called in peer both 1 & 2
 // first  use removePeer to inform the group leader of configuration change 
-service.removePeer(gid, myself);
+consensusImpl.removePeer(gid,myself);
 // then use removeConsensusGroup to clean up local states and data
-service.removeConsensusGroup(gid);
+consensusImpl.removeConsensusGroup(gid);
 ```
-**Notice**: either of `removePeer` or `removeConsensusGroup` may fail. 
-It's caller's responsibility to retry and make these two calls atomic.
+
+**Notice**: either of `removePeer` or `removeConsensusGroup` may fail. It's caller's responsibility
+to retry and make these two calls atomic.
+
 #### (2) add a member
-adding a new member is similar to removing a member except that you should call `addConsensusGroup` first and then `addPeer`
+
+adding a new member is similar to removing a member except that you should call `addConsensusGroup`
+first and then `addPeer`
+
 ```java
 // the following code should be called in peer both 1 & 2
 // first addConsensusGroup to initialize local states
-service.addConsensusGroup(gid);
+consensusImpl.addConsensusGroup(gid);
 // then use addPeer to inform the previous group members of joining a new member
-service.addPeer(gid, myself);
+consensusImpl.addPeer(gid,myself);
 ```
+
 #### (3) add/remove multiple members
+
 ```java
 // pre. For each member newly added, call addConsensusGroup locally to initialize
-serivice.changePeer(group.getGroupId(), newGroupmember);
+consensusImpl.changePeer(group.getGroupId(),newGroupmember);
 // after. For each member removed, call removeConsensusGroup locally to clean up
 ```
+
 **Notice**: the old group and the new group must overlap in at least one member.
+
 ### 4. write data
+
 ```java
-ConsensusWriteResponse response = service.write(gid, request)
-if (response.isSuccess() && response.getStates().code() == 200) {...}
+ConsensusWriteResponse response = consensusImpl.write(gid,request)
+if(response.isSuccess() && response.getStates().code() == 200){
+    ...
+}
 ```
+
 ### 5. read data
+
 ```java
-ConsensusReadResponse response = consensus.read(gid, request);
-if (response.isSuccess()) {
-    MyDataSet result = (MyDataSet) response.getDataset();
+ConsensusReadResponse response = consensusImpl.read(gid,request);
+if(response.isSuccess()){
+    MyDataSet result=(MyDataSet)response.getDataset();
 }
 ```
-**NOTICE**: currently in RatisConsensus, read will direct read the local copy. Thus, the result may be stale and not linearizable!
+
+**NOTICE**: currently in RatisConsensus, read will direct read the local copy. Thus, the result may
+be stale and not linearizable!
 
 
diff --git a/consensus/pom.xml b/consensus/pom.xml
index 8f6646eae6..f668d8b4bd 100644
--- a/consensus/pom.xml
+++ b/consensus/pom.xml
@@ -54,7 +54,7 @@
         <dependency>
             <groupId>org.apache.iotdb</groupId>
             <artifactId>node-commons</artifactId>
-            <version>0.14.0-SNAPSHOT</version>
+            <version>${project.version}</version>
             <scope>compile</scope>
         </dependency>
     </dependencies>
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IConsensusFactory.java b/consensus/src/main/java/org/apache/iotdb/consensus/IConsensusFactory.java
new file mode 100644
index 0000000000..9f36de4396
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IConsensusFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus;
+
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Optional;
+
+public class IConsensusFactory {
+  public static final String CONSTRUCT_FAILED_MSG =
+      "Construct consensusImpl failed, Please check your consensus className %s";
+
+  private static final Logger logger = LoggerFactory.getLogger(IConsensusFactory.class);
+
+  public static Optional<IConsensus> getConsensusImpl(
+      String className, Endpoint endpoint, File storageDir, IStateMachine.Registry registry) {
+    try {
+      Class<?> executor = Class.forName(className);
+      Constructor<?> executorConstructor =
+          executor.getDeclaredConstructor(Endpoint.class, File.class, IStateMachine.Registry.class);
+      executorConstructor.setAccessible(true);
+      return Optional.of(
+          (IConsensus) executorConstructor.newInstance(endpoint, storageDir, registry));
+    } catch (ClassNotFoundException
+        | NoSuchMethodException
+        | InstantiationException
+        | IllegalAccessException
+        | InvocationTargetException e) {
+      logger.error("Couldn't Construct IConsensus class: {}", className, e);
+    }
+    return Optional.empty();
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 8e70f9653a..68094643cc 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -73,7 +73,7 @@ import java.util.stream.Collectors;
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
-public class RatisConsensus implements IConsensus {
+class RatisConsensus implements IConsensus {
   // the unique net communication endpoint
   private final RaftPeer myself;
 
@@ -89,41 +89,40 @@ public class RatisConsensus implements IConsensus {
   private static final int LEADER_PRIORITY = 1;
 
   private Logger logger = LoggerFactory.getLogger(RatisConsensus.class);
-  /**
-   * This function will use the previous client for groupId to query the latest group info It will
-   * update the new group info into the groupMap and rebuild its client
-   *
-   * @throws ConsensusGroupNotExistException when cannot get the group info
-   */
-  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
-      throws ConsensusGroupNotExistException {
-    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
-    RaftClient current = clientMap.get(raftGroupId);
-    try {
-      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
 
-      if (!reply.isSuccess()) {
-        throw new ConsensusGroupNotExistException(groupId);
-      }
+  public RatisConsensus(Endpoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
+      throws IOException {
 
-      raftGroupMap.put(raftGroupId, reply.getGroup());
-      buildClientAndCache(raftGroupMap.get(raftGroupId));
-    } catch (IOException e) {
-      throw new ConsensusGroupNotExistException(groupId);
-    }
-  }
+    this.clientMap = new ConcurrentHashMap<>();
+    this.raftGroupMap = new ConcurrentHashMap<>();
+    this.localFakeId = ClientId.randomId();
+    this.localFakeCallId = new AtomicLong(0);
 
-  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
-      throws RatisRequestFailedException {
-    RaftClient client = clientMap.get(raftGroupId);
-    // notify the group leader of configuration change
-    RaftClientReply reply = null;
-    try {
-      reply = client.admin().setConfiguration(peers);
-    } catch (IOException e) {
-      throw new RatisRequestFailedException(e);
+    // create a RaftPeer as endpoint of comm
+    String address = Utils.IPAddress(endpoint);
+    myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY);
+
+    RaftProperties properties = new RaftProperties();
+
+    // set the storage directory (different for each peer) in RaftProperty object
+    if (ratisStorageDir == null || !ratisStorageDir.isDirectory()) {
+      ratisStorageDir = new File("./" + myself.getId().toString());
     }
-    return reply;
+    RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(ratisStorageDir));
+
+    // set the port which server listen to in RaftProperty object
+    final int port = NetUtils.createSocketAddr(address).getPort();
+    GrpcConfigKeys.Server.setPort(properties, port);
+
+    server =
+        RaftServer.newBuilder()
+            .setServerId(myself.getId())
+            .setProperties(properties)
+            .setStateMachineRegistry(
+                raftGroupId ->
+                    new ApplicationStateMachineProxy(
+                        registry.apply(Utils.toConsensusGroupId(raftGroupId))))
+            .build();
   }
 
   @Override
@@ -517,71 +516,40 @@ public class RatisConsensus implements IConsensus {
     }
   }
 
-  private RatisConsensus(Endpoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
-      throws IOException {
-
-    this.clientMap = new ConcurrentHashMap<>();
-    this.raftGroupMap = new ConcurrentHashMap<>();
-    this.localFakeId = ClientId.randomId();
-    this.localFakeCallId = new AtomicLong(0);
-
-    // create a RaftPeer as endpoint of comm
-    String address = Utils.IPAddress(endpoint);
-    myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY);
+  /**
+   * This function will use the previous client for groupId to query the latest group info It will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
 
-    RaftProperties properties = new RaftProperties();
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
 
-    // set the storage directory (different for each peer) in RaftProperty object
-    if (ratisStorageDir == null || !ratisStorageDir.isDirectory()) {
-      ratisStorageDir = new File("./" + myself.getId().toString());
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
     }
-    RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(ratisStorageDir));
-
-    // set the port which server listen to in RaftProperty object
-    final int port = NetUtils.createSocketAddr(address).getPort();
-    GrpcConfigKeys.Server.setPort(properties, port);
-
-    server =
-        RaftServer.newBuilder()
-            .setServerId(myself.getId())
-            .setProperties(properties)
-            .setStateMachineRegistry(
-                raftGroupId ->
-                    new ApplicationStateMachineProxy(
-                        registry.apply(Utils.toConsensusGroupId(raftGroupId))))
-            .build();
-  }
-
-  public static Builder newBuilder() {
-    return new Builder();
   }
 
-  public static class Builder {
-    private Endpoint endpoint;
-    private IStateMachine.Registry registry;
-    private File storageDir;
-
-    public Builder() {
-      storageDir = null;
-    }
-
-    public Builder setEndpoint(Endpoint endpoint) {
-      this.endpoint = endpoint;
-      return this;
-    }
-
-    public Builder setStateMachineRegistry(IStateMachine.Registry registry) {
-      this.registry = registry;
-      return this;
-    }
-
-    public Builder setStorageDir(File dir) {
-      this.storageDir = dir;
-      return this;
-    }
-
-    public RatisConsensus build() throws IOException {
-      return new RatisConsensus(endpoint, storageDir, registry);
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException(e);
     }
+    return reply;
   }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
index 6f1582a268..91a767055b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.standalone;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.DataSet;
@@ -31,8 +32,10 @@ import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
 import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.consensus.statemachine.IStateMachine.Registry;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -41,19 +44,25 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * A simple single replica consensus implementation.
+ * A simple consensus implementation, which can be used when replicaNum is 1.
+ *
+ * <p>Notice: The stateMachine needs to implement WAL itself to ensure recovery after a restart
  *
  * <p>any module can use `IConsensus consensusImpl = new StandAloneConsensus(id -> new
  * EmptyStateMachine());` to perform an initialization implementation.
  */
-public class StandAloneConsensus implements IConsensus {
+class StandAloneConsensus implements IConsensus {
 
+  private final Endpoint thisNode;
+  private final File storageDir;
   private final IStateMachine.Registry registry;
-  private final Map<ConsensusGroupId, StandAloneServerImpl> stateMachineMap;
+  private final Map<ConsensusGroupId, StandAloneServerImpl> stateMachineMap =
+      new ConcurrentHashMap<>();
 
-  public StandAloneConsensus(IStateMachine.Registry registry) {
+  public StandAloneConsensus(Endpoint thisNode, File storageDir, Registry registry) {
+    this.thisNode = thisNode;
+    this.storageDir = storageDir;
     this.registry = registry;
-    this.stateMachineMap = new ConcurrentHashMap<>();
   }
 
   @Override
@@ -136,6 +145,7 @@ public class StandAloneConsensus implements IConsensus {
           v.stop();
           return null;
         });
+
     if (!exist.get()) {
       return ConsensusGenericResponse.newBuilder()
           .setException(new ConsensusGroupNotExistException(groupId))
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index 2558ad2579..581fdd55c5 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.GroupType;
 import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IConsensusFactory;
 import org.apache.iotdb.consensus.common.ConsensusGroup;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
@@ -52,6 +53,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class RatisConsensusTest {
 
+  private static final String RATIS_CLASS_NAME = "org.apache.iotdb.consensus.ratis.RatisConsensus";
+
   private static class TestDataSet implements DataSet {
     private int number;
 
@@ -65,7 +68,7 @@ public class RatisConsensusTest {
   }
 
   private static class TestRequest {
-    private int cmd;
+    private final int cmd;
 
     public TestRequest(ByteBuffer buffer) {
       cmd = buffer.getInt();
@@ -136,11 +139,16 @@ public class RatisConsensusTest {
     servers = new ArrayList<>();
     for (int i = 0; i < 3; i++) {
       servers.add(
-          RatisConsensus.newBuilder()
-              .setEndpoint(peers.get(i).getEndpoint())
-              .setStateMachineRegistry(groupId -> new IntegerCounter())
-              .setStorageDir(peersStorage.get(i))
-              .build());
+          IConsensusFactory.getConsensusImpl(
+                  RATIS_CLASS_NAME,
+                  peers.get(i).getEndpoint(),
+                  peersStorage.get(i),
+                  groupId -> new IntegerCounter())
+              .orElseThrow(
+                  () ->
+                      new IllegalArgumentException(
+                          String.format(
+                              IConsensusFactory.CONSTRUCT_FAILED_MSG, RATIS_CLASS_NAME))));
       servers.get(i).start();
     }
   }
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index ded008960d..ddb86f817a 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.GroupType;
 import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IConsensusFactory;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
@@ -40,6 +41,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
@@ -52,6 +54,8 @@ import static org.junit.Assert.assertTrue;
 
 public class StandAloneConsensusTest {
 
+  private static final String STANDALONE_CONSENSUS_CLASS_NAME =
+      "org.apache.iotdb.consensus.standalone.StandAloneConsensus";
   private IConsensus consensusImpl;
   private final TestEntry entry1 = new TestEntry(0);
   private final ByteBufferConsensusRequest entry2 =
@@ -108,16 +112,25 @@ public class StandAloneConsensusTest {
   @Before
   public void setUp() throws Exception {
     consensusImpl =
-        new StandAloneConsensus(
-            gid -> {
-              switch (gid.getType()) {
-                case SchemaRegion:
-                  return new TestStateMachine(true);
-                case DataRegion:
-                  return new TestStateMachine(false);
-              }
-              return new EmptyStateMachine();
-            });
+        IConsensusFactory.getConsensusImpl(
+                STANDALONE_CONSENSUS_CLASS_NAME,
+                new Endpoint("localhost", 6667),
+                new File("./"),
+                gid -> {
+                  switch (gid.getType()) {
+                    case SchemaRegion:
+                      return new TestStateMachine(true);
+                    case DataRegion:
+                      return new TestStateMachine(false);
+                  }
+                  return new EmptyStateMachine();
+                })
+            .orElseThrow(
+                () ->
+                    new IllegalArgumentException(
+                        String.format(
+                            IConsensusFactory.CONSTRUCT_FAILED_MSG,
+                            STANDALONE_CONSENSUS_CLASS_NAME)));
     consensusImpl.start();
   }
 
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/Endpoint.java b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/Endpoint.java
index ff6e8b118b..25d7b84d0f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/Endpoint.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/Endpoint.java
@@ -39,10 +39,20 @@ public class Endpoint {
     return ip;
   }
 
+  public Endpoint setIp(String ip) {
+    this.ip = ip;
+    return this;
+  }
+
   public int getPort() {
     return port;
   }
 
+  public Endpoint setPort(int port) {
+    this.port = port;
+    return this;
+  }
+
   public void serializeImpl(ByteBuffer buffer) {
     byte[] bytes = ip.getBytes();
     buffer.putInt(bytes.length);
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java
index 3d9995edbc..e9e170232c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java
@@ -19,8 +19,8 @@
 
 package org.apache.iotdb.commons.utils;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,8 +31,8 @@ import java.util.List;
 public class CommonUtils {
   private static final Logger logger = LoggerFactory.getLogger(CommonUtils.class);
 
-  public static EndPoint parseNodeUrl(String nodeUrl) throws BadNodeUrlException {
-    EndPoint result = new EndPoint();
+  public static Endpoint parseNodeUrl(String nodeUrl) throws BadNodeUrlException {
+    Endpoint result = new Endpoint();
     String[] split = nodeUrl.split(":");
     if (split.length != 2) {
       logger.warn("Bad node url: {}", nodeUrl);
@@ -49,8 +49,8 @@ public class CommonUtils {
     return result;
   }
 
-  public static List<EndPoint> parseNodeUrls(List<String> nodeUrls) throws BadNodeUrlException {
-    List<EndPoint> result = new ArrayList<>();
+  public static List<Endpoint> parseNodeUrls(List<String> nodeUrls) throws BadNodeUrlException {
+    List<Endpoint> result = new ArrayList<>();
     for (String url : nodeUrls) {
       result.add(parseNodeUrl(url));
     }
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index d6598a5d14..ae84c4f2f1 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -33,9 +33,14 @@ rpc_port=6667
 internal_ip=127.0.0.1
 
 # Datatype: int
-# port for communication between cluster nodes.
+# port for coordinator's communication between cluster nodes.
 internal_port=9003
 
+
+# Datatype: int
+# port for consensus's communication between cluster nodes.
+consensus_port=40010
+
 # comma-separated {IP/DOMAIN}:internal_port pairs
 # Data nodes store config nodes ip and port to communicate with config nodes.
 # Several nodes will be picked randomly to send the request, the number of nodes
@@ -140,6 +145,18 @@ config_nodes=127.0.0.1:22277
 # If its prefix is "/", then the path is absolute. Otherwise, it is relative.
 # wal_dir=data/wal
 
+# consensus dir
+# If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/data).
+# If it is absolute, system will save the data in the exact location it points to.
+# If it is relative, system will save the data in the relative path directory it indicates under the IoTDB folder.
+# Note: If consensus_dir is assigned an empty string(i.e.,zero-size), it will be handled as a relative path.
+# For windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative.
+# consensus_dir=data\\consensus
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+# consensus_dir=data/consensus
+
 
 # TSFile storage file system. Currently, Tsfiles are supported to be stored in LOCAL file system or HDFS.
 # Datatype: FSType
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 580d8f7552..a78d38baa3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -260,6 +260,9 @@ public class IoTDBConfig {
   /** Wal directory. */
   private String walDir = DEFAULT_BASE_DIR + File.separator + "wal";
 
+  /** Consensus directory. */
+  private String consensusDir = DEFAULT_BASE_DIR + File.separator + "consensus";
+
   /** Maximum MemTable number. Invalid when enableMemControl is true. */
   private int maxMemtableNumber = 0;
 
@@ -820,12 +823,22 @@ public class IoTDBConfig {
   /** Internal ip for data node */
   private String internalIp;
 
-  /** Internal port of data node */
+  /** Internal port for coordinator */
   private int internalPort = 9003;
 
+  /** Internal port for consensus protocol */
+  private int consensusPort = 40010;
+
   /** The max time of data node waiting to join into the cluster */
   private long joinClusterTimeOutMs = TimeUnit.SECONDS.toMillis(60);
 
+  /**
+   * The consensus protocol class. The Datanode should communicate with ConfigNode on startup and
+   * set this variable so that the correct class name can be obtained later when the consensus layer
+   * singleton is initialized
+   */
+  private String consensusProtocolClass = "org.apache.iotdb.consensus.ratis.RatisConsensus";
+
   public IoTDBConfig() {
     try {
       internalIp = InetAddress.getLocalHost().getHostAddress();
@@ -937,6 +950,7 @@ public class IoTDBConfig {
     syncDir = addHomeDir(syncDir);
     tracingDir = addHomeDir(tracingDir);
     walDir = addHomeDir(walDir);
+    consensusDir = addHomeDir(consensusDir);
     indexRootFolder = addHomeDir(indexRootFolder);
     extDir = addHomeDir(extDir);
     udfDir = addHomeDir(udfDir);
@@ -1142,6 +1156,14 @@ public class IoTDBConfig {
     this.walDir = walDir;
   }
 
+  public String getConsensusDir() {
+    return consensusDir;
+  }
+
+  public void setConsensusDir(String consensusDir) {
+    this.consensusDir = consensusDir;
+  }
+
   public String getExtDir() {
     return extDir;
   }
@@ -2579,6 +2601,14 @@ public class IoTDBConfig {
     this.internalPort = internalPort;
   }
 
+  public int getConsensusPort() {
+    return consensusPort;
+  }
+
+  public void setConsensusPort(int consensusPort) {
+    this.consensusPort = consensusPort;
+  }
+
   public long getJoinClusterTimeOutMs() {
     return joinClusterTimeOutMs;
   }
@@ -2586,4 +2616,12 @@ public class IoTDBConfig {
   public void setJoinClusterTimeOutMs(long joinClusterTimeOutMs) {
     this.joinClusterTimeOutMs = joinClusterTimeOutMs;
   }
+
+  public String getConsensusProtocolClass() {
+    return consensusProtocolClass;
+  }
+
+  public void setConsensusProtocolClass(String consensusProtocolClass) {
+    this.consensusProtocolClass = consensusProtocolClass;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index e384b5c2b1..0e2934ebf6 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -243,6 +243,8 @@ public class IoTDBDescriptor {
 
       conf.setWalDir(properties.getProperty("wal_dir", conf.getWalDir()));
 
+      conf.setConsensusDir(properties.getProperty("consensus_dir", conf.getConsensusDir()));
+
       int mlogBufferSize =
           Integer.parseInt(
               properties.getProperty(
@@ -1472,6 +1474,10 @@ public class IoTDBDescriptor {
     conf.setInternalPort(
         Integer.parseInt(
             properties.getProperty("internal_port", Integer.toString(conf.getInternalPort()))));
+
+    conf.setConsensusPort(
+        Integer.parseInt(
+            properties.getProperty("consensus_port", Integer.toString(conf.getConsensusPort()))));
   }
 
   /** Get default encode algorithm by data type */
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java
index 9b1a2ed498..e9ecb69131 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java
@@ -23,10 +23,9 @@ import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.GroupType;
 import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IConsensusFactory;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
-import org.apache.iotdb.consensus.standalone.StandAloneConsensus;
-import org.apache.iotdb.consensus.statemachine.EmptyStateMachine;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.statemachine.DataRegionStateMachine;
 import org.apache.iotdb.db.consensus.statemachine.SchemaRegionStateMachine;
@@ -35,6 +34,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
@@ -43,16 +43,22 @@ public class ConsensusExample {
 
   public static void main(String[] args) throws IllegalPathException, IOException {
     IConsensus consensusImpl =
-        new StandAloneConsensus(
-            id -> {
-              switch (id.getType()) {
-                case SchemaRegion:
-                  return new SchemaRegionStateMachine();
-                case DataRegion:
-                  return new DataRegionStateMachine();
-              }
-              return new EmptyStateMachine();
-            });
+        IConsensusFactory.getConsensusImpl(
+                "org.apache.iotdb.consensus.standalone.StandAloneConsensus",
+                new Endpoint("localhost", 6667),
+                new File("./"),
+                gid -> {
+                  switch (gid.getType()) {
+                    case SchemaRegion:
+                      return new SchemaRegionStateMachine();
+                    case DataRegion:
+                      return new DataRegionStateMachine();
+                  }
+                  throw new IllegalArgumentException(
+                      String.format("Unexpected consensusGroup %s", gid));
+                })
+            .orElseThrow(
+                () -> new IllegalArgumentException(IConsensusFactory.CONSTRUCT_FAILED_MSG));
     consensusImpl.start();
     InsertRowPlan plan = getInsertRowPlan();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java
new file mode 100644
index 0000000000..9fd76a7585
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.consensus;
+
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+
+import java.io.File;
+
+public class ConsensusImpl {
+
+  private ConsensusImpl() {}
+
+  public IConsensus getInstance() {
+    return ConsensusImplHolder.INSTANCE;
+  }
+
+  private static class ConsensusImplHolder {
+
+    private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+    private static final IConsensus INSTANCE =
+        IConsensusFactory.getConsensusImpl(
+                conf.getConsensusProtocolClass(),
+                new Endpoint(conf.getInternalIp(), conf.getConsensusPort()),
+                new File(conf.getConsensusDir()),
+                gid -> {
+                  switch (gid.getType()) {
+                    case SchemaRegion:
+                      return StorageEngine.getInstance().getOrCreateSchemaRegionStateMachine(gid);
+                    case DataRegion:
+                      return StorageEngine.getInstance().getOrCreateDataRegionStateMachine(gid);
+                  }
+                  throw new IllegalArgumentException(
+                      String.format("Unexpected consensusGroup %s", gid));
+                })
+            .orElseThrow(
+                () ->
+                    new IllegalArgumentException(
+                        String.format(
+                            IConsensusFactory.CONSTRUCT_FAILED_MSG,
+                            conf.getConsensusProtocolClass())));
+
+    private ConsensusImplHolder() {}
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 4ef3ad8f07..ff59f952c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -39,13 +39,13 @@ public class DataRegionStateMachine extends BaseStateMachine {
 
   @Override
   protected TSStatus write(FragmentInstance fragmentInstance) {
-    logger.info("Execute write plan in DataRegionStateMachine : {}", fragmentInstance);
+    logger.info("Execute write plan in DataRegionStateMachine");
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
   @Override
   protected DataSet read(FragmentInstance fragmentInstance) {
-    logger.info("Execute read plan in DataRegionStateMachine: {}", fragmentInstance);
+    logger.info("Execute read plan in DataRegionStateMachine");
     return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index 628874f1d3..3c3be5d208 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -39,13 +39,13 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
 
   @Override
   protected TSStatus write(FragmentInstance fragmentInstance) {
-    logger.info("Execute write plan in SchemaRegionStateMachine : {}", fragmentInstance);
+    logger.info("Execute write plan in SchemaRegionStateMachine");
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
   @Override
   protected DataSet read(FragmentInstance fragmentInstance) {
-    logger.info("Execute read plan in SchemaRegionStateMachine: {}", fragmentInstance);
+    logger.info("Execute read plan in SchemaRegionStateMachine");
     return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 7c92e4216b..d8b4096b49 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.exception.ShutdownException;
 import org.apache.iotdb.commons.partition.TimePartitionSlot;
 import org.apache.iotdb.commons.service.IService;
@@ -28,6 +29,9 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.ServerConfigConsistent;
+import org.apache.iotdb.db.consensus.statemachine.BaseStateMachine;
+import org.apache.iotdb.db.consensus.statemachine.DataRegionStateMachine;
+import org.apache.iotdb.db.consensus.statemachine.SchemaRegionStateMachine;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.flush.CloseFileListener;
 import org.apache.iotdb.db.engine.flush.FlushListener;
@@ -136,6 +140,8 @@ public class StorageEngine implements IService {
   private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
   private List<FlushListener> customFlushListeners = new ArrayList<>();
 
+  private final Map<ConsensusGroupId, BaseStateMachine> stateMachineMap = new ConcurrentHashMap<>();
+
   private StorageEngine() {}
 
   public static StorageEngine getInstance() {
@@ -1073,6 +1079,16 @@ public class StorageEngine implements IService {
     }
   }
 
+  public SchemaRegionStateMachine getOrCreateSchemaRegionStateMachine(ConsensusGroupId gid) {
+    return (SchemaRegionStateMachine)
+        stateMachineMap.computeIfAbsent(gid, id -> new SchemaRegionStateMachine());
+  }
+
+  public DataRegionStateMachine getOrCreateDataRegionStateMachine(ConsensusGroupId gid) {
+    return (DataRegionStateMachine)
+        stateMachineMap.computeIfAbsent(gid, id -> new DataRegionStateMachine());
+  }
+
   static class InstanceHolder {
 
     private static final StorageEngine INSTANCE = new StorageEngine();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index d5533e6cd7..eaa85865be 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.service;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.commons.exception.ConfigurationException;
@@ -62,7 +63,7 @@ public class DataNode implements DataNodeMBean {
    */
   private static final int DEFAULT_JOIN_RETRY = 10;
 
-  private EndPoint thisNode = new EndPoint();
+  private Endpoint thisNode = new Endpoint();
 
   private int dataNodeID;
 
@@ -122,7 +123,7 @@ public class DataNode implements DataNodeMBean {
   }
 
   public void joinCluster() throws StartupException {
-    List<EndPoint> configNodes;
+    List<Endpoint> configNodes;
     try {
       configNodes =
           CommonUtils.parseNodeUrls(IoTDBDescriptor.getInstance().getConfig().getConfigNodeUrls());
@@ -134,12 +135,13 @@ public class DataNode implements DataNodeMBean {
     while (retry > 0) {
       // randomly pick up a config node to try
       Random random = new Random();
-      EndPoint configNode = configNodes.get(random.nextInt(configNodes.size()));
+      Endpoint configNode = configNodes.get(random.nextInt(configNodes.size()));
       logger.info("start joining the cluster with the help of {}", configNode);
       try {
         ConfigIService.Client client = createClient(configNode);
         DataNodeRegisterResp dataNodeRegisterResp =
-            client.registerDataNode(new DataNodeRegisterReq(thisNode));
+            client.registerDataNode(
+                new DataNodeRegisterReq(new EndPoint(thisNode.getIp(), thisNode.getPort())));
         if (dataNodeRegisterResp.getRegisterResult().getCode()
             == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           dataNodeID = dataNodeRegisterResp.getDataNodeID();
@@ -199,13 +201,13 @@ public class DataNode implements DataNodeMBean {
     private DataNodeHolder() {}
   }
 
-  private ConfigIService.Client createClient(EndPoint endPoint) throws IoTDBConnectionException {
+  private ConfigIService.Client createClient(Endpoint endpoint) throws IoTDBConnectionException {
     TTransport transport;
     try {
       transport =
           RpcTransportFactory.INSTANCE.getTransport(
               // as there is a try-catch already, we do not need to use TSocket.wrap
-              endPoint.getIp(), endPoint.getPort(), 2000);
+              endpoint.getIp(), endpoint.getPort(), 2000);
       transport.open();
     } catch (TTransportException e) {
       throw new IoTDBConnectionException(e);