You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/05/30 16:47:37 UTC

[iotdb] branch jira3195 created (now 9e1cc7596f)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch jira3195
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 9e1cc7596f abstract interface && add MultiLeaderConfig

This branch includes the following new commits:

     new 9e1cc7596f abstract interface && add MultiLeaderConfig

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: abstract interface && add MultiLeaderConfig

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira3195
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9e1cc7596f0a4b9cb74ca2fdd3dbd09ba8deca2a
Author: LebronAl <TX...@gmail.com>
AuthorDate: Tue May 31 00:47:18 2022 +0800

    abstract interface && add MultiLeaderConfig
---
 .../iotdb/confignode/manager/ConsensusManager.java |   7 +-
 .../apache/iotdb/consensus/ConsensusFactory.java   |  11 +-
 .../iotdb/consensus/config/ConsensusConfig.java    | 374 +++++++++++++++++++++
 .../multileader/MultiLeaderConsensus.java          |  17 +-
 .../multileader/MultiLeaderServerImpl.java         |  15 +-
 .../multileader/client/DispatchLogHandler.java     |   7 +-
 .../client/MultiLeaderConsensusClientPool.java     |  16 +-
 .../conf/MultiLeaderConsensusConfig.java           |  40 ---
 .../multileader/logdispatcher/LogDispatcher.java   |  39 ++-
 .../multileader/logdispatcher/SyncStatus.java      |   8 +-
 .../multileader/service/MultiLeaderRPCService.java |  12 +-
 .../iotdb/consensus/ratis/RatisConsensus.java      |  18 +-
 .../consensus/standalone/StandAloneConsensus.java  |   7 +-
 .../multileader/MultiLeaderConsensusTest.java      |   7 +-
 .../multileader/logdispatcher/SyncStatusTest.java  |  79 ++---
 .../iotdb/consensus/ratis/RatisConsensusTest.java  |   7 +-
 .../iotdb/consensus/standalone/RecoveryTest.java   |   7 +-
 .../standalone/StandAloneConsensusTest.java        |   7 +-
 .../db/consensus/DataRegionConsensusImpl.java      |   8 +-
 .../db/consensus/SchemaRegionConsensusImpl.java    |   8 +-
 20 files changed, 542 insertions(+), 152 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 702f5cfa05..a43e176f44 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -88,8 +89,10 @@ public class ConsensusManager {
     consensusImpl =
         ConsensusFactory.getConsensusImpl(
                 conf.getConfigNodeConsensusProtocolClass(),
-                new TEndPoint(conf.getRpcAddress(), conf.getConsensusPort()),
-                new File(conf.getConsensusDir()),
+                ConsensusConfig.newBuilder()
+                    .setThisNode(new TEndPoint(conf.getRpcAddress(), conf.getConsensusPort()))
+                    .setStorageDir(new File(conf.getConsensusDir()))
+                    .build(),
                 gid -> stateMachine)
             .orElseThrow(
                 () ->
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java b/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
index 15e2492b2e..8146fcf3a4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
@@ -19,12 +19,11 @@
 
 package org.apache.iotdb.consensus;
 
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Optional;
@@ -42,15 +41,13 @@ public class ConsensusFactory {
   private static final Logger logger = LoggerFactory.getLogger(ConsensusFactory.class);
 
   public static Optional<IConsensus> getConsensusImpl(
-      String className, TEndPoint endpoint, File storageDir, IStateMachine.Registry registry) {
+      String className, ConsensusConfig config, IStateMachine.Registry registry) {
     try {
       Class<?> executor = Class.forName(className);
       Constructor<?> executorConstructor =
-          executor.getDeclaredConstructor(
-              TEndPoint.class, File.class, IStateMachine.Registry.class);
+          executor.getDeclaredConstructor(ConsensusConfig.class, IStateMachine.Registry.class);
       executorConstructor.setAccessible(true);
-      return Optional.of(
-          (IConsensus) executorConstructor.newInstance(endpoint, storageDir, registry));
+      return Optional.of((IConsensus) executorConstructor.newInstance(config, registry));
     } catch (ClassNotFoundException
         | NoSuchMethodException
         | InstantiationException
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
new file mode 100644
index 0000000000..e152126014
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.config;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+public class ConsensusConfig {
+
+  private final TEndPoint thisNode;
+  private final File storageDir;
+  private final StandAloneConfig standAloneConfig;
+  private final RatisConfig ratisConfig;
+  private final MultiLeaderConfig multiLeaderConfig;
+
+  public ConsensusConfig(
+      TEndPoint thisNode,
+      File storageDir,
+      StandAloneConfig standAloneConfig,
+      RatisConfig ratisConfig,
+      MultiLeaderConfig multiLeaderConfig) {
+    this.thisNode = thisNode;
+    this.storageDir = storageDir;
+    this.standAloneConfig = standAloneConfig;
+    this.ratisConfig = ratisConfig;
+    this.multiLeaderConfig = multiLeaderConfig;
+  }
+
+  public TEndPoint getThisNode() {
+    return thisNode;
+  }
+
+  public File getStorageDir() {
+    return storageDir;
+  }
+
+  public StandAloneConfig getStandAloneConfig() {
+    return standAloneConfig;
+  }
+
+  public RatisConfig getRatisConfig() {
+    return ratisConfig;
+  }
+
+  public MultiLeaderConfig getMultiLeaderConfig() {
+    return multiLeaderConfig;
+  }
+
+  public static ConsensusConfig.Builder newBuilder() {
+    return new ConsensusConfig.Builder();
+  }
+
+  public static class Builder {
+
+    private TEndPoint thisNode;
+    private File storageDir;
+    private StandAloneConfig standAloneConfig;
+    private RatisConfig ratisConfig;
+    private MultiLeaderConfig multiLeaderConfig;
+
+    public ConsensusConfig build() {
+      return new ConsensusConfig(
+          thisNode,
+          storageDir,
+          standAloneConfig != null ? standAloneConfig : StandAloneConfig.newBuilder().build(),
+          ratisConfig != null ? ratisConfig : RatisConfig.newBuilder().build(),
+          multiLeaderConfig != null ? multiLeaderConfig : MultiLeaderConfig.newBuilder().build());
+    }
+
+    public Builder setThisNode(TEndPoint thisNode) {
+      this.thisNode = thisNode;
+      return this;
+    }
+
+    public Builder setStorageDir(File storageDir) {
+      this.storageDir = storageDir;
+      return this;
+    }
+
+    public Builder setStandAloneConfig(StandAloneConfig standAloneConfig) {
+      this.standAloneConfig = standAloneConfig;
+      return this;
+    }
+
+    public Builder setRatisConfig(RatisConfig ratisConfig) {
+      this.ratisConfig = ratisConfig;
+      return this;
+    }
+
+    public Builder setMultiLeaderConfig(MultiLeaderConfig multiLeaderConfig) {
+      this.multiLeaderConfig = multiLeaderConfig;
+      return this;
+    }
+  }
+
+  public static class StandAloneConfig {
+    public static StandAloneConfig.Builder newBuilder() {
+      return new StandAloneConfig.Builder();
+    }
+
+    public static class Builder {
+      public StandAloneConfig build() {
+        return new StandAloneConfig();
+      }
+    }
+  }
+
+  public static class RatisConfig {
+
+    public static RatisConfig.Builder newBuilder() {
+      return new RatisConfig.Builder();
+    }
+
+    public static class Builder {
+      public RatisConfig build() {
+        return new RatisConfig();
+      }
+    }
+  }
+
+  public static class MultiLeaderConfig {
+
+    private final RPC rpc;
+    private final Replication replication;
+
+    private MultiLeaderConfig(RPC rpc, Replication replication) {
+      this.rpc = rpc;
+      this.replication = replication;
+    }
+
+    public RPC getRpc() {
+      return rpc;
+    }
+
+    public Replication getReplication() {
+      return replication;
+    }
+
+    public static MultiLeaderConfig.Builder newBuilder() {
+      return new MultiLeaderConfig.Builder();
+    }
+
+    public static class Builder {
+
+      private RPC rpc;
+      private Replication replication;
+
+      public MultiLeaderConfig build() {
+        return new MultiLeaderConfig(
+            rpc != null ? rpc : new RPC.Builder().build(),
+            replication != null ? replication : new Replication.Builder().build());
+      }
+
+      public Builder setRpc(RPC rpc) {
+        this.rpc = rpc;
+        return this;
+      }
+
+      public Builder setReplication(Replication replication) {
+        this.replication = replication;
+        return this;
+      }
+    }
+
+    public static class RPC {
+      private final int rpcMaxConcurrentClientNum;
+      private final int thriftServerAwaitTimeForStopService;
+      private final boolean isRpcThriftCompressionEnabled;
+      private final int selectorNumOfClientManager;
+      private final int connectionTimeoutInMs;
+
+      public RPC(
+          int rpcMaxConcurrentClientNum,
+          int thriftServerAwaitTimeForStopService,
+          boolean isRpcThriftCompressionEnabled,
+          int selectorNumOfClientManager,
+          int connectionTimeoutInMs) {
+        this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
+        this.thriftServerAwaitTimeForStopService = thriftServerAwaitTimeForStopService;
+        this.isRpcThriftCompressionEnabled = isRpcThriftCompressionEnabled;
+        this.selectorNumOfClientManager = selectorNumOfClientManager;
+        this.connectionTimeoutInMs = connectionTimeoutInMs;
+      }
+
+      public int getRpcMaxConcurrentClientNum() {
+        return rpcMaxConcurrentClientNum;
+      }
+
+      public int getThriftServerAwaitTimeForStopService() {
+        return thriftServerAwaitTimeForStopService;
+      }
+
+      public boolean isRpcThriftCompressionEnabled() {
+        return isRpcThriftCompressionEnabled;
+      }
+
+      public int getSelectorNumOfClientManager() {
+        return selectorNumOfClientManager;
+      }
+
+      public int getConnectionTimeoutInMs() {
+        return connectionTimeoutInMs;
+      }
+
+      public static RPC.Builder newBuilder() {
+        return new RPC.Builder();
+      }
+
+      public static class Builder {
+        private int rpcMaxConcurrentClientNum = 65535;
+        private int thriftServerAwaitTimeForStopService = 60;
+        private boolean isRpcThriftCompressionEnabled = false;
+        private int selectorNumOfClientManager = 1;
+        private int connectionTimeoutInMs = (int) TimeUnit.SECONDS.toMillis(20);
+
+        public Builder setRpcMaxConcurrentClientNum(int rpcMaxConcurrentClientNum) {
+          this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
+          return this;
+        }
+
+        public Builder setThriftServerAwaitTimeForStopService(
+            int thriftServerAwaitTimeForStopService) {
+          this.thriftServerAwaitTimeForStopService = thriftServerAwaitTimeForStopService;
+          return this;
+        }
+
+        public Builder setRpcThriftCompressionEnabled(boolean rpcThriftCompressionEnabled) {
+          isRpcThriftCompressionEnabled = rpcThriftCompressionEnabled;
+          return this;
+        }
+
+        public Builder setSelectorNumOfClientManager(int selectorNumOfClientManager) {
+          this.selectorNumOfClientManager = selectorNumOfClientManager;
+          return this;
+        }
+
+        public Builder setConnectionTimeoutInMs(int connectionTimeoutInMs) {
+          this.connectionTimeoutInMs = connectionTimeoutInMs;
+          return this;
+        }
+
+        public RPC build() {
+          return new RPC(
+              rpcMaxConcurrentClientNum,
+              thriftServerAwaitTimeForStopService,
+              isRpcThriftCompressionEnabled,
+              selectorNumOfClientManager,
+              connectionTimeoutInMs);
+        }
+      }
+    }
+
+    public static class Replication {
+      private final int maxPendingRequestNumPerNode;
+      private final int maxRequestPerBatch;
+      private final int maxPendingBatch;
+      private final int maxWaitingTimeForAccumulatingBatchInMs;
+      private final long basicRetryWaitTimeMs;
+      private final long maxRetryWaitTimeMs;
+
+      private Replication(
+          int maxPendingRequestNumPerNode,
+          int maxRequestPerBatch,
+          int maxPendingBatch,
+          int maxWaitingTimeForAccumulatingBatchInMs,
+          long basicRetryWaitTimeMs,
+          long maxRetryWaitTimeMs) {
+        this.maxPendingRequestNumPerNode = maxPendingRequestNumPerNode;
+        this.maxRequestPerBatch = maxRequestPerBatch;
+        this.maxPendingBatch = maxPendingBatch;
+        this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
+        this.basicRetryWaitTimeMs = basicRetryWaitTimeMs;
+        this.maxRetryWaitTimeMs = maxRetryWaitTimeMs;
+      }
+
+      public int getMaxPendingRequestNumPerNode() {
+        return maxPendingRequestNumPerNode;
+      }
+
+      public int getMaxRequestPerBatch() {
+        return maxRequestPerBatch;
+      }
+
+      public int getMaxPendingBatch() {
+        return maxPendingBatch;
+      }
+
+      public int getMaxWaitingTimeForAccumulatingBatchInMs() {
+        return maxWaitingTimeForAccumulatingBatchInMs;
+      }
+
+      public long getBasicRetryWaitTimeMs() {
+        return basicRetryWaitTimeMs;
+      }
+
+      public long getMaxRetryWaitTimeMs() {
+        return maxRetryWaitTimeMs;
+      }
+
+      public static Replication.Builder newBuilder() {
+        return new Replication.Builder();
+      }
+
+      public static class Builder {
+        private int maxPendingRequestNumPerNode = 1000;
+        private int maxRequestPerBatch = 100;
+        private int maxPendingBatch = 50;
+        private int maxWaitingTimeForAccumulatingBatchInMs = 10;
+        private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
+        private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
+
+        public Builder setMaxPendingRequestNumPerNode(int maxPendingRequestNumPerNode) {
+          this.maxPendingRequestNumPerNode = maxPendingRequestNumPerNode;
+          return this;
+        }
+
+        public Builder setMaxRequestPerBatch(int maxRequestPerBatch) {
+          this.maxRequestPerBatch = maxRequestPerBatch;
+          return this;
+        }
+
+        public Builder setMaxPendingBatch(int maxPendingBatch) {
+          this.maxPendingBatch = maxPendingBatch;
+          return this;
+        }
+
+        public Builder setMaxWaitingTimeForAccumulatingBatchInMs(
+            int maxWaitingTimeForAccumulatingBatchInMs) {
+          this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
+          return this;
+        }
+
+        public Builder setBasicRetryWaitTimeMs(long basicRetryWaitTimeMs) {
+          this.basicRetryWaitTimeMs = basicRetryWaitTimeMs;
+          return this;
+        }
+
+        public Builder setMaxRetryWaitTimeMs(long maxRetryWaitTimeMs) {
+          this.maxRetryWaitTimeMs = maxRetryWaitTimeMs;
+          return this;
+        }
+
+        public Replication build() {
+          return new Replication(
+              maxPendingRequestNumPerNode,
+              maxRequestPerBatch,
+              maxPendingBatch,
+              maxWaitingTimeForAccumulatingBatchInMs,
+              basicRetryWaitTimeMs,
+              maxRetryWaitTimeMs);
+        }
+      }
+    }
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index c9393ad642..1f8440ff29 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.config.ConsensusConfig.MultiLeaderConfig;
 import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
@@ -64,12 +66,14 @@ public class MultiLeaderConsensus implements IConsensus {
       new ConcurrentHashMap<>();
   private final MultiLeaderRPCService service;
   private final RegisterManager registerManager = new RegisterManager();
+  private final MultiLeaderConfig config;
 
-  public MultiLeaderConsensus(TEndPoint thisNode, File storageDir, Registry registry) {
-    this.thisNode = thisNode;
-    this.storageDir = storageDir;
+  public MultiLeaderConsensus(ConsensusConfig config, Registry registry) {
+    this.thisNode = config.getThisNode();
+    this.storageDir = config.getStorageDir();
+    this.config = config.getMultiLeaderConfig();
     this.registry = registry;
-    this.service = new MultiLeaderRPCService(thisNode);
+    this.service = new MultiLeaderRPCService(thisNode, config.getMultiLeaderConfig());
   }
 
   @Override
@@ -100,7 +104,8 @@ public class MultiLeaderConsensus implements IConsensus {
                   path.toString(),
                   new Peer(consensusGroupId, thisNode),
                   new ArrayList<>(),
-                  registry.apply(consensusGroupId));
+                  registry.apply(consensusGroupId),
+                  config);
           stateMachineMap.put(consensusGroupId, consensus);
           consensus.start();
         }
@@ -161,7 +166,7 @@ public class MultiLeaderConsensus implements IConsensus {
           }
           MultiLeaderServerImpl impl =
               new MultiLeaderServerImpl(
-                  path, new Peer(groupId, thisNode), peers, registry.apply(groupId));
+                  path, new Peer(groupId, thisNode), peers, registry.apply(groupId), config);
           impl.start();
           return impl;
         });
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 017fe05a15..b6db8a8a6c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.config.ConsensusConfig.MultiLeaderConfig;
 import org.apache.iotdb.consensus.multileader.logdispatcher.IndexController;
 import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher;
 import org.apache.iotdb.consensus.multileader.thrift.TLogType;
@@ -55,9 +56,14 @@ public class MultiLeaderServerImpl {
   private final List<Peer> configuration;
   private final IndexController controller;
   private final LogDispatcher logDispatcher;
+  private final MultiLeaderConfig config;
 
   public MultiLeaderServerImpl(
-      String storageDir, Peer thisNode, List<Peer> configuration, IStateMachine stateMachine) {
+      String storageDir,
+      Peer thisNode,
+      List<Peer> configuration,
+      IStateMachine stateMachine,
+      MultiLeaderConfig config) {
     this.storageDir = storageDir;
     this.thisNode = thisNode;
     this.stateMachine = stateMachine;
@@ -69,7 +75,8 @@ public class MultiLeaderServerImpl {
     } else {
       persistConfiguration();
     }
-    logDispatcher = new LogDispatcher(this);
+    this.config = config;
+    this.logDispatcher = new LogDispatcher(this);
   }
 
   public IStateMachine getStateMachine() {
@@ -183,4 +190,8 @@ public class MultiLeaderServerImpl {
   public IndexController getController() {
     return controller;
   }
+
+  public MultiLeaderConfig getConfig() {
+    return config;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
index 8cc7d10bef..14a2b4ad58 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.consensus.multileader.client;
 
-import org.apache.iotdb.consensus.multileader.conf.MultiLeaderConsensusConfig;
 import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher.LogDispatcherThread;
 import org.apache.iotdb.consensus.multileader.logdispatcher.PendingBatch;
 import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
@@ -79,9 +78,11 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogRes> {
           try {
             long defaultSleepTime =
                 (long)
-                    (MultiLeaderConsensusConfig.BASIC_RETRY_WAIT_TIME_MS * Math.pow(2, retryCount));
+                    (thread.getConfig().getReplication().getBasicRetryWaitTimeMs()
+                        * Math.pow(2, retryCount));
             Thread.sleep(
-                Math.min(defaultSleepTime, MultiLeaderConsensusConfig.MAX_RETRY_WAIT_TIME_MS));
+                Math.min(
+                    defaultSleepTime, thread.getConfig().getReplication().getMaxRetryWaitTimeMs()));
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             logger.warn("Unexpected interruption during retry pending batch");
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
index a43a979694..b17e3d5038 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.ClientPoolProperty;
 import org.apache.iotdb.commons.client.IClientPoolFactory;
-import org.apache.iotdb.consensus.multileader.conf.MultiLeaderConsensusConfig;
+import org.apache.iotdb.consensus.config.ConsensusConfig.MultiLeaderConfig;
 
 import org.apache.commons.pool2.KeyedObjectPool;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
@@ -35,6 +35,13 @@ public class MultiLeaderConsensusClientPool {
 
   public static class AsyncMultiLeaderServiceClientPoolFactory
       implements IClientPoolFactory<TEndPoint, AsyncMultiLeaderServiceClient> {
+
+    private final MultiLeaderConfig config;
+
+    public AsyncMultiLeaderServiceClientPoolFactory(MultiLeaderConfig config) {
+      this.config = config;
+    }
+
     @Override
     public KeyedObjectPool<TEndPoint, AsyncMultiLeaderServiceClient> createClientPool(
         ClientManager<TEndPoint, AsyncMultiLeaderServiceClient> manager) {
@@ -42,11 +49,10 @@ public class MultiLeaderConsensusClientPool {
           new AsyncMultiLeaderServiceClient.Factory(
               manager,
               new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(MultiLeaderConsensusConfig.CONNECTION_TIMEOUT_IN_MS)
-                  .setRpcThriftCompressionEnabled(
-                      MultiLeaderConsensusConfig.IS_RPC_THRIFT_COMPRESSION_ENABLED)
+                  .setConnectionTimeoutMs(config.getRpc().getConnectionTimeoutInMs())
+                  .setRpcThriftCompressionEnabled(config.getRpc().isRpcThriftCompressionEnabled())
                   .setSelectorNumOfAsyncClientManager(
-                      MultiLeaderConsensusConfig.SELECTOR_NUM_OF_CLIENT_MANAGER)
+                      config.getRpc().getSelectorNumOfClientManager())
                   .build()),
           new ClientPoolProperty.Builder<AsyncMultiLeaderServiceClient>().build().getConfig());
     }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/conf/MultiLeaderConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/conf/MultiLeaderConsensusConfig.java
deleted file mode 100644
index 10a547728f..0000000000
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/conf/MultiLeaderConsensusConfig.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.consensus.multileader.conf;
-
-import java.util.concurrent.TimeUnit;
-
-// TODO make it configurable
-public class MultiLeaderConsensusConfig {
-
-  private MultiLeaderConsensusConfig() {}
-
-  public static final int RPC_MAX_CONCURRENT_CLIENT_NUM = 65535;
-  public static final int THRIFT_SERVER_AWAIT_TIME_FOR_STOP_SERVICE = 60;
-  public static final boolean IS_RPC_THRIFT_COMPRESSION_ENABLED = false;
-  public static final int SELECTOR_NUM_OF_CLIENT_MANAGER = 1;
-  public static final int CONNECTION_TIMEOUT_IN_MS = (int) TimeUnit.SECONDS.toMillis(20);
-  public static final int MAX_PENDING_REQUEST_NUM_PER_NODE = 1000;
-  public static final int MAX_REQUEST_PER_BATCH = 100;
-  public static final int MAX_PENDING_BATCH = 50;
-  public static final int MAX_WAITING_TIME_FOR_ACCUMULATE_BATCH_IN_MS = 10;
-  public static final long BASIC_RETRY_WAIT_TIME_MS = TimeUnit.MILLISECONDS.toMillis(100);
-  public static final long MAX_RETRY_WAIT_TIME_MS = TimeUnit.SECONDS.toMillis(20);
-}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 3e664ce4e0..075349e2f7 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -25,11 +25,11 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.config.ConsensusConfig.MultiLeaderConfig;
 import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
 import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
 import org.apache.iotdb.consensus.multileader.client.DispatchLogHandler;
 import org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.AsyncMultiLeaderServiceClientPoolFactory;
-import org.apache.iotdb.consensus.multileader.conf.MultiLeaderConsensusConfig;
 import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
 import org.apache.iotdb.consensus.multileader.thrift.TLogType;
 import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
@@ -72,14 +72,14 @@ public class LogDispatcher {
     this.threads =
         impl.getConfiguration().stream()
             .filter(x -> !Objects.equals(x, impl.getThisNode()))
-            .map(LogDispatcherThread::new)
+            .map(x -> new LogDispatcherThread(x, impl.getConfig()))
             .collect(Collectors.toList());
     if (!threads.isEmpty()) {
       this.executorService =
           IoTDBThreadPoolFactory.newFixedThreadPool(threads.size(), "LogDispatcher");
       this.clientManager =
           new IClientManager.Factory<TEndPoint, AsyncMultiLeaderServiceClient>()
-              .createClientManager(new AsyncMultiLeaderServiceClientPoolFactory());
+              .createClientManager(new AsyncMultiLeaderServiceClientPoolFactory(impl.getConfig()));
     }
   }
 
@@ -121,26 +121,29 @@ public class LogDispatcher {
 
   public class LogDispatcherThread implements Runnable {
 
-    private volatile boolean stopped = false;
+    private final MultiLeaderConfig config;
     private final Peer peer;
     private final IndexController controller;
     // A sliding window class that manages asynchronously pendingBatches
     private final SyncStatus syncStatus;
     // A queue used to receive asynchronous replication requests
-    private final BlockingQueue<IndexedConsensusRequest> pendingRequest =
-        new ArrayBlockingQueue<>(MultiLeaderConsensusConfig.MAX_PENDING_REQUEST_NUM_PER_NODE);
+    private final BlockingQueue<IndexedConsensusRequest> pendingRequest;
     // A container used to cache requests, whose size changes dynamically
     private final List<IndexedConsensusRequest> bufferedRequest = new LinkedList<>();
     // A reader management class that gets requests from the DataRegion
     private final ConsensusReqReader reader =
         (ConsensusReqReader) impl.getStateMachine().read(new GetConsensusReqReaderPlan());
+    private volatile boolean stopped = false;
 
-    public LogDispatcherThread(Peer peer) {
+    public LogDispatcherThread(Peer peer, MultiLeaderConfig config) {
       this.peer = peer;
+      this.config = config;
+      this.pendingRequest =
+          new ArrayBlockingQueue<>(config.getReplication().getMaxPendingRequestNumPerNode());
       this.controller =
           new IndexController(
               impl.getStorageDir(), Utils.fromTEndPointToString(peer.getEndpoint()), false);
-      this.syncStatus = new SyncStatus(controller);
+      this.syncStatus = new SyncStatus(controller, config);
     }
 
     public IndexController getController() {
@@ -155,6 +158,10 @@ public class LogDispatcher {
       return peer;
     }
 
+    public MultiLeaderConfig getConfig() {
+      return config;
+    }
+
     public boolean offer(IndexedConsensusRequest request) {
       return pendingRequest.offer(request);
     }
@@ -177,8 +184,8 @@ public class LogDispatcher {
             // we may block here if there is no requests in the queue
             bufferedRequest.add(pendingRequest.take());
             // If write pressure is low, we simply sleep a little to reduce the number of RPC
-            if (pendingRequest.size() <= MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH) {
-              Thread.sleep(MultiLeaderConsensusConfig.MAX_WAITING_TIME_FOR_ACCUMULATE_BATCH_IN_MS);
+            if (pendingRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
+              Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
             }
           }
           // we may block here if the synchronization pipeline is full
@@ -200,11 +207,11 @@ public class LogDispatcher {
       long startIndex = syncStatus.getNextSendingIndex();
       long maxIndex = impl.getController().getCurrentIndex();
       long endIndex;
-      if (bufferedRequest.size() <= MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH) {
+      if (bufferedRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
         // Use drainTo instead of poll to reduce lock overhead
         pendingRequest.drainTo(
             bufferedRequest,
-            MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH - bufferedRequest.size());
+            config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size());
       }
       if (bufferedRequest.isEmpty()) {
         // only execute this after a restart
@@ -217,7 +224,7 @@ public class LogDispatcher {
         // Prevents gap between logs. For example, some requests are not written into the queue when
         // the queue is full. In this case, requests need to be loaded from the WAL
         endIndex = constructBatchFromWAL(startIndex, prev.getSearchIndex(), logBatches);
-        if (logBatches.size() == MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH) {
+        if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
           batch = new PendingBatch(startIndex, endIndex, logBatches);
           logger.debug("accumulated a {} from wal", batch);
           return batch;
@@ -226,14 +233,14 @@ public class LogDispatcher {
         endIndex = prev.getSearchIndex();
         iterator.remove();
         while (iterator.hasNext()
-            && logBatches.size() <= MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH) {
+            && logBatches.size() <= config.getReplication().getMaxRequestPerBatch()) {
           IndexedConsensusRequest current = iterator.next();
           // Prevents gap between logs. For example, some logs are not written into the queue when
           // the queue is full. In this case, requests need to be loaded from the WAL
           if (current.getSearchIndex() != prev.getSearchIndex() + 1) {
             endIndex =
                 constructBatchFromWAL(prev.getSearchIndex(), current.getSearchIndex(), logBatches);
-            if (logBatches.size() == MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH) {
+            if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
               batch = new PendingBatch(startIndex, endIndex, logBatches);
               logger.debug("accumulated a {} from queue and wal", batch);
               return batch;
@@ -271,7 +278,7 @@ public class LogDispatcher {
     private long constructBatchFromWAL(
         long currentIndex, long maxIndex, List<TLogBatch> logBatches) {
       while (currentIndex < maxIndex
-          && logBatches.size() < MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH) {
+          && logBatches.size() < config.getReplication().getMaxRequestPerBatch()) {
         // TODO iterator
         IConsensusRequest data = reader.getReq(currentIndex++);
         if (data != null) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
index fb521b9b07..36c8726005 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.consensus.multileader.logdispatcher;
 
 import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.consensus.multileader.conf.MultiLeaderConsensusConfig;
+import org.apache.iotdb.consensus.config.ConsensusConfig.MultiLeaderConfig;
 
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -28,17 +28,19 @@ import java.util.List;
 
 public class SyncStatus {
 
+  private final MultiLeaderConfig config;
   private final IndexController controller;
   private final List<PendingBatch> pendingBatches = new LinkedList<>();
 
-  public SyncStatus(IndexController controller) {
+  public SyncStatus(IndexController controller, MultiLeaderConfig config) {
     this.controller = controller;
+    this.config = config;
   }
 
   /** we may block here if the synchronization pipeline is full */
   public void addNextBatch(PendingBatch batch) throws InterruptedException {
     synchronized (this) {
-      while (pendingBatches.size() >= MultiLeaderConsensusConfig.MAX_PENDING_BATCH) {
+      while (pendingBatches.size() >= config.getReplication().getMaxPendingBatch()) {
         wait();
       }
       pendingBatches.add(batch);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
index 694a7e038b..e4a08cf57b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.service.ThriftService;
 import org.apache.iotdb.commons.service.ThriftServiceThread;
-import org.apache.iotdb.consensus.multileader.conf.MultiLeaderConsensusConfig;
+import org.apache.iotdb.consensus.config.ConsensusConfig.MultiLeaderConfig;
 import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
 
 import java.lang.reflect.InvocationTargetException;
@@ -34,10 +34,12 @@ import java.lang.reflect.InvocationTargetException;
 public class MultiLeaderRPCService extends ThriftService implements MultiLeaderRPCServiceMBean {
 
   private final TEndPoint thisNode;
+  private final MultiLeaderConfig config;
   private MultiLeaderRPCServiceProcessor multiLeaderRPCServiceProcessor;
 
-  public MultiLeaderRPCService(TEndPoint thisNode) {
+  public MultiLeaderRPCService(TEndPoint thisNode, MultiLeaderConfig config) {
     this.thisNode = thisNode;
+    this.config = config;
   }
 
   @Override
@@ -73,10 +75,10 @@ public class MultiLeaderRPCService extends ThriftService implements MultiLeaderR
               ThreadName.MULTI_LEADER_CONSENSUS_RPC_CLIENT.getName(),
               getBindIP(),
               getBindPort(),
-              MultiLeaderConsensusConfig.RPC_MAX_CONCURRENT_CLIENT_NUM,
-              MultiLeaderConsensusConfig.THRIFT_SERVER_AWAIT_TIME_FOR_STOP_SERVICE,
+              config.getRpc().getRpcMaxConcurrentClientNum(),
+              config.getRpc().getThriftServerAwaitTimeForStopService(),
               new MultiLeaderRPCServiceHandler(multiLeaderRPCServiceProcessor),
-              MultiLeaderConsensusConfig.IS_RPC_THRIFT_COMPRESSION_ENABLED);
+              config.getRpc().isRpcThriftCompressionEnabled());
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 0c96034f7c..54383deb6f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
@@ -67,7 +68,6 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -94,6 +94,8 @@ class RatisConsensus implements IConsensus {
   private final RaftProperties properties = new RaftProperties();
   private final RaftClientRpc clientRpc;
 
+  private final ConsensusConfig config;
+
   private final IClientManager<RaftGroup, RatisClient> clientManager =
       new IClientManager.Factory<RaftGroup, RatisClient>()
           .createClientManager(new RatisClientPoolFactory());
@@ -109,17 +111,15 @@ class RatisConsensus implements IConsensus {
   // TODO make it configurable
   private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20);
 
-  /**
-   * @param ratisStorageDir different groups of RatisConsensus Peer all share ratisStorageDir as
-   *     root dir
-   */
-  public RatisConsensus(TEndPoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
+  public RatisConsensus(ConsensusConfig config, IStateMachine.Registry registry)
       throws IOException {
-    myself = Utils.fromTEndPointAndPriorityToRaftPeer(endpoint, DEFAULT_PRIORITY);
+    this.config = config;
 
+    myself = Utils.fromTEndPointAndPriorityToRaftPeer(config.getThisNode(), DEFAULT_PRIORITY);
     System.setProperty(
         "org.apache.ratis.thirdparty.io.netty.allocator.useCacheForAllThreads", "false");
-    RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(ratisStorageDir));
+    RaftServerConfigKeys.setStorageDir(
+        properties, Collections.singletonList(config.getStorageDir()));
     RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(properties, true);
     // TODO make this configurable so that RatisConsensusTest can trigger multiple snapshot process
     // RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties, 20);
@@ -131,7 +131,7 @@ class RatisConsensus implements IConsensus {
     RaftClientConfigKeys.Rpc.setRequestTimeout(
         properties, TimeDuration.valueOf(20, TimeUnit.SECONDS));
 
-    GrpcConfigKeys.Server.setPort(properties, endpoint.getPort());
+    GrpcConfigKeys.Server.setPort(properties, config.getThisNode().getPort());
     clientRpc = new GrpcFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), properties);
 
     server =
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
index 3604d29059..53c0d3996b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
@@ -63,9 +64,9 @@ class StandAloneConsensus implements IConsensus {
   private final Map<ConsensusGroupId, StandAloneServerImpl> stateMachineMap =
       new ConcurrentHashMap<>();
 
-  public StandAloneConsensus(TEndPoint thisNode, File storageDir, Registry registry) {
-    this.thisNode = thisNode;
-    this.storageDir = storageDir;
+  public StandAloneConsensus(ConsensusConfig config, Registry registry) {
+    this.thisNode = config.getThisNode();
+    this.storageDir = config.getStorageDir();
     this.registry = registry;
   }
 
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
index 770031bee1..264d927cbe 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.multileader.logdispatcher.IndexController;
 import org.apache.iotdb.consensus.multileader.thrift.TLogType;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
@@ -100,8 +101,10 @@ public class MultiLeaderConsensusTest {
           (MultiLeaderConsensus)
               ConsensusFactory.getConsensusImpl(
                       ConsensusFactory.MultiLeaderConsensus,
-                      peers.get(i).getEndpoint(),
-                      peersStorage.get(i),
+                      ConsensusConfig.newBuilder()
+                          .setThisNode(peers.get(i).getEndpoint())
+                          .setStorageDir(peersStorage.get(i))
+                          .build(),
                       groupId -> stateMachines.get(finalI))
                   .orElseThrow(
                       () ->
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
index cba5147872..9bff9000f3 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.consensus.multileader.logdispatcher;
 
-import org.apache.iotdb.consensus.multileader.conf.MultiLeaderConsensusConfig;
+import org.apache.iotdb.consensus.config.ConsensusConfig.MultiLeaderConfig;
 
 import org.apache.ratis.util.FileUtils;
 import org.junit.After;
@@ -39,6 +39,7 @@ public class SyncStatusTest {
 
   private static final File storageDir = new File("target" + java.io.File.separator + "test");
   private static final String prefix = "version";
+  private static final MultiLeaderConfig config = new MultiLeaderConfig.Builder().build();
 
   @Before
   public void setUp() throws IOException {
@@ -56,22 +57,22 @@ public class SyncStatusTest {
     IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix, true);
     Assert.assertEquals(0, controller.getCurrentIndex());
 
-    SyncStatus status = new SyncStatus(controller);
+    SyncStatus status = new SyncStatus(controller, config);
     List<PendingBatch> batchList = new ArrayList<>();
 
-    for (long i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH; i++) {
+    for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       PendingBatch batch = new PendingBatch(i, i, Collections.emptyList());
       batchList.add(batch);
       status.addNextBatch(batch);
     }
 
-    for (int i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH; i++) {
+    for (int i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       status.removeBatch(batchList.get(i));
       Assert.assertEquals(
-          MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1 - i, status.getPendingBatches().size());
+          config.getReplication().getMaxPendingBatch() - 1 - i, status.getPendingBatches().size());
       Assert.assertEquals(i, controller.getCurrentIndex());
       Assert.assertEquals(
-          MultiLeaderConsensusConfig.MAX_PENDING_BATCH, status.getNextSendingIndex());
+          config.getReplication().getMaxPendingBatch(), status.getNextSendingIndex());
     }
   }
 
@@ -82,29 +83,29 @@ public class SyncStatusTest {
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
-    SyncStatus status = new SyncStatus(controller);
+    SyncStatus status = new SyncStatus(controller, config);
     List<PendingBatch> batchList = new ArrayList<>();
 
-    for (long i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH; i++) {
+    for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       PendingBatch batch = new PendingBatch(i, i, Collections.emptyList());
       batchList.add(batch);
       status.addNextBatch(batch);
     }
 
-    for (int i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1; i++) {
-      status.removeBatch(batchList.get(MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1 - i));
+    for (int i = 0; i < config.getReplication().getMaxPendingBatch() - 1; i++) {
+      status.removeBatch(batchList.get(config.getReplication().getMaxPendingBatch() - 1 - i));
       Assert.assertEquals(
-          MultiLeaderConsensusConfig.MAX_PENDING_BATCH, status.getPendingBatches().size());
+          config.getReplication().getMaxPendingBatch(), status.getPendingBatches().size());
       Assert.assertEquals(0, controller.getCurrentIndex());
       Assert.assertEquals(
-          MultiLeaderConsensusConfig.MAX_PENDING_BATCH, status.getNextSendingIndex());
+          config.getReplication().getMaxPendingBatch(), status.getNextSendingIndex());
     }
 
     status.removeBatch(batchList.get(0));
     Assert.assertEquals(0, status.getPendingBatches().size());
     Assert.assertEquals(
-        MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1, controller.getCurrentIndex());
-    Assert.assertEquals(MultiLeaderConsensusConfig.MAX_PENDING_BATCH, status.getNextSendingIndex());
+        config.getReplication().getMaxPendingBatch() - 1, controller.getCurrentIndex());
+    Assert.assertEquals(config.getReplication().getMaxPendingBatch(), status.getNextSendingIndex());
   }
 
   /** Confirm success first from front to back, then back to front */
@@ -114,39 +115,39 @@ public class SyncStatusTest {
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
-    SyncStatus status = new SyncStatus(controller);
+    SyncStatus status = new SyncStatus(controller, config);
     List<PendingBatch> batchList = new ArrayList<>();
 
-    for (long i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH; i++) {
+    for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       PendingBatch batch = new PendingBatch(i, i, Collections.emptyList());
       batchList.add(batch);
       status.addNextBatch(batch);
     }
 
-    for (int i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH / 2; i++) {
+    for (int i = 0; i < config.getReplication().getMaxPendingBatch() / 2; i++) {
       status.removeBatch(batchList.get(i));
       Assert.assertEquals(
-          MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1 - i, status.getPendingBatches().size());
+          config.getReplication().getMaxPendingBatch() - 1 - i, status.getPendingBatches().size());
       Assert.assertEquals(i, controller.getCurrentIndex());
       Assert.assertEquals(
-          MultiLeaderConsensusConfig.MAX_PENDING_BATCH, status.getNextSendingIndex());
+          config.getReplication().getMaxPendingBatch(), status.getNextSendingIndex());
     }
 
-    for (int i = MultiLeaderConsensusConfig.MAX_PENDING_BATCH / 2 + 1;
-        i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH;
+    for (int i = config.getReplication().getMaxPendingBatch() / 2 + 1;
+        i < config.getReplication().getMaxPendingBatch();
         i++) {
       status.removeBatch(batchList.get(i));
       Assert.assertEquals(
-          MultiLeaderConsensusConfig.MAX_PENDING_BATCH / 2, status.getPendingBatches().size());
+          config.getReplication().getMaxPendingBatch() / 2, status.getPendingBatches().size());
       Assert.assertEquals(
-          MultiLeaderConsensusConfig.MAX_PENDING_BATCH, status.getNextSendingIndex());
+          config.getReplication().getMaxPendingBatch(), status.getNextSendingIndex());
     }
 
-    status.removeBatch(batchList.get(MultiLeaderConsensusConfig.MAX_PENDING_BATCH / 2));
+    status.removeBatch(batchList.get(config.getReplication().getMaxPendingBatch() / 2));
     Assert.assertEquals(0, status.getPendingBatches().size());
     Assert.assertEquals(
-        MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1, controller.getCurrentIndex());
-    Assert.assertEquals(MultiLeaderConsensusConfig.MAX_PENDING_BATCH, status.getNextSendingIndex());
+        config.getReplication().getMaxPendingBatch() - 1, controller.getCurrentIndex());
+    Assert.assertEquals(config.getReplication().getMaxPendingBatch(), status.getNextSendingIndex());
   }
 
   /** Test Blocking while addNextBatch */
@@ -155,22 +156,22 @@ public class SyncStatusTest {
     IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix, true);
     Assert.assertEquals(0, controller.getCurrentIndex());
 
-    SyncStatus status = new SyncStatus(controller);
+    SyncStatus status = new SyncStatus(controller, config);
     List<PendingBatch> batchList = new ArrayList<>();
 
-    for (long i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH; i++) {
+    for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       PendingBatch batch = new PendingBatch(i, i, Collections.emptyList());
       batchList.add(batch);
       status.addNextBatch(batch);
     }
 
-    for (int i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1; i++) {
-      status.removeBatch(batchList.get(MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1 - i));
+    for (int i = 0; i < config.getReplication().getMaxPendingBatch() - 1; i++) {
+      status.removeBatch(batchList.get(config.getReplication().getMaxPendingBatch() - 1 - i));
       Assert.assertEquals(
-          MultiLeaderConsensusConfig.MAX_PENDING_BATCH, status.getPendingBatches().size());
+          config.getReplication().getMaxPendingBatch(), status.getPendingBatches().size());
       Assert.assertEquals(0, controller.getCurrentIndex());
       Assert.assertEquals(
-          MultiLeaderConsensusConfig.MAX_PENDING_BATCH, status.getNextSendingIndex());
+          config.getReplication().getMaxPendingBatch(), status.getNextSendingIndex());
     }
 
     CompletableFuture<Boolean> future =
@@ -178,8 +179,8 @@ public class SyncStatusTest {
             () -> {
               PendingBatch batch =
                   new PendingBatch(
-                      MultiLeaderConsensusConfig.MAX_PENDING_BATCH,
-                      MultiLeaderConsensusConfig.MAX_PENDING_BATCH,
+                      config.getReplication().getMaxPendingBatch(),
+                      config.getReplication().getMaxPendingBatch(),
                       Collections.emptyList());
               batchList.add(batch);
               try {
@@ -198,14 +199,14 @@ public class SyncStatusTest {
     Assert.assertTrue(future.get());
     Assert.assertEquals(1, status.getPendingBatches().size());
     Assert.assertEquals(
-        MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1, controller.getCurrentIndex());
+        config.getReplication().getMaxPendingBatch() - 1, controller.getCurrentIndex());
     Assert.assertEquals(
-        MultiLeaderConsensusConfig.MAX_PENDING_BATCH + 1, status.getNextSendingIndex());
+        config.getReplication().getMaxPendingBatch() + 1, status.getNextSendingIndex());
 
-    status.removeBatch(batchList.get(MultiLeaderConsensusConfig.MAX_PENDING_BATCH));
+    status.removeBatch(batchList.get(config.getReplication().getMaxPendingBatch()));
     Assert.assertEquals(0, status.getPendingBatches().size());
-    Assert.assertEquals(MultiLeaderConsensusConfig.MAX_PENDING_BATCH, controller.getCurrentIndex());
+    Assert.assertEquals(config.getReplication().getMaxPendingBatch(), controller.getCurrentIndex());
     Assert.assertEquals(
-        MultiLeaderConsensusConfig.MAX_PENDING_BATCH + 1, status.getNextSendingIndex());
+        config.getReplication().getMaxPendingBatch() + 1, status.getNextSendingIndex());
   }
 }
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index b114c2305a..d44c212141 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
 
 import org.apache.ratis.util.FileUtils;
 import org.junit.After;
@@ -63,8 +64,10 @@ public class RatisConsensusTest {
       servers.add(
           ConsensusFactory.getConsensusImpl(
                   ConsensusFactory.RatisConsensus,
-                  peers.get(i).getEndpoint(),
-                  peersStorage.get(i),
+                  ConsensusConfig.newBuilder()
+                      .setThisNode(peers.get(i).getEndpoint())
+                      .setStorageDir(peersStorage.get(i))
+                      .build(),
                   groupId -> stateMachines.get(finalI))
               .orElseThrow(
                   () ->
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
index de4442ed79..b133f83939 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 
 import org.apache.ratis.util.FileUtils;
@@ -45,8 +46,10 @@ public class RecoveryTest {
     consensusImpl =
         ConsensusFactory.getConsensusImpl(
                 ConsensusFactory.StandAloneConsensus,
-                new TEndPoint("0.0.0.0", 9000),
-                new File("target" + java.io.File.separator + "recovery"),
+                ConsensusConfig.newBuilder()
+                    .setThisNode(new TEndPoint("0.0.0.0", 9000))
+                    .setStorageDir(new File("target" + java.io.File.separator + "recovery"))
+                    .build(),
                 gid -> new EmptyStateMachine())
             .orElseThrow(
                 () ->
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index 5c3ae96453..c04adb98ff 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
@@ -123,8 +124,10 @@ public class StandAloneConsensusTest {
     consensusImpl =
         ConsensusFactory.getConsensusImpl(
                 ConsensusFactory.StandAloneConsensus,
-                new TEndPoint("0.0.0.0", 6667),
-                new File("target" + java.io.File.separator + "standalone"),
+                ConsensusConfig.newBuilder()
+                    .setThisNode(new TEndPoint("0.0.0.0", 6667))
+                    .setStorageDir(new File("target" + java.io.File.separator + "standalone"))
+                    .build(),
                 gid -> {
                   switch (gid.getType()) {
                     case SchemaRegion:
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 2007f1af59..1557163ead 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.statemachine.DataRegionStateMachine;
@@ -48,8 +49,11 @@ public class DataRegionConsensusImpl {
     private static final IConsensus INSTANCE =
         ConsensusFactory.getConsensusImpl(
                 conf.getDataRegionConsensusProtocolClass(),
-                new TEndPoint(conf.getInternalIp(), conf.getDataRegionConsensusPort()),
-                new File(conf.getDataRegionConsensusDir()),
+                ConsensusConfig.newBuilder()
+                    .setThisNode(
+                        new TEndPoint(conf.getInternalIp(), conf.getDataRegionConsensusPort()))
+                    .setStorageDir(new File(conf.getDataRegionConsensusDir()))
+                    .build(),
                 gid ->
                     new DataRegionStateMachine(
                         StorageEngineV2.getInstance().getDataRegion((DataRegionId) gid)))
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index 1bde912992..84c9da9b4f 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.statemachine.SchemaRegionStateMachine;
@@ -48,8 +49,11 @@ public class SchemaRegionConsensusImpl {
     private static final IConsensus INSTANCE =
         ConsensusFactory.getConsensusImpl(
                 conf.getSchemaRegionConsensusProtocolClass(),
-                new TEndPoint(conf.getInternalIp(), conf.getSchemaRegionConsensusPort()),
-                new File(conf.getSchemaRegionConsensusDir()),
+                ConsensusConfig.newBuilder()
+                    .setThisNode(
+                        new TEndPoint(conf.getInternalIp(), conf.getSchemaRegionConsensusPort()))
+                    .setStorageDir(new File(conf.getSchemaRegionConsensusDir()))
+                    .build(),
                 gid ->
                     new SchemaRegionStateMachine(
                         SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid)))