You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/03/13 06:50:12 UTC

[iotdb] branch native_raft updated: adapt config

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 1e6b1dda5b adapt config
1e6b1dda5b is described below

commit 1e6b1dda5b29deb95776df8b6ca248455b6df51d
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Mar 13 14:51:59 2023 +0800

    adapt config
---
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  10 +
 .../confignode/conf/ConfigNodeDescriptor.java      |   1 +
 .../iotdb/confignode/manager/ConsensusManager.java |   2 +
 .../iotdb/consensus/config/ConsensusConfig.java    |  19 +-
 .../consensus/natraft/protocol/RaftConfig.java     | 315 +++++++++++++++------
 .../protocol/log/appender/BlockingLogAppender.java |   2 +-
 .../log/appender/SlidingWindowLogAppender.java     |   2 +-
 .../protocol/log/dispatch/LogDispatcher.java       |   4 +-
 .../log/dispatch/flowcontrol/FlowMonitor.java      |   2 +-
 .../log/sequencing/SynchronousSequencer.java       |   2 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   2 +
 .../db/consensus/DataRegionConsensusImpl.java      |   1 +
 .../db/consensus/SchemaRegionConsensusImpl.java    |   1 +
 13 files changed, 266 insertions(+), 97 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 03442f59a6..d850139696 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.confignode.conf;
 
+import java.util.Properties;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -288,6 +289,7 @@ public class ConfigNodeConfig {
   private boolean isEnablePrintingNewlyCreatedPartition = false;
 
   private long forceWalPeriodForConfigNodeSimpleInMs = 100;
+  private Properties otherProperties = new Properties();
 
   public ConfigNodeConfig() {
     // empty constructor
@@ -1110,4 +1112,12 @@ public class ConfigNodeConfig {
   public void setForceWalPeriodForConfigNodeSimpleInMs(long forceWalPeriodForConfigNodeSimpleInMs) {
     this.forceWalPeriodForConfigNodeSimpleInMs = forceWalPeriodForConfigNodeSimpleInMs;
   }
+
+  public Properties getOtherProperties() {
+    return otherProperties;
+  }
+
+  public void setOtherProperties(Properties otherProperties) {
+    this.otherProperties = otherProperties;
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 1bcfa488f2..9b3ac486bb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -772,6 +772,7 @@ public class ConfigNodeDescriptor {
                     "force_wal_period_for_confignode_simple_in_ms",
                     String.valueOf(conf.getForceWalPeriodForConfigNodeSimpleInMs()))
                 .trim()));
+    conf.setOtherProperties(properties);
   }
 
   private void loadCQConfig(Properties properties) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index aa914cf7c0..c3e9faffb6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -89,6 +89,7 @@ public class ConsensusManager {
                       .setThisNode(
                           new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort()))
                       .setStorageDir("target" + java.io.File.separator + "simple")
+                      .setProperties(CONF.getOtherProperties())
                       .build(),
                   gid -> stateMachine)
               .orElseThrow(
@@ -177,6 +178,7 @@ public class ConsensusManager {
                                       .build())
                               .build())
                       .setStorageDir(CONF.getConsensusDir())
+                      .setProperties(CONF.getOtherProperties())
                       .build(),
                   gid -> stateMachine)
               .orElseThrow(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
index 853e18934a..2c561db6a2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.config;
 
+import java.util.Properties;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 
 import java.util.Optional;
@@ -31,13 +32,15 @@ public class ConsensusConfig {
   private final RatisConfig ratisConfig;
   private final IoTConsensusConfig ioTConsensusConfig;
   private final RPCConfig rpcConfig;
+  private final Properties properties;
 
   private ConsensusConfig(
       TEndPoint thisNode,
       int thisNodeId,
       String storageDir,
       RatisConfig ratisConfig,
-      IoTConsensusConfig ioTConsensusConfig) {
+      IoTConsensusConfig ioTConsensusConfig,
+      Properties properties) {
     this.thisNodeEndPoint = thisNode;
     this.thisNodeId = thisNodeId;
     this.storageDir = storageDir;
@@ -55,6 +58,7 @@ public class ConsensusConfig {
                 ioTConsensusConfig.getRpc().getRpcMaxConcurrentClientNum())
             .setThriftMaxFrameSize(ioTConsensusConfig.getRpc().getThriftMaxFrameSize())
             .build();
+    this.properties = properties;
   }
 
   public TEndPoint getThisNodeEndPoint() {
@@ -85,6 +89,10 @@ public class ConsensusConfig {
     return rpcConfig;
   }
 
+  public Properties getProperties() {
+    return properties;
+  }
+
   public static class Builder {
 
     private TEndPoint thisNode;
@@ -92,6 +100,7 @@ public class ConsensusConfig {
     private String storageDir;
     private RatisConfig ratisConfig;
     private IoTConsensusConfig ioTConsensusConfig;
+    private Properties properties = new Properties();
 
     public ConsensusConfig build() {
       return new ConsensusConfig(
@@ -100,7 +109,8 @@ public class ConsensusConfig {
           storageDir,
           Optional.ofNullable(ratisConfig).orElseGet(() -> RatisConfig.newBuilder().build()),
           Optional.ofNullable(ioTConsensusConfig)
-              .orElseGet(() -> IoTConsensusConfig.newBuilder().build()));
+              .orElseGet(() -> IoTConsensusConfig.newBuilder().build()),
+          properties);
     }
 
     public Builder setThisNode(TEndPoint thisNode) {
@@ -127,5 +137,10 @@ public class ConsensusConfig {
       this.ioTConsensusConfig = ioTConsensusConfig;
       return this;
     }
+
+    public Builder setProperties(Properties properties) {
+      this.properties = properties;
+      return this;
+    }
   }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index 6414a0ccfc..560e89d303 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -1,27 +1,28 @@
 /*
 
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-
-
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+
+
+ */
 
 package org.apache.iotdb.consensus.natraft.protocol;
 
+import java.util.Properties;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.config.RPCConfig;
 import org.apache.iotdb.consensus.natraft.protocol.consistency.ConsistencyLevel;
@@ -43,43 +44,18 @@ public class RaftConfig {
   private int uncommittedRaftLogNumForRejectThreshold = 10000;
   private int heartbeatIntervalMs = 1000;
   private int electionTimeoutMs = 20_000;
-  /** max number of clients in a ClientPool of a member for one node. */
-  private int maxClientPerNode = 2000;
-
-  /** max number of idle clients in a ClientPool of a member for one node. */
-  private int maxIdleClientPerNode = 1000;
-
-  /**
-   * If the number of connections created for a node exceeds `max_client_pernode_permember_number`,
-   * we need to wait so much time for other connections to be released until timeout, or a new
-   * connection will be created.
-   */
-  private long waitClientTimeoutMS = 5 * 1000L;
-
-  /**
-   * ClientPool will have so many selector threads (TAsyncClientManager) to distribute to its
-   * clients.
-   */
-  private int selectorNumOfClientPool =
-      Runtime.getRuntime().availableProcessors() / 3 > 0
-          ? Runtime.getRuntime().availableProcessors() / 3
-          : 1;
-
   private int connectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(20);
   private boolean enableUsePersistLogOnDiskToCatchUp;
   private long writeOperationTimeoutMS = 20_000L;
-  // TODO-raft: apply to thrift
-  private int thriftMaxFrameSize = 64 * 1024 * 1024;
   private int logNumInBatch = 100;
   private int dispatcherBindingThreadNum = 16;
   private int followerLoadBalanceWindowsToUse = 1;
   private double followerLoadBalanceOverestimateFactor = 1.1;
-  private int flowMonitorMaxWindowSize = 1000;
+  private int flowMonitorMaxWindowNum = 1000;
   private long flowMonitorWindowInterval = 1000;
   private String storageDir = "data";
   private long electionMaxWaitMs = 5000;
   private long unAppliedRaftLogNumForRejectThreshold = 10000;
-  private long checkPeriodWhenInsertBlocked = 100;
   private long maxWaitingTimeWhenInsertBlocked = 10000;
   private boolean useFollowerLoadBalance;
   private int raftLogBufferSize = 64 * 1024 * 1024;
@@ -89,8 +65,7 @@ public class RaftConfig {
   private int maxPersistRaftLogNumberOnDisk = 10_000_000;
   private int flushRaftLogThreshold = 100_000;
   private long maxSyncLogLag = 100_000;
-  private long syncLeaderMaxWaitMs = 30_000;
-
+  private int syncLeaderMaxWaitMs = 30_000;
   private boolean enableCompressedDispatching = true;
   private CompressionType dispatchingCompressionType = CompressionType.SNAPPY;
   private ConsistencyLevel consistencyLevel = ConsistencyLevel.STRONG_CONSISTENCY;
@@ -100,6 +75,7 @@ public class RaftConfig {
     this.storageDir = config.getStorageDir();
     new File(this.storageDir).mkdirs();
     this.rpcConfig = config.getRPCConfig();
+    loadProperties(config.getProperties());
   }
 
   public boolean isEnableWeakAcceptance() {
@@ -192,35 +168,12 @@ public class RaftConfig {
   }
 
   public int getMaxClientPerNode() {
-    return maxClientPerNode;
+    return rpcConfig.getRpcMaxConcurrentClientNum();
   }
 
-  public void setMaxClientPerNode(int maxClientPerNode) {
-    this.maxClientPerNode = maxClientPerNode;
-  }
 
   public int getMaxIdleClientPerNode() {
-    return maxIdleClientPerNode;
-  }
-
-  public void setMaxIdleClientPerNode(int maxIdleClientPerNode) {
-    this.maxIdleClientPerNode = maxIdleClientPerNode;
-  }
-
-  public long getWaitClientTimeoutMS() {
-    return waitClientTimeoutMS;
-  }
-
-  public void setWaitClientTimeoutMS(long waitClientTimeoutMS) {
-    this.waitClientTimeoutMS = waitClientTimeoutMS;
-  }
-
-  public int getSelectorNumOfClientPool() {
-    return selectorNumOfClientPool;
-  }
-
-  public void setSelectorNumOfClientPool(int selectorNumOfClientPool) {
-    this.selectorNumOfClientPool = selectorNumOfClientPool;
+    return rpcConfig.getRpcMinConcurrentClientNum();
   }
 
   public int getConnectionTimeoutInMS() {
@@ -248,11 +201,7 @@ public class RaftConfig {
   }
 
   public int getThriftMaxFrameSize() {
-    return thriftMaxFrameSize;
-  }
-
-  public void setThriftMaxFrameSize(int thriftMaxFrameSize) {
-    this.thriftMaxFrameSize = thriftMaxFrameSize;
+    return rpcConfig.getThriftMaxFrameSize();
   }
 
   public int getLogNumInBatch() {
@@ -288,12 +237,12 @@ public class RaftConfig {
     this.followerLoadBalanceOverestimateFactor = followerLoadBalanceOverestimateFactor;
   }
 
-  public int getFlowMonitorMaxWindowSize() {
-    return flowMonitorMaxWindowSize;
+  public int getFlowMonitorMaxWindowNum() {
+    return flowMonitorMaxWindowNum;
   }
 
-  public void setFlowMonitorMaxWindowSize(int flowMonitorMaxWindowSize) {
-    this.flowMonitorMaxWindowSize = flowMonitorMaxWindowSize;
+  public void setFlowMonitorMaxWindowNum(int flowMonitorMaxWindowNum) {
+    this.flowMonitorMaxWindowNum = flowMonitorMaxWindowNum;
   }
 
   public long getFlowMonitorWindowInterval() {
@@ -328,14 +277,6 @@ public class RaftConfig {
     this.unAppliedRaftLogNumForRejectThreshold = unAppliedRaftLogNumForRejectThreshold;
   }
 
-  public long getCheckPeriodWhenInsertBlocked() {
-    return checkPeriodWhenInsertBlocked;
-  }
-
-  public void setCheckPeriodWhenInsertBlocked(long checkPeriodWhenInsertBlocked) {
-    this.checkPeriodWhenInsertBlocked = checkPeriodWhenInsertBlocked;
-  }
-
   public long getMaxWaitingTimeWhenInsertBlocked() {
     return maxWaitingTimeWhenInsertBlocked;
   }
@@ -412,7 +353,7 @@ public class RaftConfig {
     return syncLeaderMaxWaitMs;
   }
 
-  public void setSyncLeaderMaxWaitMs(long syncLeaderMaxWaitMs) {
+  public void setSyncLeaderMaxWaitMs(int syncLeaderMaxWaitMs) {
     this.syncLeaderMaxWaitMs = syncLeaderMaxWaitMs;
   }
 
@@ -443,4 +384,198 @@ public class RaftConfig {
   public void setDispatchingCompressionType(CompressionType dispatchingCompressionType) {
     this.dispatchingCompressionType = dispatchingCompressionType;
   }
+
+
+  public void loadProperties(Properties properties) {
+
+    this.setConnectionTimeoutInMS(
+        Integer.parseInt(
+            properties.getProperty(
+                "connection_timeout_ms", String.valueOf(this.getConnectionTimeoutInMS()))));
+
+    this.setHeartbeatIntervalMs(
+        Integer.parseInt(
+            properties.getProperty(
+                "heartbeat_interval_ms", String.valueOf(this.getHeartbeatIntervalMs()))));
+
+    this.setElectionTimeoutMs(
+        Integer.parseInt(
+            properties.getProperty(
+                "election_timeout_ms", String.valueOf(this.getElectionTimeoutMs()))));
+
+    this.setElectionMaxWaitMs(
+        Integer.parseInt(
+            properties.getProperty(
+                "election_max_wait_ms", String.valueOf(this.getElectionMaxWaitMs()))));
+
+    this.setCatchUpTimeoutMS(
+        Integer.parseInt(
+            properties.getProperty(
+                "catch_up_timeout_ms", String.valueOf(this.getCatchUpTimeoutMS()))));
+
+    this.setWriteOperationTimeoutMS(
+        Integer.parseInt(
+            properties.getProperty(
+                "write_operation_timeout_ms",
+                String.valueOf(this.getWriteOperationTimeoutMS()))));
+
+    this.setSyncLeaderMaxWaitMs(Integer.parseInt(
+        properties.getProperty(
+            "sync_leader_max_wait",
+            String.valueOf(this.getSyncLeaderMaxWaitMs()))));
+
+    this.setMinNumOfLogsInMem(
+        Integer.parseInt(
+            properties.getProperty(
+                "min_num_of_logs_in_mem", String.valueOf(this.getMinNumOfLogsInMem()))));
+
+    this.setMaxNumOfLogsInMem(
+        Integer.parseInt(
+            properties.getProperty(
+                "max_num_of_logs_in_mem", String.valueOf(this.getMaxNumOfLogsInMem()))));
+
+    this.setMaxMemorySizeForRaftLog(
+        Long.parseLong(
+            properties.getProperty(
+                "max_memory_for_logs", String.valueOf(this.getMaxNumOfLogsInMem()))));
+
+    this.setMaxWaitingTimeWhenInsertBlocked(
+        Long.parseLong(
+            properties.getProperty(
+                "max_insert_block_time_ms", String.valueOf(this.getMaxWaitingTimeWhenInsertBlocked()))));
+
+    this.setLogDeleteCheckIntervalSecond(
+        Integer.parseInt(
+            properties.getProperty(
+                "log_deletion_check_interval_second",
+                String.valueOf(this.getLogDeleteCheckIntervalSecond()))));
+
+    this.setEnableRaftLogPersistence(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "is_enable_raft_log_persistence",
+                String.valueOf(this.isEnableRaftLogPersistence()))));
+
+    this.setFlushRaftLogThreshold(
+        Integer.parseInt(
+            properties.getProperty(
+                "flush_raft_log_threshold", String.valueOf(this.getFlushRaftLogThreshold()))));
+
+    this.setRaftLogBufferSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "raft_log_buffer_size", String.valueOf(this.getRaftLogBufferSize()))));
+
+    this.setLogNumInBatch(
+        Integer.parseInt(
+            properties.getProperty(
+                "log_batch_num", String.valueOf(this.getLogNumInBatch()))));
+
+    this.setMaxRaftLogIndexSizeInMemory(
+        Integer.parseInt(
+            properties.getProperty(
+                "max_raft_log_index_size_in_memory",
+                String.valueOf(this.getMaxRaftLogIndexSizeInMemory()))));
+
+    this.setUncommittedRaftLogNumForRejectThreshold(
+        Integer.parseInt(
+            properties.getProperty(
+                "uncommitted_raft_log_num_for_reject_threshold",
+                String.valueOf(this.getUncommittedRaftLogNumForRejectThreshold()))));
+
+    this.setUnAppliedRaftLogNumForRejectThreshold(
+        Integer.parseInt(
+            properties.getProperty(
+                "unapplied_raft_log_num_for_reject_threshold",
+                String.valueOf(this.getUnAppliedRaftLogNumForRejectThreshold()))));
+
+    this.setMaxNumberOfPersistRaftLogFiles(
+        Integer.parseInt(
+            properties.getProperty(
+                "max_number_of_persist_raft_log_files",
+                String.valueOf(this.getMaxNumberOfPersistRaftLogFiles()))));
+
+    this.setMaxPersistRaftLogNumberOnDisk(
+        Integer.parseInt(
+            properties.getProperty(
+                "max_persist_raft_log_number_on_disk",
+                String.valueOf(this.getMaxPersistRaftLogNumberOnDisk()))));
+
+    this.setMaxNumberOfLogsPerFetchOnDisk(
+        Integer.parseInt(
+            properties.getProperty(
+                "max_number_of_logs_per_fetch_on_disk",
+                String.valueOf(this.getMaxNumberOfLogsPerFetchOnDisk()))));
+
+    this.setEnableUsePersistLogOnDiskToCatchUp(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_use_persist_log_on_disk_to_catch_up",
+                String.valueOf(this.isEnableUsePersistLogOnDiskToCatchUp()))));
+
+    this.setMaxSyncLogLag(
+        Long.parseLong(
+            properties.getProperty("max_sync_log_lag", String.valueOf(this.getMaxSyncLogLag()))));
+
+    this.setUseFollowerSlidingWindow(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "use_follower_sliding_window",
+                String.valueOf(this.isUseFollowerSlidingWindow()))));
+
+    this.setEnableWeakAcceptance(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_weak_acceptance", String.valueOf(this.isEnableWeakAcceptance()))));
+
+    this.setDispatcherBindingThreadNum(
+        Integer.parseInt(
+            properties.getProperty(
+                "dispatcher_binding_thread_num",
+                String.valueOf(this.getDispatcherBindingThreadNum()))));
+
+    this.setUseFollowerLoadBalance(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "use_follower_load_balance", String.valueOf(this.isUseFollowerLoadBalance()))));
+
+    this.setFollowerLoadBalanceWindowsToUse(
+        Integer.parseInt(
+            properties.getProperty(
+                "follower_load_balance_windows_to_use",
+                String.valueOf(this.getFollowerLoadBalanceWindowsToUse()))));
+
+    this.setFollowerLoadBalanceOverestimateFactor(
+        Double.parseDouble(
+            properties.getProperty(
+                "follower_load_balance_overestimate_factor",
+                String.valueOf(this.getFollowerLoadBalanceOverestimateFactor()))));
+
+    this.setFlowMonitorMaxWindowNum(
+        Integer.parseInt(
+            properties.getProperty(
+                "follower_load_balance_window_num",
+                String.valueOf(this.getFlowMonitorMaxWindowNum()))));
+
+    this.setFlowMonitorWindowInterval(
+        Integer.parseInt(
+            properties.getProperty(
+                "follower_load_balance_window_interval",
+                String.valueOf(this.getFlowMonitorWindowInterval()))));
+
+    this.setEnableCompressedDispatching(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "use_compressed_dispatching", String.valueOf(this.isEnableCompressedDispatching()))));
+
+    this.setDispatchingCompressionType(
+        CompressionType.valueOf(CompressionType.class, properties.getProperty(
+            "default_boolean_encoding", this.getDispatchingCompressionType().toString()))
+    );
+
+    String consistencyLevel = properties.getProperty("consistency_level");
+    if (consistencyLevel != null) {
+      this.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
+    }
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
index 8138562e3e..2496ce316b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
@@ -146,7 +146,7 @@ public class BlockingLogAppender implements LogAppender {
       }
 
       try {
-        TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
+        TimeUnit.MILLISECONDS.sleep(10);
         if (System.currentTimeMillis() - startWaitingTime
             > config.getMaxWaitingTimeWhenInsertBlocked()) {
           result.status = Response.RESPONSE_TOO_BUSY;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
index 0985204aa5..cd84e82df1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
@@ -137,7 +137,7 @@ public class SlidingWindowLogAppender implements LogAppender {
         break;
       }
       try {
-        TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
+        TimeUnit.MILLISECONDS.sleep(10);
         if (System.currentTimeMillis() - startWaitingTime
             > config.getMaxWaitingTimeWhenInsertBlocked()) {
           result.status = Response.RESPONSE_TOO_BUSY;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 3b83a0cd7d..bf1c9768fa 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -82,7 +82,7 @@ public class LogDispatcher {
   protected ICompressor compressor;
 
   public int bindingThreadNum;
-  public static int maxBatchSize = 10;
+  public int maxBatchSize = 10;
 
   public LogDispatcher(RaftMember member, RaftConfig config) {
     this.member = member;
@@ -97,6 +97,7 @@ public class LogDispatcher {
     this.allNodes = member.getAllNodes();
     this.newNodes = member.getNewNodes();
     createQueueAndBindingThreads(unionNodes(allNodes, newNodes));
+    maxBatchSize = config.getLogNumInBatch();
   }
 
   public void updateRateLimiter() {
@@ -449,4 +450,5 @@ public class LogDispatcher {
       }
     }
   }
+
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java
index feba556131..79ff88984b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java
@@ -51,7 +51,7 @@ public class FlowMonitor {
   private RaftConfig config;
 
   public FlowMonitor(Peer node, RaftConfig config) throws IOException {
-    this.maxWindowSize = config.getFlowMonitorMaxWindowSize();
+    this.maxWindowSize = config.getFlowMonitorMaxWindowNum();
     this.windows = new ArrayDeque<>(maxWindowSize);
     this.windowInterval = config.getFlowMonitorWindowInterval();
     this.node = node;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
index 8b0445dfd5..97df15c2aa 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
@@ -84,7 +84,7 @@ public class SynchronousSequencer implements LogSequencer {
       }
 
       try {
-        TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
+        TimeUnit.MILLISECONDS.sleep(10);
         if (System.currentTimeMillis() - startWaitingTime
             > config.getMaxWaitingTimeWhenInsertBlocked()) {
           return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 8c5b060644..6b176c26fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1454,6 +1454,8 @@ public class IoTDBDescriptor {
       if (prevDeleteWalFilesPeriodInMs != conf.getDeleteWalFilesPeriodInMs()) {
         WALManager.getInstance().rebootWALDeleteThread();
       }
+
+      conf.getCustomizedProperties().putAll(properties);
     } catch (Exception e) {
       throw new QueryProcessException(String.format("Fail to reload configuration because %s", e));
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 905a8275c5..5684202a0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -175,6 +175,7 @@ public class DataRegionConsensusImpl {
                                           conf.getDataRatisConsensusLogAppenderBufferSizeMax())
                                       .build())
                               .build())
+                      .setProperties(conf.getCustomizedProperties())
                       .build(),
                   DataRegionConsensusImpl::createDataRegionStateMachine)
               .orElseThrow(
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index eec582234d..bb29138bad 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -135,6 +135,7 @@ public class SchemaRegionConsensusImpl {
                                       .build())
                               .build())
                       .setStorageDir(conf.getSchemaRegionConsensusDir())
+                      .setProperties(conf.getCustomizedProperties())
                       .build(),
                   gid ->
                       new SchemaRegionStateMachine(