You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/03/27 05:07:50 UTC

[iotdb] branch master updated: [IOTDB-2780] Config node ratis consensus protocol implementation (#5347)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f4b360  [IOTDB-2780] Config node ratis consensus protocol implementation (#5347)
4f4b360 is described below

commit 4f4b360724eb50871f7b856ddb9f15adb13831cf
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Sun Mar 27 13:06:51 2022 +0800

    [IOTDB-2780] Config node ratis consensus protocol implementation (#5347)
---
 confignode/pom.xml                                 |   5 +
 .../resources/conf/iotdb-confignode.properties     |  44 +++++--
 confignode/src/assembly/resources/conf/logback.xml |   4 +-
 .../iotdb/confignode/conf/ConfigNodeConf.java      |  38 +++++-
 .../iotdb/confignode/conf/ConfigNodeConstant.java  |   1 +
 .../confignode/conf/ConfigNodeDescriptor.java      |  42 +++++--
 .../iotdb/confignode/consensus/ConsensusType.java  |  54 +++++++++
 .../statemachine/PartitionRegionStateMachine.java  |  15 ++-
 .../iotdb/confignode/manager/ConfigManager.java    | 109 ++++++++++++-----
 .../iotdb/confignode/partition/PartitionTable.java |  11 +-
 .../iotdb/confignode/physical/PhysicalPlan.java    |   9 ++
 .../iotdb/confignode/service/ConfigNode.java       |  22 +---
 .../confignode/service/ConfigNodeCommandLine.java  |   6 +-
 .../confignode/service/executor/PlanExecutor.java  |   4 +-
 .../service/thrift/server/ConfigNodeRPCServer.java |  21 ++--
 .../server/ConfigNodeRPCServerProcessor.java       |   7 +-
 .../thrift/server/ConfigNodeRPCServiceHandler.java |   2 +-
 .../manager/ConfigManagerManualTest.java           | 132 +++++++++++++++++++++
 ...java => DeviceGroupHashExecutorManualTest.java} |   8 +-
 .../server/ConfigNodeRPCServerProcessorTest.java   |   5 +-
 .../iotdb/consensus/ratis/RequestMessage.java      |  23 +++-
 .../iotdb/commons/service/RegisterManager.java     |   2 +-
 22 files changed, 455 insertions(+), 109 deletions(-)

diff --git a/confignode/pom.xml b/confignode/pom.xml
index 68b3614..874e682 100644
--- a/confignode/pom.xml
+++ b/confignode/pom.xml
@@ -56,6 +56,11 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>3.2.5</version>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <version>${junit.version}</version>
diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 103cc1b..801bf17 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -33,11 +33,7 @@ config_node_rpc_port=22277
 # Datatype: int
 # config_node_internal_port=22278
 
-# used for building the ConfigNode consensus group
-# all config node address and internal port
-# every node should have the same config_node_address_lists
-# Datatype: String
-# config_node_address_lists=host0:22278,host1:22278,host2:22278
+# this feature is under development, set this as false before it is done.
 # Datatype: boolean
 # rpc_thrift_compression_enable=false
 
@@ -58,6 +54,23 @@ config_node_rpc_port=22277
 # thrift_init_buffer_size=1024
 
 ####################
+### consensus protocol configuration
+####################
+
+# ConfigNodeGroup consensus protocol type
+# These consensus protocols are currently supported:
+# 1. standalone(No protocol, only supports stand-alone machine)
+# 2. ratis(Raft protocol)
+# Datatype: String
+# consensus_type=standalone
+
+# Used for building the ConfigNode consensus group
+# all config node addresses and internal ports, separated by commas
+# every node should have the same config_node_group_address_list
+# Datatype: String
+# config_node_group_address_list=0.0.0.0:22278,0.0.0.0:22280,0.0.0.0:22282
+
+####################
 ### DeviceGroup Configuration
 ####################
 
@@ -81,9 +94,9 @@ config_node_rpc_port=22277
 ####################
 
 # system dir
-# If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/data/system).
+# If this property is unset, system will save the data in the default relative path directory under the confignode folder(i.e., %CONFIGNODE_HOME%/data/system).
 # If it is absolute, system will save the data in exact location it points to.
-# If it is relative, system will save the data in the relative path directory it indicates under the IoTDB folder.
+# If it is relative, system will save the data in the relative path directory it indicates under the confignode folder.
 # For windows platform
 # If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative.
 # system_dir=data\\system
@@ -93,9 +106,9 @@ config_node_rpc_port=22277
 
 
 # data dirs
-# If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/data/data).
+# If this property is unset, system will save the data in the default relative path directory under the confignode folder(i.e., %CONFIGNODE_HOME%/data/data).
 # If it is absolute, system will save the data in exact location it points to.
-# If it is relative, system will save the data in the relative path directory it indicates under the IoTDB folder.
+# If it is relative, system will save the data in the relative path directory it indicates under the confignode folder.
 # Note: If data_dir is assigned an empty string(i.e.,zero-size), it will be handled as a relative path.
 # For windows platform
 # If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative.
@@ -104,6 +117,19 @@ config_node_rpc_port=22277
 # If its prefix is "/", then the path is absolute. Otherwise, it is relative.
 # data_dirs=data/data
 
+
+# consensus dir
+# If this property is unset, system will save the data in the default relative path directory under the confignode folder(i.e., %CONFIGNODE_HOME%/data/consensus).
+# If it is absolute, system will save the data in exact location it points to.
+# If it is relative, system will save the data in the relative path directory it indicates under the confignode folder.
+# Note: If consensus_dir is assigned an empty string(i.e.,zero-size), it will be handled as a relative path.
+# For windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative.
+# consensus_dir=data\\consensus
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+# consensus_dir=data/consensus
+
 ####################
 ### Region Configuration
 ####################
diff --git a/confignode/src/assembly/resources/conf/logback.xml b/confignode/src/assembly/resources/conf/logback.xml
index cfef99e..4e8e0d6 100644
--- a/confignode/src/assembly/resources/conf/logback.xml
+++ b/confignode/src/assembly/resources/conf/logback.xml
@@ -135,6 +135,6 @@
         <appender-ref ref="FILEALL"/>
         <appender-ref ref="stdout"/>
     </root>
-    <logger level="info" name="org.apache.iotdb.confignode.service"/>
-    <logger level="info" name="org.apache.iotdb.confignode.conf"/>
+    <logger level="info" name="org.apache.iotdb.confignode"/>
+    <!-- <logger level="info" name="org.apache.ratis"/> -->
 </configuration>
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index 128a54d..be19e56 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -19,6 +19,8 @@
 package org.apache.iotdb.confignode.conf;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.confignode.consensus.ConsensusType;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.rpc.RpcUtils;
 
 import java.io.File;
@@ -34,8 +36,11 @@ public class ConfigNodeConf {
   /** used for communication between data node and data node */
   private int internalPort = 22278;
 
-  /** every node should have the same config_node_address_lists */
-  private String addressLists;
+  /** ConfigNodeGroup consensus protocol */
+  private ConsensusType consensusType = ConsensusType.STANDALONE;
+
+  /** Used for building the ConfigNode consensus group */
+  private Endpoint[] configNodeGroupAddressList = null;
 
   /** Number of DeviceGroups per StorageGroup */
   private int deviceGroupCount = 10000;
@@ -70,6 +75,10 @@ public class ConfigNodeConf {
     ConfigNodeConstant.DATA_DIR + File.separator + ConfigNodeConstant.DATA_DIR
   };
 
+  /** Consensus directory, stores consensus protocol logs */
+  private String consensusDir =
+      ConfigNodeConstant.DATA_DIR + File.separator + ConfigNodeConstant.CONSENSUS_FOLDER;
+
   private int regionReplicaCount = 3;
   private int schemaRegionCount = 1;
   private int dataRegionCount = 1;
@@ -87,6 +96,7 @@ public class ConfigNodeConf {
     for (int i = 0; i < dataDirs.length; i++) {
       dataDirs[i] = addHomeDir(dataDirs[i]);
     }
+    consensusDir = addHomeDir(consensusDir);
   }
 
   private String addHomeDir(String dir) {
@@ -181,12 +191,28 @@ public class ConfigNodeConf {
     this.internalPort = internalPort;
   }
 
-  public String getAddressLists() {
-    return addressLists;
+  public String getConsensusDir() {
+    return consensusDir;
+  }
+
+  public void setConsensusDir(String consensusDir) {
+    this.consensusDir = consensusDir;
+  }
+
+  public ConsensusType getConsensusType() {
+    return consensusType;
+  }
+
+  public void setConsensusType(ConsensusType consensusType) {
+    this.consensusType = consensusType;
+  }
+
+  public Endpoint[] getConfigNodeGroupAddressList() {
+    return configNodeGroupAddressList;
   }
 
-  public void setAddressLists(String addressLists) {
-    this.addressLists = addressLists;
+  public void setConfigNodeGroupAddressList(Endpoint[] configNodeGroupAddressList) {
+    this.configNodeGroupAddressList = configNodeGroupAddressList;
   }
 
   public int getThriftServerAwaitTimeForStopService() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java
index 5629d8d..f3bc4ac 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java
@@ -41,6 +41,7 @@ public class ConfigNodeConstant {
 
   public static final String DATA_DIR = "data";
   public static final String CONF_DIR = "conf";
+  public static final String CONSENSUS_FOLDER = "consensus";
 
   public static final int MIN_SUPPORTED_JDK_VERSION = 8;
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index d71faee..acfce96 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iotdb.confignode.conf;
 
+import org.apache.iotdb.confignode.consensus.ConsensusType;
+import org.apache.iotdb.consensus.common.Endpoint;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +52,7 @@ public class ConfigNodeDescriptor {
   public URL getPropsUrl() {
     // Check if a config-directory was specified first.
     String urlString = System.getProperty(ConfigNodeConstant.CONFIGNODE_CONF, null);
-    // If it wasn't, check if a home directory was provided (This usually contains a config)
+    // If it wasn't, check if a home directory was provided
     if (urlString == null) {
       urlString = System.getProperty(ConfigNodeConstant.CONFIGNODE_HOME, null);
       if (urlString != null) {
@@ -60,15 +63,9 @@ public class ConfigNodeDescriptor {
                 + File.separatorChar
                 + ConfigNodeConstant.CONF_NAME;
       } else {
-        // If this too wasn't provided, try to find a default config in the root of the classpath.
-        URL uri = ConfigNodeConf.class.getResource("/" + ConfigNodeConstant.CONF_NAME);
-        if (uri != null) {
-          return uri;
-        }
-        LOGGER.warn(
-            "Cannot find IOTDB_HOME or IOTDB_CONF environment variable when loading "
-                + "config file {}, use default configuration",
-            ConfigNodeConstant.CONF_NAME);
+        // When ConfigNode is started with the script, the environment variables CONFIGNODE_CONF
+        // and CONFIGNODE_HOME are set. They are not set in developer mode,
+        // so just return null and use the default configuration.
         return null;
       }
     }
@@ -125,8 +122,9 @@ public class ConfigNodeDescriptor {
               properties.getProperty(
                   "config_node_internal_port", String.valueOf(conf.getInternalPort()))));
 
-      conf.setAddressLists(
-          properties.getProperty("config_node_address_lists", conf.getAddressLists()));
+      conf.setConsensusType(
+          ConsensusType.getConsensusType(
+              properties.getProperty("consensus_type", String.valueOf(conf.getConsensusType()))));
 
       conf.setRpcAdvancedCompressionEnable(
           Boolean.parseBoolean(
@@ -160,6 +158,8 @@ public class ConfigNodeDescriptor {
 
       conf.setDataDirs(properties.getProperty("data_dirs", conf.getDataDirs()[0]).split(","));
 
+      conf.setConsensusDir(properties.getProperty("consensus_dir", conf.getConsensusDir()));
+
       conf.setRegionReplicaCount(
           Integer.parseInt(
               properties.getProperty(
@@ -175,6 +175,24 @@ public class ConfigNodeDescriptor {
               properties.getProperty(
                   "data_region_count", String.valueOf(conf.getDataRegionCount()))));
 
+      String addresses = properties.getProperty("config_node_group_address_list", null);
+      if (addresses != null) {
+        String[] addressList = addresses.split(",");
+        Endpoint[] endpointList = new Endpoint[addressList.length];
+        for (int i = 0; i < addressList.length; i++) {
+          String[] ipPort = addressList[i].split(":");
+          if (ipPort.length != 2) {
+            throw new IOException(
+                String.format(
+                    "Parsing parameter config_node_group_address_list error. "
+                        + "The %d-th address must format to ip:port, but currently is %s",
+                    i, addressList[i]));
+          }
+          endpointList[i] = new Endpoint(ipPort[0], Integer.parseInt(ipPort[1]));
+        }
+        conf.setConfigNodeGroupAddressList(endpointList);
+      }
+
     } catch (IOException e) {
       LOGGER.warn("Couldn't load ConfigNode conf file, use default config", e);
     } finally {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/ConsensusType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/ConsensusType.java
new file mode 100644
index 0000000..8d242cc
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/ConsensusType.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.consensus;
+
+import java.util.Arrays;
+
+public enum ConsensusType {
+  STANDALONE("standalone"),
+  RATIS("ratis");
+
+  private final String typeName;
+
+  ConsensusType(String typeName) {
+    this.typeName = typeName;
+  }
+
+  public String getTypeName() {
+    return typeName;
+  }
+
+  @Override
+  public String toString() {
+    return typeName;
+  }
+
+  public static ConsensusType getConsensusType(String typeName) {
+    for (ConsensusType type : ConsensusType.values()) {
+      if (type.getTypeName().equals(typeName)) {
+        return type;
+      }
+    }
+
+    throw new IllegalArgumentException(
+        String.format(
+            "Unknown consensus type, found: %s expected: %s",
+            typeName, Arrays.toString(ConsensusType.values())));
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index c161061..2c63b40 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -38,7 +38,11 @@ public class PartitionRegionStateMachine implements IStateMachine {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionRegionStateMachine.class);
 
-  private final PlanExecutor executor = new PlanExecutor();
+  private final PlanExecutor executor;
+
+  public PartitionRegionStateMachine() {
+    this.executor = new PlanExecutor();
+  }
 
   @Override
   public TSStatus write(IConsensusRequest request) {
@@ -74,7 +78,14 @@ public class PartitionRegionStateMachine implements IStateMachine {
   @Override
   public DataSet read(IConsensusRequest request) {
     PhysicalPlan plan;
-    if (request instanceof PhysicalPlan) {
+    if (request instanceof ByteBufferConsensusRequest) {
+      try {
+        plan = PhysicalPlan.Factory.create(((ByteBufferConsensusRequest) request).getContent());
+      } catch (IOException e) {
+        LOGGER.error("Deserialization error for write plan : {}", request);
+        return null;
+      }
+    } else if (request instanceof PhysicalPlan) {
       plan = (PhysicalPlan) request;
     } else {
       LOGGER.error("Unexpected read plan : {}", request);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 7c9a463..1fd5767 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.confignode.manager;
 
 import org.apache.iotdb.commons.hash.DeviceGroupHashExecutor;
-import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.conf.ConfigNodeConf;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.statemachine.PartitionRegionStateMachine;
@@ -31,15 +30,20 @@ import org.apache.iotdb.consensus.common.GroupType;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.ratis.RatisConsensus;
 import org.apache.iotdb.consensus.standalone.StandAloneConsensus;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 /**
  * ConfigManager maintains consistency between PartitionTables in the ConfigNodeGroup. Expose the
@@ -48,36 +52,34 @@ import java.util.Collections;
 public class ConfigManager {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ConfigManager.class);
+  private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
 
   private IConsensus consensusImpl;
   private ConsensusGroupId consensusGroupId;
 
   private DeviceGroupHashExecutor hashExecutor;
 
-  @TestOnly
-  public ConfigManager(String hashExecutorClass, int deviceGroupCount) {
-    setHashExecutor(hashExecutorClass, deviceGroupCount);
-  }
-
-  public ConfigManager() {
-    ConfigNodeConf config = ConfigNodeDescriptor.getInstance().getConf();
-
-    setHashExecutor(config.getDeviceGroupHashExecutorClass(), config.getDeviceGroupCount());
-    setConsensusLayer(config);
+  public ConfigManager() throws IOException {
+    setHashExecutor();
+    setConsensusLayer();
   }
 
   /** Build DeviceGroupHashExecutor */
-  private void setHashExecutor(String hashExecutorClass, int deviceGroupCount) {
+  private void setHashExecutor() {
     try {
-      Class<?> executor = Class.forName(hashExecutorClass);
+      Class<?> executor = Class.forName(conf.getDeviceGroupHashExecutorClass());
       Constructor<?> executorConstructor = executor.getConstructor(int.class);
-      hashExecutor = (DeviceGroupHashExecutor) executorConstructor.newInstance(deviceGroupCount);
+      hashExecutor =
+          (DeviceGroupHashExecutor) executorConstructor.newInstance(conf.getDeviceGroupCount());
     } catch (ClassNotFoundException
         | NoSuchMethodException
         | InstantiationException
         | IllegalAccessException
         | InvocationTargetException e) {
-      LOGGER.error("Couldn't Constructor DeviceGroupHashExecutor class: {}", hashExecutorClass, e);
+      LOGGER.error(
+          "Couldn't Constructor DeviceGroupHashExecutor class: {}",
+          conf.getDeviceGroupHashExecutorClass(),
+          e);
       hashExecutor = null;
     }
   }
@@ -87,33 +89,80 @@ public class ConfigManager {
   }
 
   /** Build ConfigNodeGroup ConsensusLayer */
-  private void setConsensusLayer(ConfigNodeConf config) {
-    // TODO: Support other consensus protocol
-    this.consensusImpl = new StandAloneConsensus(id -> new PartitionRegionStateMachine());
-    // TODO: handle the possible exception that cause the Consensus fail
-    try {
-      this.consensusImpl.start();
-    } catch (IOException e) {
-      e.printStackTrace();
+  private void setConsensusLayer() throws IOException {
+    // There is only one ConfigNodeGroup
+    consensusGroupId = new ConsensusGroupId(GroupType.PartitionRegion, 0);
+
+    // If consensusDir does not exist, create consensusDir
+    File consensusDir = new File(conf.getConsensusDir());
+    if (!consensusDir.exists()) {
+      if (consensusDir.mkdirs()) {
+        LOGGER.info("Make consensus dirs: {}", consensusDir);
+      } else {
+        throw new IOException(
+            String.format(
+                "Start ConfigNode failed, because couldn't make system dirs: %s.",
+                consensusDir.getAbsolutePath()));
+      }
     }
 
-    this.consensusGroupId = new ConsensusGroupId(GroupType.PartitionRegion, 0);
-    this.consensusImpl.addConsensusGroup(
-        this.consensusGroupId,
+    // Implement specific consensus
+    switch (conf.getConsensusType()) {
+      case STANDALONE:
+        constructStandAloneConsensus();
+        break;
+      case RATIS:
+        constructRatisConsensus();
+        break;
+      default:
+        throw new IllegalArgumentException(
+            "Start ConfigNode failed, unrecognized ConsensusType: "
+                + conf.getConsensusType().getTypeName());
+    }
+  }
+
+  private void constructStandAloneConsensus() throws IOException {
+    // Standalone consensus
+    consensusImpl = new StandAloneConsensus(id -> new PartitionRegionStateMachine());
+    consensusImpl.start();
+
+    // Standalone ConsensusGroup
+    consensusImpl.addConsensusGroup(
+        consensusGroupId,
         Collections.singletonList(
             new Peer(
-                this.consensusGroupId,
-                new Endpoint(config.getRpcAddress(), config.getInternalPort()))));
+                consensusGroupId, new Endpoint(conf.getRpcAddress(), conf.getInternalPort()))));
+  }
+
+  private void constructRatisConsensus() throws IOException {
+    // Ratis consensus local implement
+    consensusImpl =
+        RatisConsensus.newBuilder()
+            .setEndpoint(new Endpoint(conf.getRpcAddress(), conf.getInternalPort()))
+            .setStateMachineRegistry(id -> new PartitionRegionStateMachine())
+            .setStorageDir(new File(conf.getConsensusDir()))
+            .build();
+    consensusImpl.start();
+
+    // Build ratis group from user properties
+    LOGGER.info(
+        "Set ConfigNode consensus group {}...",
+        Arrays.toString(conf.getConfigNodeGroupAddressList()));
+    List<Peer> peerList = new ArrayList<>();
+    for (Endpoint endpoint : conf.getConfigNodeGroupAddressList()) {
+      peerList.add(new Peer(consensusGroupId, endpoint));
+    }
+    consensusImpl.addConsensusGroup(consensusGroupId, peerList);
   }
 
   /** Transmit PhysicalPlan to confignode.consensus.statemachine */
   public ConsensusWriteResponse write(PhysicalPlan plan) {
-    return this.consensusImpl.write(this.consensusGroupId, plan);
+    return consensusImpl.write(consensusGroupId, plan);
   }
 
   /** Transmit PhysicalPlan to confignode.consensus.statemachine */
   public ConsensusReadResponse read(PhysicalPlan plan) {
-    return this.consensusImpl.read(this.consensusGroupId, plan);
+    return consensusImpl.read(consensusGroupId, plan);
   }
 
   // TODO: Interfaces for LoadBalancer control
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/partition/PartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/partition/PartitionTable.java
index ea11a7e..9f0354d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/partition/PartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/partition/PartitionTable.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.confignode.partition;
 
+import org.apache.iotdb.confignode.conf.ConfigNodeConf;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
 import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
@@ -40,12 +41,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  */
 public class PartitionTable {
 
-  private static final int regionReplicaCount =
-      ConfigNodeDescriptor.getInstance().getConf().getRegionReplicaCount();
-  private static final int schemaRegionCount =
-      ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionCount();
-  private static final int dataRegionCount =
-      ConfigNodeDescriptor.getInstance().getConf().getDataRegionCount();
+  private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
+  private static final int regionReplicaCount = conf.getRegionReplicaCount();
+  private static final int schemaRegionCount = conf.getSchemaRegionCount();
+  private static final int dataRegionCount = conf.getDataRegionCount();
 
   private final ReentrantReadWriteLock lock;
   // TODO: Serialize and Deserialize
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
index 523385a..858cc9d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
@@ -19,7 +19,9 @@
 package org.apache.iotdb.confignode.physical;
 
 import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
+import org.apache.iotdb.confignode.physical.sys.QueryStorageGroupSchemaPlan;
 import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 
 import org.slf4j.Logger;
@@ -64,6 +66,7 @@ public abstract class PhysicalPlan implements IConsensusRequest {
       buffer.reset();
       throw e;
     }
+    buffer.flip();
   }
 
   protected abstract void serializeImpl(ByteBuffer buffer);
@@ -83,6 +86,12 @@ public abstract class PhysicalPlan implements IConsensusRequest {
         case QueryDataNodeInfo:
           plan = new QueryDataNodeInfoPlan();
           break;
+        case SetStorageGroup:
+          plan = new SetStorageGroupPlan();
+          break;
+        case QueryStorageGroupSchema:
+          plan = new QueryStorageGroupSchemaPlan();
+          break;
         default:
           throw new IOException("unknown PhysicalPlan type: " + typeNum);
       }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 6069f70..418ff9a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.commons.exception.ShutdownException;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.JMXService;
 import org.apache.iotdb.commons.service.RegisterManager;
-import org.apache.iotdb.commons.service.StartupChecks;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
 import org.apache.iotdb.confignode.service.thrift.server.ConfigNodeRPCServer;
@@ -31,6 +30,8 @@ import org.apache.iotdb.confignode.service.thrift.server.ConfigNodeRPCServerProc
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 public class ConfigNode implements ConfigNodeMBean {
   private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNode.class);
 
@@ -39,7 +40,7 @@ public class ConfigNode implements ConfigNodeMBean {
           "%s:%s=%s",
           ConfigNodeConstant.CONFIGNODE_PACKAGE, ConfigNodeConstant.JMX_TYPE, "ConfigNode");
 
-  private static final RegisterManager registerManager = new RegisterManager();
+  private final RegisterManager registerManager = new RegisterManager();
 
   private ConfigNode() {
     // empty constructor
@@ -50,31 +51,20 @@ public class ConfigNode implements ConfigNodeMBean {
   }
 
   /** Register services */
-  private void setUp() throws StartupException {
+  private void setUp() throws StartupException, IOException {
     LOGGER.info("Setting up {}...", ConfigNodeConstant.GLOBAL_NAME);
     registerManager.register(JMXService.getInstance());
     JMXService.registerMBean(getInstance(), mbeanName);
 
-    ConfigNodeRPCServerProcessor configNodeRPCServerProcessor = new ConfigNodeRPCServerProcessor();
-    ConfigNodeRPCServer.getInstance().initSyncedServiceImpl(configNodeRPCServerProcessor);
+    ConfigNodeRPCServer.getInstance().initSyncedServiceImpl(new ConfigNodeRPCServerProcessor());
     registerManager.register(ConfigNodeRPCServer.getInstance());
     LOGGER.info("Init rpc server success");
   }
 
   public void active() {
-    StartupChecks checks = new StartupChecks().withDefaultTest();
-    try {
-      // Startup environment check
-      checks.verify();
-    } catch (StartupException e) {
-      LOGGER.error(
-          "{}: failed to start because some checks failed. ", ConfigNodeConstant.GLOBAL_NAME, e);
-      return;
-    }
-
     try {
       setUp();
-    } catch (StartupException e) {
+    } catch (StartupException | IOException e) {
       LOGGER.error("Meet error while starting up.", e);
       deactivate();
       return;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
index 27d1325..52894d3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.service;
 import org.apache.iotdb.commons.ServerCommandLine;
 import org.apache.iotdb.commons.exception.ConfigurationException;
 import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.StartupChecks;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfCheck;
 
 import org.slf4j.Logger;
@@ -66,7 +67,10 @@ public class ConfigNodeCommandLine extends ServerCommandLine {
     LOGGER.info("Running mode {}", mode);
     if (MODE_START.equals(mode)) {
       try {
-        // Check parameters
+        // Startup environment check
+        StartupChecks checks = new StartupChecks().withDefaultTest();
+        checks.verify();
+        // Check special parameters
         ConfigNodeConfCheck.getInstance().checkConfig();
       } catch (IOException | ConfigurationException | StartupException e) {
         LOGGER.error("Meet error when doing start checking", e);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
index ccb57e3..c9c90af 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
@@ -29,10 +29,10 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
 public class PlanExecutor {
 
-  private final PartitionTable partitionTable = new PartitionTable();
+  private final PartitionTable partitionTable;
 
   public PlanExecutor() {
-    // empty constructor
+    this.partitionTable = new PartitionTable();
   }
 
   public DataSet executorQueryPlan(PhysicalPlan plan) throws UnknownPhysicalPlanTypeException {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServer.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServer.java
index 876f059..2c3542b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServer.java
@@ -30,11 +30,14 @@ import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
 
 /** ConfigNodeRPCServer exposes the interface that interacts with the DataNode */
 public class ConfigNodeRPCServer extends ThriftService implements ConfigNodeRPCServerMBean {
-  private final ConfigNodeConf config = ConfigNodeDescriptor.getInstance().getConf();
+
+  private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
 
   private ConfigNodeRPCServerProcessor configNodeRPCServerProcessor;
 
-  private ConfigNodeRPCServer() {}
+  private ConfigNodeRPCServer() {
+    // empty constructor
+  }
 
   @Override
   public ThriftService getImplementation() {
@@ -72,10 +75,10 @@ public class ConfigNodeRPCServer extends ThriftService implements ConfigNodeRPCS
               ThreadName.CONFIG_NODE_RPC_CLIENT.getName(),
               getBindIP(),
               getBindPort(),
-              config.getRpcMaxConcurrentClientNum(),
-              config.getThriftServerAwaitTimeForStopService(),
+              conf.getRpcMaxConcurrentClientNum(),
+              conf.getThriftServerAwaitTimeForStopService(),
               new ConfigNodeRPCServiceHandler(configNodeRPCServerProcessor),
-              config.isRpcThriftCompressionEnabled());
+              conf.isRpcThriftCompressionEnabled());
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
@@ -84,12 +87,12 @@ public class ConfigNodeRPCServer extends ThriftService implements ConfigNodeRPCS
 
   @Override
   public String getBindIP() {
-    return config.getRpcAddress();
+    return conf.getRpcAddress();
   }
 
   @Override
   public int getBindPort() {
-    return config.getRpcPort();
+    return conf.getRpcPort();
   }
 
   public static ConfigNodeRPCServer getInstance() {
@@ -100,6 +103,8 @@ public class ConfigNodeRPCServer extends ThriftService implements ConfigNodeRPCS
 
     private static final ConfigNodeRPCServer INSTANCE = new ConfigNodeRPCServer();
 
-    private ConfigNodeRPCServerHolder() {}
+    private ConfigNodeRPCServerHolder() {
+      // empty constructor
+    }
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
index 2427a0a..9cd98e1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
@@ -48,16 +48,17 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
 import org.apache.thrift.TException;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
 /** ConfigNodeRPCServer exposes the interface that interacts with the DataNode */
 public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
 
-  private final ConfigManager configManager = new ConfigManager();
+  private final ConfigManager configManager;
 
-  public ConfigNodeRPCServerProcessor() {
-    // empty constructor
+  public ConfigNodeRPCServerProcessor() throws IOException {
+    this.configManager = new ConfigManager();
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServiceHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServiceHandler.java
index a186a12..6388850 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServiceHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServiceHandler.java
@@ -22,7 +22,7 @@ import org.apache.thrift.server.TServerEventHandler;
 import org.apache.thrift.transport.TTransport;
 
 public class ConfigNodeRPCServiceHandler implements TServerEventHandler {
-  private ConfigNodeRPCServerProcessor processor;
+  private final ConfigNodeRPCServerProcessor processor;
 
   public ConfigNodeRPCServiceHandler(ConfigNodeRPCServerProcessor processor) {
     this.processor = processor;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ConfigManagerManualTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/ConfigManagerManualTest.java
new file mode 100644
index 0000000..270ba1c
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/ConfigManagerManualTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
+import org.apache.iotdb.confignode.rpc.thrift.DataNodeMessage;
+import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterResp;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Assert;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class ConfigManagerManualTest {
+
+  // TODO: Convert this manual test into an automated test once the test environment is set up.
+  // @YongzaoDan
+
+  private static final String localhost = "0.0.0.0";
+  private static final int timeOutInMS = 2000;
+
+  private ConfigIService.Client[] clients;
+
+  /**
+   * Temporary manual test of ConfigNode's integration with the ratis-consensus protocol. To run
+   * it: (1) compile IoTDB; (2) make at least three copies of iotdb-confignode-0.14.0-SNAPSHOT;
+   * (3) in each copy's iotdb-confignode.properties set config_node_rpc_address (0.0.0.0 for all),
+   * config_node_rpc_port (22277, 22279, 22281), config_node_internal_port (22278, 22280, 22282),
+   * consensus_type (ratis for all) and config_node_group_address_list (0.0.0.0:22278,
+   * 0.0.0.0:22280, 0.0.0.0:22282 for all); (4) start these ConfigNodes manually; (5) annotate
+   * this method with @Test and run it.
+   */
+  public void ratisConsensusTest() throws TException, InterruptedException {
+    createClients();
+
+    registerDataNodes();
+
+    queryDataNodes();
+  }
+
+  private void createClients() throws TTransportException {
+    clients = new ConfigIService.Client[3];
+    for (int i = 0; i < 3; i++) {
+      TTransport transport =
+          RpcTransportFactory.INSTANCE.getTransport(localhost, 22277 + i * 2, timeOutInMS);
+      transport.open();
+      clients[i] = new ConfigIService.Client(new TBinaryProtocol(transport));
+    }
+  }
+
+  private void registerDataNodes() throws TException {
+    for (int i = 0; i < 3; i++) {
+      DataNodeRegisterReq req = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6667 + i));
+      DataNodeRegisterResp resp = clients[0].registerDataNode(req);
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.registerResult.getCode());
+      Assert.assertEquals(i, resp.getDataNodeID());
+    }
+  }
+
+  private void queryDataNodes() throws InterruptedException, TException {
+    // Sleep 1s so that all ConfigNodes in the ConfigNodeGroup hold the same PartitionTable
+    TimeUnit.SECONDS.sleep(1);
+
+    for (int i = 0; i < 3; i++) {
+      Map<Integer, DataNodeMessage> msgMap = clients[i].getDataNodesMessage(-1);
+      Assert.assertEquals(3, msgMap.size());
+      for (int j = 0; j < 3; j++) {
+        Assert.assertNotNull(msgMap.get(j));
+        Assert.assertEquals(j, msgMap.get(j).getDataNodeID());
+        Assert.assertEquals(localhost, msgMap.get(j).getEndPoint().getIp());
+        Assert.assertEquals(6667 + j, msgMap.get(j).getEndPoint().getPort());
+      }
+    }
+  }
+
+  /**
+   * Temporary manual test of the high availability of ConfigNode's ratis-consensus integration.
+   * First set up the ConfigNodes and run ratisConsensusTest as described in its comment. Then
+   * shut down the ConfigNode that occupies ports 22281 and 22282 on the local machine. Finally,
+   * annotate this method with @Test and run it.
+   */
+  public void killTest() throws TException {
+    clients = new ConfigIService.Client[2];
+    for (int i = 0; i < 2; i++) {
+      TTransport transport =
+          RpcTransportFactory.INSTANCE.getTransport(localhost, 22277 + i * 2, timeOutInMS);
+      transport.open();
+      clients[i] = new ConfigIService.Client(new TBinaryProtocol(transport));
+    }
+
+    DataNodeRegisterResp resp =
+        clients[1].registerDataNode(new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6670)));
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.registerResult.getCode());
+    Assert.assertEquals(3, resp.getDataNodeID());
+
+    for (int i = 0; i < 2; i++) {
+      Map<Integer, DataNodeMessage> msgMap = clients[i].getDataNodesMessage(-1);
+      Assert.assertEquals(4, msgMap.size());
+      for (int j = 0; j < 4; j++) {
+        Assert.assertNotNull(msgMap.get(j));
+        Assert.assertEquals(j, msgMap.get(j).getDataNodeID());
+        Assert.assertEquals(localhost, msgMap.get(j).getEndPoint().getIp());
+        Assert.assertEquals(6667 + j, msgMap.get(j).getEndPoint().getPort());
+      }
+    }
+  }
+}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
similarity index 94%
rename from confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorTest.java
rename to confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
index 6039043..f99daa9 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.manager.hash;
 
 import org.apache.iotdb.confignode.manager.ConfigManager;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -29,7 +30,7 @@ import java.util.Random;
  * This is a not active test class, which can be used for general index testing when there is a new
  * DeviceGroup hash algorithm
  */
-public class DeviceGroupHashExecutorTest {
+public class DeviceGroupHashExecutorManualTest {
 
   private static final int deviceGroupCount = 10_000;
   private static final String sg = "root.SGGroup.";
@@ -58,9 +59,8 @@ public class DeviceGroupHashExecutorTest {
     return devices;
   }
 
-  public void GeneralIndexTest() {
-    ConfigManager manager =
-        new ConfigManager("org.apache.iotdb.commons.hash.BKDRHashExecutor", deviceGroupCount);
+  public void GeneralIndexTest() throws IOException {
+    ConfigManager manager = new ConfigManager();
     int[] bucket = new int[deviceGroupCount];
     Arrays.fill(bucket, 0);
 
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
index b2f169d..fd85d40 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
@@ -31,6 +31,7 @@ import org.apache.thrift.TException;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
@@ -39,7 +40,7 @@ import java.util.Map;
 public class ConfigNodeRPCServerProcessorTest {
 
   @Test
-  public void registerDataNodeTest() throws TException {
+  public void registerDataNodeTest() throws TException, IOException {
     ConfigNodeRPCServerProcessor processor = new ConfigNodeRPCServerProcessor();
 
     DataNodeRegisterResp resp;
@@ -87,7 +88,7 @@ public class ConfigNodeRPCServerProcessorTest {
   }
 
   @Test
-  public void setStorageGroupTest() throws TException {
+  public void setStorageGroupTest() throws TException, IOException {
     ConfigNodeRPCServerProcessor processor = new ConfigNodeRPCServerProcessor();
 
     TSStatus status;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
index 244a58b..feb42b5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
@@ -23,11 +23,18 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
 
 public class RequestMessage implements Message {
 
+  private final Logger logger = LoggerFactory.getLogger(RequestMessage.class);
+
   private final IConsensusRequest actualRequest;
   private volatile ByteString serializedContent;
+  private final int DEFAULT_BUFFER_SIZE = 1024 * 10;
 
   public RequestMessage(IConsensusRequest request) {
     this.actualRequest = request;
@@ -43,10 +50,18 @@ public class RequestMessage implements Message {
     if (serializedContent == null) {
       synchronized (this) {
         if (serializedContent == null) {
-          assert actualRequest instanceof ByteBufferConsensusRequest;
-          ByteBufferConsensusRequest req = (ByteBufferConsensusRequest) actualRequest;
-          serializedContent = ByteString.copyFrom(req.getContent());
-          req.getContent().flip(); // so that it can be read from other sources
+          ByteBufferConsensusRequest req;
+          if (actualRequest instanceof ByteBufferConsensusRequest) {
+            req = (ByteBufferConsensusRequest) actualRequest;
+            serializedContent = ByteString.copyFrom(req.getContent());
+            req.getContent().flip();
+          } else {
+            // TODO Pooling
+            ByteBuffer byteBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+            actualRequest.serializeRequest(byteBuffer);
+            serializedContent = ByteString.copyFrom(byteBuffer);
+            byteBuffer.flip();
+          }
         }
       }
     }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/RegisterManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/RegisterManager.java
index db82c58..f0b1396 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/RegisterManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/RegisterManager.java
@@ -32,7 +32,7 @@ import java.util.List;
 public class RegisterManager {
 
   private static final Logger logger = LoggerFactory.getLogger(RegisterManager.class);
-  private List<IService> iServices;
+  private final List<IService> iServices;
   private static long deregisterTimeOut = 10_000L;
 
   public RegisterManager() {