You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/03/13 06:50:12 UTC
[iotdb] branch native_raft updated: adapt config
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 1e6b1dda5b adapt config
1e6b1dda5b is described below
commit 1e6b1dda5b29deb95776df8b6ca248455b6df51d
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Mar 13 14:51:59 2023 +0800
adapt config
---
.../iotdb/confignode/conf/ConfigNodeConfig.java | 10 +
.../confignode/conf/ConfigNodeDescriptor.java | 1 +
.../iotdb/confignode/manager/ConsensusManager.java | 2 +
.../iotdb/consensus/config/ConsensusConfig.java | 19 +-
.../consensus/natraft/protocol/RaftConfig.java | 315 +++++++++++++++------
.../protocol/log/appender/BlockingLogAppender.java | 2 +-
.../log/appender/SlidingWindowLogAppender.java | 2 +-
.../protocol/log/dispatch/LogDispatcher.java | 4 +-
.../log/dispatch/flowcontrol/FlowMonitor.java | 2 +-
.../log/sequencing/SynchronousSequencer.java | 2 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 2 +
.../db/consensus/DataRegionConsensusImpl.java | 1 +
.../db/consensus/SchemaRegionConsensusImpl.java | 1 +
13 files changed, 266 insertions(+), 97 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 03442f59a6..d850139696 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.conf;
+import java.util.Properties;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -288,6 +289,7 @@ public class ConfigNodeConfig {
private boolean isEnablePrintingNewlyCreatedPartition = false;
private long forceWalPeriodForConfigNodeSimpleInMs = 100;
+ private Properties otherProperties = new Properties();
public ConfigNodeConfig() {
// empty constructor
@@ -1110,4 +1112,12 @@ public class ConfigNodeConfig {
public void setForceWalPeriodForConfigNodeSimpleInMs(long forceWalPeriodForConfigNodeSimpleInMs) {
this.forceWalPeriodForConfigNodeSimpleInMs = forceWalPeriodForConfigNodeSimpleInMs;
}
+
+ public Properties getOtherProperties() {
+ return otherProperties;
+ }
+
+ public void setOtherProperties(Properties otherProperties) {
+ this.otherProperties = otherProperties;
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 1bcfa488f2..9b3ac486bb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -772,6 +772,7 @@ public class ConfigNodeDescriptor {
"force_wal_period_for_confignode_simple_in_ms",
String.valueOf(conf.getForceWalPeriodForConfigNodeSimpleInMs()))
.trim()));
+ conf.setOtherProperties(properties);
}
private void loadCQConfig(Properties properties) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index aa914cf7c0..c3e9faffb6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -89,6 +89,7 @@ public class ConsensusManager {
.setThisNode(
new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort()))
.setStorageDir("target" + java.io.File.separator + "simple")
+ .setProperties(CONF.getOtherProperties())
.build(),
gid -> stateMachine)
.orElseThrow(
@@ -177,6 +178,7 @@ public class ConsensusManager {
.build())
.build())
.setStorageDir(CONF.getConsensusDir())
+ .setProperties(CONF.getOtherProperties())
.build(),
gid -> stateMachine)
.orElseThrow(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
index 853e18934a..2c561db6a2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.config;
+import java.util.Properties;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import java.util.Optional;
@@ -31,13 +32,15 @@ public class ConsensusConfig {
private final RatisConfig ratisConfig;
private final IoTConsensusConfig ioTConsensusConfig;
private final RPCConfig rpcConfig;
+ private final Properties properties;
private ConsensusConfig(
TEndPoint thisNode,
int thisNodeId,
String storageDir,
RatisConfig ratisConfig,
- IoTConsensusConfig ioTConsensusConfig) {
+ IoTConsensusConfig ioTConsensusConfig,
+ Properties properties) {
this.thisNodeEndPoint = thisNode;
this.thisNodeId = thisNodeId;
this.storageDir = storageDir;
@@ -55,6 +58,7 @@ public class ConsensusConfig {
ioTConsensusConfig.getRpc().getRpcMaxConcurrentClientNum())
.setThriftMaxFrameSize(ioTConsensusConfig.getRpc().getThriftMaxFrameSize())
.build();
+ this.properties = properties;
}
public TEndPoint getThisNodeEndPoint() {
@@ -85,6 +89,10 @@ public class ConsensusConfig {
return rpcConfig;
}
+ public Properties getProperties() {
+ return properties;
+ }
+
public static class Builder {
private TEndPoint thisNode;
@@ -92,6 +100,7 @@ public class ConsensusConfig {
private String storageDir;
private RatisConfig ratisConfig;
private IoTConsensusConfig ioTConsensusConfig;
+ private Properties properties = new Properties();
public ConsensusConfig build() {
return new ConsensusConfig(
@@ -100,7 +109,8 @@ public class ConsensusConfig {
storageDir,
Optional.ofNullable(ratisConfig).orElseGet(() -> RatisConfig.newBuilder().build()),
Optional.ofNullable(ioTConsensusConfig)
- .orElseGet(() -> IoTConsensusConfig.newBuilder().build()));
+ .orElseGet(() -> IoTConsensusConfig.newBuilder().build()),
+ properties);
}
public Builder setThisNode(TEndPoint thisNode) {
@@ -127,5 +137,10 @@ public class ConsensusConfig {
this.ioTConsensusConfig = ioTConsensusConfig;
return this;
}
+
+ public Builder setProperties(Properties properties) {
+ this.properties = properties;
+ return this;
+ }
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index 6414a0ccfc..560e89d303 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -1,27 +1,28 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-
-
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+
+
+ */
package org.apache.iotdb.consensus.natraft.protocol;
+import java.util.Properties;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.config.RPCConfig;
import org.apache.iotdb.consensus.natraft.protocol.consistency.ConsistencyLevel;
@@ -43,43 +44,18 @@ public class RaftConfig {
private int uncommittedRaftLogNumForRejectThreshold = 10000;
private int heartbeatIntervalMs = 1000;
private int electionTimeoutMs = 20_000;
- /** max number of clients in a ClientPool of a member for one node. */
- private int maxClientPerNode = 2000;
-
- /** max number of idle clients in a ClientPool of a member for one node. */
- private int maxIdleClientPerNode = 1000;
-
- /**
- * If the number of connections created for a node exceeds `max_client_pernode_permember_number`,
- * we need to wait so much time for other connections to be released until timeout, or a new
- * connection will be created.
- */
- private long waitClientTimeoutMS = 5 * 1000L;
-
- /**
- * ClientPool will have so many selector threads (TAsyncClientManager) to distribute to its
- * clients.
- */
- private int selectorNumOfClientPool =
- Runtime.getRuntime().availableProcessors() / 3 > 0
- ? Runtime.getRuntime().availableProcessors() / 3
- : 1;
-
private int connectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(20);
private boolean enableUsePersistLogOnDiskToCatchUp;
private long writeOperationTimeoutMS = 20_000L;
- // TODO-raft: apply to thrift
- private int thriftMaxFrameSize = 64 * 1024 * 1024;
private int logNumInBatch = 100;
private int dispatcherBindingThreadNum = 16;
private int followerLoadBalanceWindowsToUse = 1;
private double followerLoadBalanceOverestimateFactor = 1.1;
- private int flowMonitorMaxWindowSize = 1000;
+ private int flowMonitorMaxWindowNum = 1000;
private long flowMonitorWindowInterval = 1000;
private String storageDir = "data";
private long electionMaxWaitMs = 5000;
private long unAppliedRaftLogNumForRejectThreshold = 10000;
- private long checkPeriodWhenInsertBlocked = 100;
private long maxWaitingTimeWhenInsertBlocked = 10000;
private boolean useFollowerLoadBalance;
private int raftLogBufferSize = 64 * 1024 * 1024;
@@ -89,8 +65,7 @@ public class RaftConfig {
private int maxPersistRaftLogNumberOnDisk = 10_000_000;
private int flushRaftLogThreshold = 100_000;
private long maxSyncLogLag = 100_000;
- private long syncLeaderMaxWaitMs = 30_000;
-
+ private int syncLeaderMaxWaitMs = 30_000;
private boolean enableCompressedDispatching = true;
private CompressionType dispatchingCompressionType = CompressionType.SNAPPY;
private ConsistencyLevel consistencyLevel = ConsistencyLevel.STRONG_CONSISTENCY;
@@ -100,6 +75,7 @@ public class RaftConfig {
this.storageDir = config.getStorageDir();
new File(this.storageDir).mkdirs();
this.rpcConfig = config.getRPCConfig();
+ loadProperties(config.getProperties());
}
public boolean isEnableWeakAcceptance() {
@@ -192,35 +168,12 @@ public class RaftConfig {
}
public int getMaxClientPerNode() {
- return maxClientPerNode;
+ return rpcConfig.getRpcMaxConcurrentClientNum();
}
- public void setMaxClientPerNode(int maxClientPerNode) {
- this.maxClientPerNode = maxClientPerNode;
- }
public int getMaxIdleClientPerNode() {
- return maxIdleClientPerNode;
- }
-
- public void setMaxIdleClientPerNode(int maxIdleClientPerNode) {
- this.maxIdleClientPerNode = maxIdleClientPerNode;
- }
-
- public long getWaitClientTimeoutMS() {
- return waitClientTimeoutMS;
- }
-
- public void setWaitClientTimeoutMS(long waitClientTimeoutMS) {
- this.waitClientTimeoutMS = waitClientTimeoutMS;
- }
-
- public int getSelectorNumOfClientPool() {
- return selectorNumOfClientPool;
- }
-
- public void setSelectorNumOfClientPool(int selectorNumOfClientPool) {
- this.selectorNumOfClientPool = selectorNumOfClientPool;
+ return rpcConfig.getRpcMinConcurrentClientNum();
}
public int getConnectionTimeoutInMS() {
@@ -248,11 +201,7 @@ public class RaftConfig {
}
public int getThriftMaxFrameSize() {
- return thriftMaxFrameSize;
- }
-
- public void setThriftMaxFrameSize(int thriftMaxFrameSize) {
- this.thriftMaxFrameSize = thriftMaxFrameSize;
+ return rpcConfig.getThriftMaxFrameSize();
}
public int getLogNumInBatch() {
@@ -288,12 +237,12 @@ public class RaftConfig {
this.followerLoadBalanceOverestimateFactor = followerLoadBalanceOverestimateFactor;
}
- public int getFlowMonitorMaxWindowSize() {
- return flowMonitorMaxWindowSize;
+ public int getFlowMonitorMaxWindowNum() {
+ return flowMonitorMaxWindowNum;
}
- public void setFlowMonitorMaxWindowSize(int flowMonitorMaxWindowSize) {
- this.flowMonitorMaxWindowSize = flowMonitorMaxWindowSize;
+ public void setFlowMonitorMaxWindowNum(int flowMonitorMaxWindowNum) {
+ this.flowMonitorMaxWindowNum = flowMonitorMaxWindowNum;
}
public long getFlowMonitorWindowInterval() {
@@ -328,14 +277,6 @@ public class RaftConfig {
this.unAppliedRaftLogNumForRejectThreshold = unAppliedRaftLogNumForRejectThreshold;
}
- public long getCheckPeriodWhenInsertBlocked() {
- return checkPeriodWhenInsertBlocked;
- }
-
- public void setCheckPeriodWhenInsertBlocked(long checkPeriodWhenInsertBlocked) {
- this.checkPeriodWhenInsertBlocked = checkPeriodWhenInsertBlocked;
- }
-
public long getMaxWaitingTimeWhenInsertBlocked() {
return maxWaitingTimeWhenInsertBlocked;
}
@@ -412,7 +353,7 @@ public class RaftConfig {
return syncLeaderMaxWaitMs;
}
- public void setSyncLeaderMaxWaitMs(long syncLeaderMaxWaitMs) {
+ public void setSyncLeaderMaxWaitMs(int syncLeaderMaxWaitMs) {
this.syncLeaderMaxWaitMs = syncLeaderMaxWaitMs;
}
@@ -443,4 +384,198 @@ public class RaftConfig {
public void setDispatchingCompressionType(CompressionType dispatchingCompressionType) {
this.dispatchingCompressionType = dispatchingCompressionType;
}
+
+  /** Loads raft settings from {@code properties}; absent keys keep their current value. */
+  public void loadProperties(Properties properties) {
+    // NOTE(review): values are parsed as-is (no trim), so a malformed value makes the parse throw.
+    this.setConnectionTimeoutInMS(
+        Integer.parseInt(
+            properties.getProperty(
+                "connection_timeout_ms", String.valueOf(this.getConnectionTimeoutInMS()))));
+
+    this.setHeartbeatIntervalMs(
+        Integer.parseInt(
+            properties.getProperty(
+                "heartbeat_interval_ms", String.valueOf(this.getHeartbeatIntervalMs()))));
+
+    this.setElectionTimeoutMs(
+        Integer.parseInt(
+            properties.getProperty(
+                "election_timeout_ms", String.valueOf(this.getElectionTimeoutMs()))));
+
+    this.setElectionMaxWaitMs(
+        Integer.parseInt(
+            properties.getProperty(
+                "election_max_wait_ms", String.valueOf(this.getElectionMaxWaitMs()))));
+
+    this.setCatchUpTimeoutMS(
+        Integer.parseInt(
+            properties.getProperty(
+                "catch_up_timeout_ms", String.valueOf(this.getCatchUpTimeoutMS()))));
+
+    this.setWriteOperationTimeoutMS(
+        Integer.parseInt(
+            properties.getProperty(
+                "write_operation_timeout_ms",
+                String.valueOf(this.getWriteOperationTimeoutMS()))));
+
+    this.setSyncLeaderMaxWaitMs(
+        Integer.parseInt(
+            properties.getProperty(
+                "sync_leader_max_wait", String.valueOf(this.getSyncLeaderMaxWaitMs()))));
+
+    this.setMinNumOfLogsInMem(
+        Integer.parseInt(
+            properties.getProperty(
+                "min_num_of_logs_in_mem", String.valueOf(this.getMinNumOfLogsInMem()))));
+
+    this.setMaxNumOfLogsInMem(
+        Integer.parseInt(
+            properties.getProperty(
+                "max_num_of_logs_in_mem", String.valueOf(this.getMaxNumOfLogsInMem()))));
+    // The fallback must be the memory budget itself, not the in-memory log count.
+    this.setMaxMemorySizeForRaftLog(
+        Long.parseLong(
+            properties.getProperty(
+                "max_memory_for_logs", String.valueOf(this.getMaxMemorySizeForRaftLog()))));
+
+    this.setMaxWaitingTimeWhenInsertBlocked(
+        Long.parseLong(
+            properties.getProperty(
+                "max_insert_block_time_ms", String.valueOf(this.getMaxWaitingTimeWhenInsertBlocked()))));
+
+    this.setLogDeleteCheckIntervalSecond(
+        Integer.parseInt(
+            properties.getProperty(
+                "log_deletion_check_interval_second",
+                String.valueOf(this.getLogDeleteCheckIntervalSecond()))));
+
+    this.setEnableRaftLogPersistence(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "is_enable_raft_log_persistence",
+                String.valueOf(this.isEnableRaftLogPersistence()))));
+
+    this.setFlushRaftLogThreshold(
+        Integer.parseInt(
+            properties.getProperty(
+                "flush_raft_log_threshold", String.valueOf(this.getFlushRaftLogThreshold()))));
+
+    this.setRaftLogBufferSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "raft_log_buffer_size", String.valueOf(this.getRaftLogBufferSize()))));
+
+    this.setLogNumInBatch(
+        Integer.parseInt(
+            properties.getProperty(
+                "log_batch_num", String.valueOf(this.getLogNumInBatch()))));
+
+    this.setMaxRaftLogIndexSizeInMemory(
+        Integer.parseInt(
+            properties.getProperty(
+                "max_raft_log_index_size_in_memory",
+                String.valueOf(this.getMaxRaftLogIndexSizeInMemory()))));
+
+    this.setUncommittedRaftLogNumForRejectThreshold(
+        Integer.parseInt(
+            properties.getProperty(
+                "uncommitted_raft_log_num_for_reject_threshold",
+                String.valueOf(this.getUncommittedRaftLogNumForRejectThreshold()))));
+
+    this.setUnAppliedRaftLogNumForRejectThreshold(
+        Integer.parseInt(
+            properties.getProperty(
+                "unapplied_raft_log_num_for_reject_threshold",
+                String.valueOf(this.getUnAppliedRaftLogNumForRejectThreshold()))));
+
+    this.setMaxNumberOfPersistRaftLogFiles(
+        Integer.parseInt(
+            properties.getProperty(
+                "max_number_of_persist_raft_log_files",
+                String.valueOf(this.getMaxNumberOfPersistRaftLogFiles()))));
+
+    this.setMaxPersistRaftLogNumberOnDisk(
+        Integer.parseInt(
+            properties.getProperty(
+                "max_persist_raft_log_number_on_disk",
+                String.valueOf(this.getMaxPersistRaftLogNumberOnDisk()))));
+
+    this.setMaxNumberOfLogsPerFetchOnDisk(
+        Integer.parseInt(
+            properties.getProperty(
+                "max_number_of_logs_per_fetch_on_disk",
+                String.valueOf(this.getMaxNumberOfLogsPerFetchOnDisk()))));
+
+    this.setEnableUsePersistLogOnDiskToCatchUp(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_use_persist_log_on_disk_to_catch_up",
+                String.valueOf(this.isEnableUsePersistLogOnDiskToCatchUp()))));
+
+    this.setMaxSyncLogLag(
+        Long.parseLong(
+            properties.getProperty("max_sync_log_lag", String.valueOf(this.getMaxSyncLogLag()))));
+
+    this.setUseFollowerSlidingWindow(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "use_follower_sliding_window",
+                String.valueOf(this.isUseFollowerSlidingWindow()))));
+
+    this.setEnableWeakAcceptance(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_weak_acceptance", String.valueOf(this.isEnableWeakAcceptance()))));
+
+    this.setDispatcherBindingThreadNum(
+        Integer.parseInt(
+            properties.getProperty(
+                "dispatcher_binding_thread_num",
+                String.valueOf(this.getDispatcherBindingThreadNum()))));
+
+    this.setUseFollowerLoadBalance(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "use_follower_load_balance", String.valueOf(this.isUseFollowerLoadBalance()))));
+
+    this.setFollowerLoadBalanceWindowsToUse(
+        Integer.parseInt(
+            properties.getProperty(
+                "follower_load_balance_windows_to_use",
+                String.valueOf(this.getFollowerLoadBalanceWindowsToUse()))));
+
+    this.setFollowerLoadBalanceOverestimateFactor(
+        Double.parseDouble(
+            properties.getProperty(
+                "follower_load_balance_overestimate_factor",
+                String.valueOf(this.getFollowerLoadBalanceOverestimateFactor()))));
+
+    this.setFlowMonitorMaxWindowNum(
+        Integer.parseInt(
+            properties.getProperty(
+                "follower_load_balance_window_num",
+                String.valueOf(this.getFlowMonitorMaxWindowNum()))));
+
+    this.setFlowMonitorWindowInterval(
+        Integer.parseInt(
+            properties.getProperty(
+                "follower_load_balance_window_interval",
+                String.valueOf(this.getFlowMonitorWindowInterval()))));
+
+    this.setEnableCompressedDispatching(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "use_compressed_dispatching", String.valueOf(this.isEnableCompressedDispatching()))));
+    // Read from a dedicated key so that an unrelated setting's value is never parsed as a type.
+    this.setDispatchingCompressionType(
+        CompressionType.valueOf(
+            properties.getProperty(
+                "dispatching_compression_type", this.getDispatchingCompressionType().name())));
+
+    String consistencyLevel = properties.getProperty("consistency_level");
+    if (consistencyLevel != null) {
+      this.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
+    }
+  }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
index 8138562e3e..2496ce316b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
@@ -146,7 +146,7 @@ public class BlockingLogAppender implements LogAppender {
}
try {
- TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
+ TimeUnit.MILLISECONDS.sleep(10);
if (System.currentTimeMillis() - startWaitingTime
> config.getMaxWaitingTimeWhenInsertBlocked()) {
result.status = Response.RESPONSE_TOO_BUSY;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
index 0985204aa5..cd84e82df1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
@@ -137,7 +137,7 @@ public class SlidingWindowLogAppender implements LogAppender {
break;
}
try {
- TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
+ TimeUnit.MILLISECONDS.sleep(10);
if (System.currentTimeMillis() - startWaitingTime
> config.getMaxWaitingTimeWhenInsertBlocked()) {
result.status = Response.RESPONSE_TOO_BUSY;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 3b83a0cd7d..bf1c9768fa 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -82,7 +82,7 @@ public class LogDispatcher {
protected ICompressor compressor;
public int bindingThreadNum;
- public static int maxBatchSize = 10;
+ public int maxBatchSize = 10;
public LogDispatcher(RaftMember member, RaftConfig config) {
this.member = member;
@@ -97,6 +97,7 @@ public class LogDispatcher {
this.allNodes = member.getAllNodes();
this.newNodes = member.getNewNodes();
createQueueAndBindingThreads(unionNodes(allNodes, newNodes));
+ maxBatchSize = config.getLogNumInBatch();
}
public void updateRateLimiter() {
@@ -449,4 +450,5 @@ public class LogDispatcher {
}
}
}
+
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java
index feba556131..79ff88984b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java
@@ -51,7 +51,7 @@ public class FlowMonitor {
private RaftConfig config;
public FlowMonitor(Peer node, RaftConfig config) throws IOException {
- this.maxWindowSize = config.getFlowMonitorMaxWindowSize();
+ this.maxWindowSize = config.getFlowMonitorMaxWindowNum();
this.windows = new ArrayDeque<>(maxWindowSize);
this.windowInterval = config.getFlowMonitorWindowInterval();
this.node = node;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
index 8b0445dfd5..97df15c2aa 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
@@ -84,7 +84,7 @@ public class SynchronousSequencer implements LogSequencer {
}
try {
- TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
+ TimeUnit.MILLISECONDS.sleep(10);
if (System.currentTimeMillis() - startWaitingTime
> config.getMaxWaitingTimeWhenInsertBlocked()) {
return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 8c5b060644..6b176c26fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1454,6 +1454,8 @@ public class IoTDBDescriptor {
if (prevDeleteWalFilesPeriodInMs != conf.getDeleteWalFilesPeriodInMs()) {
WALManager.getInstance().rebootWALDeleteThread();
}
+
+ conf.getCustomizedProperties().putAll(properties);
} catch (Exception e) {
throw new QueryProcessException(String.format("Fail to reload configuration because %s", e));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 905a8275c5..5684202a0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -175,6 +175,7 @@ public class DataRegionConsensusImpl {
conf.getDataRatisConsensusLogAppenderBufferSizeMax())
.build())
.build())
+ .setProperties(conf.getCustomizedProperties())
.build(),
DataRegionConsensusImpl::createDataRegionStateMachine)
.orElseThrow(
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index eec582234d..bb29138bad 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -135,6 +135,7 @@ public class SchemaRegionConsensusImpl {
.build())
.build())
.setStorageDir(conf.getSchemaRegionConsensusDir())
+ .setProperties(conf.getCustomizedProperties())
.build(),
gid ->
new SchemaRegionStateMachine(