You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/01/07 07:27:56 UTC
[iotdb] 01/01: finish
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch jira5384
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c135f8a6ede2850dbfe7c193282e9917e59938b0
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Sat Jan 7 15:27:31 2023 +0800
finish
Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
.../resources/conf/iotdb-confignode.properties | 11 ++
.../iotdb/confignode/conf/ConfigNodeConfig.java | 33 ++++
.../confignode/conf/ConfigNodeDescriptor.java | 16 ++
.../iotdb/confignode/manager/ConsensusManager.java | 10 +-
.../iotdb/consensus/config/ConsensusConfig.java | 9 +-
.../iotdb/consensus/config/IoTConsensusConfig.java | 40 ++--
.../apache/iotdb/consensus/config/RatisConfig.java | 219 ++++++++++++++-------
.../iot/client/IoTConsensusClientPool.java | 14 +-
.../apache/iotdb/consensus/ratis/RatisClient.java | 10 +-
.../iotdb/consensus/ratis/RatisConsensus.java | 12 +-
.../iotdb/consensus/ratis/RatisConsensusTest.java | 2 +-
.../Reference/ConfigNode-Config-Manual.md | 18 ++
docs/UserGuide/Reference/DataNode-Config-Manual.md | 18 ++
.../Reference/ConfigNode-Config-Manual.md | 20 +-
.../UserGuide/Reference/DataNode-Config-Manual.md | 18 ++
.../apache/iotdb/commons/client/ClientManager.java | 7 +-
.../iotdb/commons/client/ClientPoolFactory.java | 30 ++-
.../client/property/ClientPoolProperty.java | 34 ++--
.../iotdb/commons/concurrent/ThreadName.java | 4 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 22 +--
.../iotdb/commons/conf/CommonDescriptor.java | 28 ++-
.../iotdb/commons/client/ClientManagerTest.java | 10 +-
.../resources/conf/iotdb-datanode.properties | 64 +-----
.../iotdb/db/client/DataNodeClientPoolFactory.java | 10 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 40 ++--
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 12 +-
.../db/consensus/DataRegionConsensusImpl.java | 26 ++-
.../db/consensus/SchemaRegionConsensusImpl.java | 28 +--
28 files changed, 510 insertions(+), 255 deletions(-)
diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 932864ba58..726c0264b3 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -103,6 +103,17 @@ cn_target_config_node_list=127.0.0.1:10710
# Datatype: int
# cn_selector_thread_nums_of_client_manager=1
+# The maximum number of clients that can be idle for a node in a clientManager.
+# When the number of idle clients on a node exceeds this number, newly returned clients will be released.
+# Datatype: int
+# cn_core_client_count_for_each_node_in_client_manager=200
+
+# The maximum number of clients that can be allocated for a node in a clientManager.
+# When the number of clients to a single node exceeds this number, the thread applying for a client will be blocked
+# for a while, and ClientManager will throw ClientManagerException if no client is available after the block time.
+# Datatype: int
+# cn_max_client_count_for_each_node_in_client_manager=300
+
####################
### Metric Configuration
####################
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 94e906a9ad..1b48ba80b1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.conf;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
@@ -125,6 +126,20 @@ public class ConfigNodeConfig {
/** just for test wait for 60 second by default. */
private int thriftServerAwaitTimeForStopService = 60;
+ /**
+ * The maximum number of clients that can be idle for a node in a clientManager. When the number
+ * of idle clients on a node exceeds this number, newly returned clients will be released.
+ */
+ private int coreClientNumForEachNode = DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
+
+ /**
+ * The maximum number of clients that can be allocated for a node in a clientManager. When the
+ * number of clients to a single node exceeds this number, the thread applying for a client
+ * will be blocked for a while, and ClientManager will throw ClientManagerException if no
+ * client is available after the block time.
+ */
+ private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
+
/** System directory, including version file for each database and metadata */
private String systemDir =
ConfigNodeConstant.DATA_DIR + File.separator + IoTDBConstant.SYSTEM_FOLDER_NAME;
@@ -409,6 +424,24 @@ public class ConfigNodeConfig {
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
}
+ public int getCoreClientNumForEachNode() {
+ return coreClientNumForEachNode;
+ }
+
+ public ConfigNodeConfig setCoreClientNumForEachNode(int coreClientNumForEachNode) {
+ this.coreClientNumForEachNode = coreClientNumForEachNode;
+ return this;
+ }
+
+ public int getMaxClientNumForEachNode() {
+ return maxClientNumForEachNode;
+ }
+
+ public ConfigNodeConfig setMaxClientNumForEachNode(int maxClientNumForEachNode) {
+ this.maxClientNumForEachNode = maxClientNumForEachNode;
+ return this;
+ }
+
public String getConsensusDir() {
return consensusDir;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 63d30fc6cd..4c1873258a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -308,6 +308,22 @@ public class ConfigNodeDescriptor {
"cn_thrift_max_frame_size", String.valueOf(conf.getCnThriftMaxFrameSize()))
.trim()));
+ conf.setCoreClientNumForEachNode(
+ Integer.parseInt(
+ properties
+ .getProperty(
+ "cn_core_client_count_for_each_node_in_client_manager",
+ String.valueOf(conf.getCoreClientNumForEachNode()))
+ .trim()));
+
+ conf.setMaxClientNumForEachNode(
+ Integer.parseInt(
+ properties
+ .getProperty(
+ "cn_max_client_count_for_each_node_in_client_manager",
+ String.valueOf(conf.getMaxClientNumForEachNode()))
+ .trim()));
+
conf.setSystemDir(properties.getProperty("cn_system_dir", conf.getSystemDir()).trim());
conf.setConsensusDir(properties.getProperty("cn_consensus_dir", conf.getConsensusDir()).trim());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 5064dff74f..aa914cf7c0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -157,8 +157,8 @@ public class ConsensusManager {
CONF.getRatisFirstElectionTimeoutMaxMs(),
TimeUnit.MILLISECONDS))
.build())
- .setRatisConsensus(
- RatisConfig.Impl.newBuilder()
+ .setClient(
+ RatisConfig.Client.newBuilder()
.setClientRequestTimeoutMillis(
CONF.getConfigNodeRatisRequestTimeoutMs())
.setClientMaxRetryAttempt(
@@ -167,6 +167,12 @@ public class ConsensusManager {
CONF.getConfigNodeRatisInitialSleepTimeMs())
.setClientRetryMaxSleepTimeMs(
CONF.getConfigNodeRatisMaxSleepTimeMs())
+ .setCoreClientNumForEachNode(
+ CONF.getCoreClientNumForEachNode())
+ .setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
+ .build())
+ .setImpl(
+ RatisConfig.Impl.newBuilder()
.setTriggerSnapshotFileSize(CONF.getConfigNodeRatisLogMax())
.build())
.build())
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
index fb82dae2a9..92db8f8a84 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.consensus.config;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import java.util.Optional;
+
public class ConsensusConfig {
private final TEndPoint thisNodeEndPoint;
@@ -79,10 +81,9 @@ public class ConsensusConfig {
thisNode,
thisNodeId,
storageDir,
- ratisConfig != null ? ratisConfig : RatisConfig.newBuilder().build(),
- ioTConsensusConfig != null
- ? ioTConsensusConfig
- : IoTConsensusConfig.newBuilder().build());
+ Optional.ofNullable(ratisConfig).orElseGet(() -> RatisConfig.newBuilder().build()),
+ Optional.ofNullable(ioTConsensusConfig)
+ .orElseGet(() -> IoTConsensusConfig.newBuilder().build()));
}
public Builder setThisNode(TEndPoint thisNode) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index 5d7d27156d..ae070852fc 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.consensus.config;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
+
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
public class IoTConsensusConfig {
@@ -50,8 +53,8 @@ public class IoTConsensusConfig {
public IoTConsensusConfig build() {
return new IoTConsensusConfig(
- rpc != null ? rpc : new RPC.Builder().build(),
- replication != null ? replication : new Replication.Builder().build());
+ Optional.ofNullable(rpc).orElseGet(() -> new RPC.Builder().build()),
+ Optional.ofNullable(replication).orElseGet(() -> new Replication.Builder().build()));
}
public Builder setRpc(RPC rpc) {
@@ -75,7 +78,8 @@ public class IoTConsensusConfig {
private final int selectorNumOfClientManager;
private final int connectionTimeoutInMs;
private final int thriftMaxFrameSize;
- private final int maxConnectionForInternalService;
+ private final int coreClientNumForEachNode;
+ private final int maxClientNumForEachNode;
private RPC(
int rpcSelectorThreadNum,
@@ -86,7 +90,8 @@ public class IoTConsensusConfig {
int selectorNumOfClientManager,
int connectionTimeoutInMs,
int thriftMaxFrameSize,
- int maxConnectionForInternalService) {
+ int coreClientNumForEachNode,
+ int maxClientNumForEachNode) {
this.rpcSelectorThreadNum = rpcSelectorThreadNum;
this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
@@ -95,7 +100,8 @@ public class IoTConsensusConfig {
this.selectorNumOfClientManager = selectorNumOfClientManager;
this.connectionTimeoutInMs = connectionTimeoutInMs;
this.thriftMaxFrameSize = thriftMaxFrameSize;
- this.maxConnectionForInternalService = maxConnectionForInternalService;
+ this.coreClientNumForEachNode = coreClientNumForEachNode;
+ this.maxClientNumForEachNode = maxClientNumForEachNode;
}
public int getRpcSelectorThreadNum() {
@@ -130,8 +136,12 @@ public class IoTConsensusConfig {
return thriftMaxFrameSize;
}
- public int getMaxConnectionForInternalService() {
- return maxConnectionForInternalService;
+ public int getCoreClientNumForEachNode() {
+ return coreClientNumForEachNode;
+ }
+
+ public int getMaxClientNumForEachNode() {
+ return maxClientNumForEachNode;
}
public static RPC.Builder newBuilder() {
@@ -149,7 +159,9 @@ public class IoTConsensusConfig {
private int connectionTimeoutInMs = (int) TimeUnit.SECONDS.toMillis(20);
private int thriftMaxFrameSize = 536870912;
- private int maxConnectionForInternalService = 100;
+ private int coreClientNumForEachNode = DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
+
+ private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
public RPC.Builder setRpcSelectorThreadNum(int rpcSelectorThreadNum) {
this.rpcSelectorThreadNum = rpcSelectorThreadNum;
@@ -192,8 +204,13 @@ public class IoTConsensusConfig {
return this;
}
- public RPC.Builder setMaxConnectionForInternalService(int maxConnectionForInternalService) {
- this.maxConnectionForInternalService = maxConnectionForInternalService;
+ public RPC.Builder setCoreClientNumForEachNode(int coreClientNumForEachNode) {
+ this.coreClientNumForEachNode = coreClientNumForEachNode;
+ return this;
+ }
+
+ public Builder setMaxClientNumForEachNode(int maxClientNumForEachNode) {
+ this.maxClientNumForEachNode = maxClientNumForEachNode;
return this;
}
@@ -207,7 +224,8 @@ public class IoTConsensusConfig {
selectorNumOfClientManager,
connectionTimeoutInMs,
thriftMaxFrameSize,
- maxConnectionForInternalService);
+ coreClientNumForEachNode,
+ maxClientNumForEachNode);
}
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
index 8f877662bd..5917586800 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
@@ -19,11 +19,14 @@
package org.apache.iotdb.consensus.config;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
+
import org.apache.ratis.grpc.GrpcConfigKeys.Server;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
public class RatisConfig {
@@ -33,9 +36,10 @@ public class RatisConfig {
private final Snapshot snapshot;
private final ThreadPool threadPool;
private final Log log;
- private final LeaderLogAppender leaderLogAppender;
private final Grpc grpc;
+ private final Client client;
private final Impl impl;
+ private final LeaderLogAppender leaderLogAppender;
private RatisConfig(
Rpc rpc,
@@ -44,16 +48,18 @@ public class RatisConfig {
ThreadPool threadPool,
Log log,
Grpc grpc,
- LeaderLogAppender leaderLogAppender,
- Impl impl) {
+ Client client,
+ Impl impl,
+ LeaderLogAppender leaderLogAppender) {
this.rpc = rpc;
this.leaderElection = leaderElection;
this.snapshot = snapshot;
this.threadPool = threadPool;
this.log = log;
- this.leaderLogAppender = leaderLogAppender;
this.grpc = grpc;
+ this.client = client;
this.impl = impl;
+ this.leaderLogAppender = leaderLogAppender;
}
public Rpc getRpc() {
@@ -76,42 +82,51 @@ public class RatisConfig {
return log;
}
- public LeaderLogAppender getLeaderLogAppender() {
- return leaderLogAppender;
- }
-
public Grpc getGrpc() {
return grpc;
}
+ public Client getClient() {
+ return client;
+ }
+
public Impl getImpl() {
return impl;
}
+ public LeaderLogAppender getLeaderLogAppender() {
+ return leaderLogAppender;
+ }
+
public static Builder newBuilder() {
return new Builder();
}
public static class Builder {
+
private Rpc rpc;
private LeaderElection leaderElection;
private Snapshot snapshot;
private ThreadPool threadPool;
private Log log;
- private LeaderLogAppender leaderLogAppender;
private Grpc grpc;
+
+ private Client client;
private Impl impl;
+ private LeaderLogAppender leaderLogAppender;
public RatisConfig build() {
return new RatisConfig(
- rpc != null ? rpc : Rpc.newBuilder().build(),
- leaderElection != null ? leaderElection : LeaderElection.newBuilder().build(),
- snapshot != null ? snapshot : Snapshot.newBuilder().build(),
- threadPool != null ? threadPool : ThreadPool.newBuilder().build(),
- log != null ? log : Log.newBuilder().build(),
- grpc != null ? grpc : Grpc.newBuilder().build(),
- leaderLogAppender != null ? leaderLogAppender : LeaderLogAppender.newBuilder().build(),
- impl != null ? impl : Impl.newBuilder().build());
+ Optional.ofNullable(rpc).orElseGet(() -> Rpc.newBuilder().build()),
+ Optional.ofNullable(leaderElection).orElseGet(() -> LeaderElection.newBuilder().build()),
+ Optional.ofNullable(snapshot).orElseGet(() -> Snapshot.newBuilder().build()),
+ Optional.ofNullable(threadPool).orElseGet(() -> ThreadPool.newBuilder().build()),
+ Optional.ofNullable(log).orElseGet(() -> Log.newBuilder().build()),
+ Optional.ofNullable(grpc).orElseGet(() -> Grpc.newBuilder().build()),
+ Optional.ofNullable(client).orElseGet(() -> Client.newBuilder().build()),
+ Optional.ofNullable(impl).orElseGet(() -> Impl.newBuilder().build()),
+ Optional.ofNullable(leaderLogAppender)
+ .orElseGet(() -> LeaderLogAppender.newBuilder().build()));
}
public Builder setRpc(Rpc rpc) {
@@ -144,7 +159,12 @@ public class RatisConfig {
return this;
}
- public Builder setRatisConsensus(Impl impl) {
+ public Builder setClient(Client client) {
+ this.client = client;
+ return this;
+ }
+
+ public Builder setImpl(Impl impl) {
this.impl = impl;
return this;
}
@@ -157,6 +177,7 @@ public class RatisConfig {
/** server rpc timeout related */
public static class Rpc {
+
private final TimeDuration timeoutMin;
private final TimeDuration timeoutMax;
private final TimeDuration requestTimeout;
@@ -215,6 +236,7 @@ public class RatisConfig {
}
public static class Builder {
+
private TimeDuration timeoutMin = TimeDuration.valueOf(2, TimeUnit.SECONDS);
private TimeDuration timeoutMax = TimeDuration.valueOf(4, TimeUnit.SECONDS);
private TimeDuration requestTimeout = TimeDuration.valueOf(20, TimeUnit.SECONDS);
@@ -276,6 +298,7 @@ public class RatisConfig {
}
public static class LeaderElection {
+
private final TimeDuration leaderStepDownWaitTimeKey;
private final boolean preVote;
@@ -297,6 +320,7 @@ public class RatisConfig {
}
public static class Builder {
+
private TimeDuration leaderStepDownWaitTimeKey = TimeDuration.valueOf(30, TimeUnit.SECONDS);
private boolean preVote = RaftServerConfigKeys.LeaderElection.PRE_VOTE_DEFAULT;
@@ -318,6 +342,7 @@ public class RatisConfig {
}
public static class Snapshot {
+
private final boolean autoTriggerEnabled;
private final long creationGap;
private final long autoTriggerThreshold;
@@ -355,6 +380,7 @@ public class RatisConfig {
}
public static class Builder {
+
private boolean autoTriggerEnabled = true;
private long creationGap = RaftServerConfigKeys.Snapshot.CREATION_GAP_DEFAULT;
private long autoTriggerThreshold =
@@ -389,6 +415,7 @@ public class RatisConfig {
}
public static class ThreadPool {
+
private final boolean proxyCached;
private final int proxySize;
private final boolean serverCached;
@@ -440,6 +467,7 @@ public class RatisConfig {
}
public static class Builder {
+
private boolean proxyCached = RaftServerConfigKeys.ThreadPool.PROXY_CACHED_DEFAULT;
private int proxySize = RaftServerConfigKeys.ThreadPool.PROXY_SIZE_DEFAULT;
private boolean serverCached = RaftServerConfigKeys.ThreadPool.SERVER_CACHED_DEFAULT;
@@ -586,6 +614,7 @@ public class RatisConfig {
}
public static class Builder {
+
private boolean useMemory = false;
private int queueElementLimit = 4096;
private SizeInBytes queueByteLimit = SizeInBytes.valueOf("64MB");
@@ -685,6 +714,7 @@ public class RatisConfig {
}
public static class Grpc {
+
private final SizeInBytes messageSizeMax;
private final SizeInBytes flowControlWindow;
private final boolean asyncRequestThreadPoolCached;
@@ -729,6 +759,7 @@ public class RatisConfig {
}
public static class Builder {
+
private SizeInBytes messageSizeMax = SizeInBytes.valueOf("512MB");
private SizeInBytes flowControlWindow = SizeInBytes.valueOf("4MB");
private boolean asyncRequestThreadPoolCached =
@@ -772,43 +803,28 @@ public class RatisConfig {
}
}
- public static class Impl {
- private final int retryTimesMax;
- private final long retryWaitMillis;
+ public static class Client {
private final long clientRequestTimeoutMillis;
private final int clientMaxRetryAttempt;
private final long clientRetryInitialSleepTimeMs;
private final long clientRetryMaxSleepTimeMs;
+ private final int coreClientNumForEachNode;
+ private final int maxClientNumForEachNode;
- private final long triggerSnapshotTime;
- private final long triggerSnapshotFileSize;
-
- private Impl(
- int retryTimesMax,
- long retryWaitMillis,
+ public Client(
long clientRequestTimeoutMillis,
int clientMaxRetryAttempt,
long clientRetryInitialSleepTimeMs,
long clientRetryMaxSleepTimeMs,
- long triggerSnapshotTime,
- long triggerSnapshotFileSize) {
- this.retryTimesMax = retryTimesMax;
- this.retryWaitMillis = retryWaitMillis;
+ int coreClientNumForEachNode,
+ int maxClientNumForEachNode) {
this.clientRequestTimeoutMillis = clientRequestTimeoutMillis;
this.clientMaxRetryAttempt = clientMaxRetryAttempt;
this.clientRetryInitialSleepTimeMs = clientRetryInitialSleepTimeMs;
this.clientRetryMaxSleepTimeMs = clientRetryMaxSleepTimeMs;
- this.triggerSnapshotTime = triggerSnapshotTime;
- this.triggerSnapshotFileSize = triggerSnapshotFileSize;
- }
-
- public int getRetryTimesMax() {
- return retryTimesMax;
- }
-
- public long getRetryWaitMillis() {
- return retryWaitMillis;
+ this.coreClientNumForEachNode = coreClientNumForEachNode;
+ this.maxClientNumForEachNode = maxClientNumForEachNode;
}
public long getClientRequestTimeoutMillis() {
@@ -827,71 +843,132 @@ public class RatisConfig {
return clientRetryMaxSleepTimeMs;
}
- public long getTriggerSnapshotTime() {
- return triggerSnapshotTime;
+ public int getCoreClientNumForEachNode() {
+ return coreClientNumForEachNode;
}
- public long getTriggerSnapshotFileSize() {
- return triggerSnapshotFileSize;
+ public int getMaxClientNumForEachNode() {
+ return maxClientNumForEachNode;
}
- public static Impl.Builder newBuilder() {
+ public static Client.Builder newBuilder() {
return new Builder();
}
public static class Builder {
- private int retryTimesMax = 3;
- private long retryWaitMillis = 500;
private long clientRequestTimeoutMillis = 10000;
private int clientMaxRetryAttempt = 10;
private long clientRetryInitialSleepTimeMs = 100;
private long clientRetryMaxSleepTimeMs = 10000;
- // 120s
- private long triggerSnapshotTime = 120;
- // 20GB
- private long triggerSnapshotFileSize = 20L << 30;
+ private int coreClientNumForEachNode = DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
- public Impl build() {
- return new Impl(
- retryTimesMax,
- retryWaitMillis,
+ private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
+
+ public Client build() {
+ return new Client(
clientRequestTimeoutMillis,
clientMaxRetryAttempt,
clientRetryInitialSleepTimeMs,
clientRetryMaxSleepTimeMs,
- triggerSnapshotTime,
- triggerSnapshotFileSize);
+ coreClientNumForEachNode,
+ maxClientNumForEachNode);
}
- public Impl.Builder setRetryTimesMax(int retryTimesMax) {
- this.retryTimesMax = retryTimesMax;
+ public Builder setClientRequestTimeoutMillis(long clientRequestTimeoutMillis) {
+ this.clientRequestTimeoutMillis = clientRequestTimeoutMillis;
return this;
}
- public Impl.Builder setRetryWaitMillis(long retryWaitMillis) {
- this.retryWaitMillis = retryWaitMillis;
+ public Builder setClientMaxRetryAttempt(int clientMaxRetryAttempt) {
+ this.clientMaxRetryAttempt = clientMaxRetryAttempt;
return this;
}
- public Impl.Builder setClientRequestTimeoutMillis(long clientRequestTimeoutMillis) {
- this.clientRequestTimeoutMillis = clientRequestTimeoutMillis;
+ public Builder setClientRetryInitialSleepTimeMs(long clientRetryInitialSleepTimeMs) {
+ this.clientRetryInitialSleepTimeMs = clientRetryInitialSleepTimeMs;
return this;
}
- public Impl.Builder setClientMaxRetryAttempt(int clientMaxRetryAttempt) {
- this.clientMaxRetryAttempt = clientMaxRetryAttempt;
+ public Builder setClientRetryMaxSleepTimeMs(long clientRetryMaxSleepTimeMs) {
+ this.clientRetryMaxSleepTimeMs = clientRetryMaxSleepTimeMs;
return this;
}
- public Impl.Builder setClientRetryInitialSleepTimeMs(long clientRetryInitialSleepTimeMs) {
- this.clientRetryInitialSleepTimeMs = clientRetryInitialSleepTimeMs;
+ public Builder setCoreClientNumForEachNode(int coreClientNumForEachNode) {
+ this.coreClientNumForEachNode = coreClientNumForEachNode;
return this;
}
- public Impl.Builder setClientRetryMaxSleepTimeMs(long clientRetryMaxSleepTimeMs) {
- this.clientRetryMaxSleepTimeMs = clientRetryMaxSleepTimeMs;
+ public Builder setMaxClientNumForEachNode(int maxClientNumForEachNode) {
+ this.maxClientNumForEachNode = maxClientNumForEachNode;
+ return this;
+ }
+ }
+ }
+
+ public static class Impl {
+
+ private final int retryTimesMax;
+ private final long retryWaitMillis;
+
+ private final long triggerSnapshotTime;
+ private final long triggerSnapshotFileSize;
+
+ public Impl(
+ int retryTimesMax,
+ long retryWaitMillis,
+ long triggerSnapshotTime,
+ long triggerSnapshotFileSize) {
+ this.retryTimesMax = retryTimesMax;
+ this.retryWaitMillis = retryWaitMillis;
+ this.triggerSnapshotTime = triggerSnapshotTime;
+ this.triggerSnapshotFileSize = triggerSnapshotFileSize;
+ }
+
+ public int getRetryTimesMax() {
+ return retryTimesMax;
+ }
+
+ public long getRetryWaitMillis() {
+ return retryWaitMillis;
+ }
+
+ public long getTriggerSnapshotTime() {
+ return triggerSnapshotTime;
+ }
+
+ public long getTriggerSnapshotFileSize() {
+ return triggerSnapshotFileSize;
+ }
+
+ public static Impl.Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private int retryTimesMax = 3;
+ private long retryWaitMillis = 500;
+
+ // 120s
+ private long triggerSnapshotTime = 120;
+ // 20GB
+ private long triggerSnapshotFileSize = 20L << 30;
+
+ public Impl build() {
+ return new Impl(
+ retryTimesMax, retryWaitMillis, triggerSnapshotTime, triggerSnapshotFileSize);
+ }
+
+ public Impl.Builder setRetryTimesMax(int retryTimesMax) {
+ this.retryTimesMax = retryTimesMax;
+ return this;
+ }
+
+ public Impl.Builder setRetryWaitMillis(long retryWaitMillis) {
+ this.retryWaitMillis = retryWaitMillis;
return this;
}
@@ -908,6 +985,7 @@ public class RatisConfig {
}
public static class LeaderLogAppender {
+
private final SizeInBytes bufferByteLimit;
private final SizeInBytes snapshotChunkSizeMax;
private final boolean installSnapshotEnabled;
@@ -938,6 +1016,7 @@ public class RatisConfig {
}
public static class Builder {
+
private SizeInBytes bufferByteLimit =
RaftServerConfigKeys.Log.Appender.BUFFER_BYTE_LIMIT_DEFAULT;
private SizeInBytes snapshotChunkSizeMax =
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
index eddb1067fd..90e5aed12c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.IClientPoolFactory;
import org.apache.iotdb.commons.client.property.ClientPoolProperty;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import org.apache.commons.pool2.KeyedObjectPool;
@@ -52,7 +53,11 @@ public class IoTConsensusClientPool {
.setConnectionTimeoutMs(config.getRpc().getConnectionTimeoutInMs())
.setRpcThriftCompressionEnabled(config.getRpc().isRpcThriftCompressionEnabled())
.build()),
- new ClientPoolProperty.Builder<SyncIoTConsensusServiceClient>().build().getConfig());
+ new ClientPoolProperty.Builder<SyncIoTConsensusServiceClient>()
+ .setCoreClientNumForEachNode(config.getRpc().getCoreClientNumForEachNode())
+ .setMaxClientNumForEachNode(config.getRpc().getMaxClientNumForEachNode())
+ .build()
+ .getConfig());
}
}
@@ -60,7 +65,6 @@ public class IoTConsensusClientPool {
implements IClientPoolFactory<TEndPoint, AsyncIoTConsensusServiceClient> {
private final IoTConsensusConfig config;
- private static final String IOT_CONSENSUS_CLIENT_POOL_THREAD_NAME = "IoTConsensusClientPool";
public AsyncIoTConsensusServiceClientPoolFactory(IoTConsensusConfig config) {
this.config = config;
@@ -78,10 +82,10 @@ public class IoTConsensusClientPool {
.setSelectorNumOfAsyncClientManager(
config.getRpc().getSelectorNumOfClientManager())
.build(),
- IOT_CONSENSUS_CLIENT_POOL_THREAD_NAME),
+ ThreadName.ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncIoTConsensusServiceClient>()
- .setMaxIdleClientForEachNode(config.getRpc().getMaxConnectionForInternalService())
- .setMaxTotalClientForEachNode(config.getRpc().getMaxConnectionForInternalService())
+ .setCoreClientNumForEachNode(config.getRpc().getCoreClientNumForEachNode())
+ .setMaxClientNumForEachNode(config.getRpc().getMaxClientNumForEachNode())
.build()
.getConfig());
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
index 80e80232da..2900256016 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
public class RatisClient {
@@ -75,13 +74,13 @@ public class RatisClient {
private final RaftProperties raftProperties;
private final RaftClientRpc clientRpc;
- private final Supplier<RatisConfig.Impl> config;
+ private final RatisConfig.Client config;
public Factory(
ClientManager<RaftGroup, RatisClient> clientManager,
RaftProperties raftProperties,
RaftClientRpc clientRpc,
- Supplier<RatisConfig.Impl> config) {
+ RatisConfig.Client config) {
super(clientManager);
this.raftProperties = raftProperties;
this.clientRpc = clientRpc;
@@ -101,7 +100,7 @@ public class RatisClient {
RaftClient.newBuilder()
.setProperties(raftProperties)
.setRaftGroup(group)
- .setRetryPolicy(new RatisRetryPolicy(config.get()))
+ .setRetryPolicy(new RatisRetryPolicy(config))
.setClientRpc(clientRpc)
.build(),
clientManager));
@@ -125,10 +124,9 @@ public class RatisClient {
private static class RatisRetryPolicy implements RetryPolicy {
private static final Logger logger = LoggerFactory.getLogger(RatisClient.class);
- private static final int MAX_ATTEMPTS = 10;
RetryPolicy defaultPolicy;
- public RatisRetryPolicy(RatisConfig.Impl config) {
+ public RatisRetryPolicy(RatisConfig.Client config) {
defaultPolicy =
ExponentialBackoffRetry.newBuilder()
.setBaseSleepTime(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 94be8ca634..1375a88cca 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -71,7 +71,6 @@ import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.function.CheckedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -186,6 +185,7 @@ class RatisConsensus implements IConsensus {
// currently, we only retry when ResourceUnavailableException is caught
return !reply.isSuccess() && (reply.getException() instanceof ResourceUnavailableException);
}
+
/** launch a consensus write with retry mechanism */
private RaftClientReply writeWithRetry(CheckedSupplier<RaftClientReply, IOException> caller)
throws IOException {
@@ -808,13 +808,17 @@ class RatisConsensus implements IConsensus {
}
private class RatisClientPoolFactory implements IClientPoolFactory<RaftGroup, RatisClient> {
+
@Override
public KeyedObjectPool<RaftGroup, RatisClient> createClientPool(
ClientManager<RaftGroup, RatisClient> manager) {
return new GenericKeyedObjectPool<>(
- new RatisClient.Factory(
- manager, properties, clientRpc, MemoizedSupplier.valueOf(config::getImpl)),
- new ClientPoolProperty.Builder<RatisClient>().build().getConfig());
+ new RatisClient.Factory(manager, properties, clientRpc, config.getClient()),
+ new ClientPoolProperty.Builder<RatisClient>()
+ .setCoreClientNumForEachNode(config.getClient().getCoreClientNumForEachNode())
+ .setMaxClientNumForEachNode(config.getClient().getMaxClientNumForEachNode())
+ .build()
+ .getConfig());
}
}
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index bcb18a34d5..ea38b15115 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -76,7 +76,7 @@ public class RatisConsensusTest {
.setAutoTriggerThreshold(100)
.setCreationGap(10)
.build())
- .setRatisConsensus(
+ .setImpl(
RatisConfig.Impl.newBuilder()
.setTriggerSnapshotFileSize(1)
.setTriggerSnapshotTime(4)
diff --git a/docs/UserGuide/Reference/ConfigNode-Config-Manual.md b/docs/UserGuide/Reference/ConfigNode-Config-Manual.md
index 8f300afbc1..5dba8989b9 100644
--- a/docs/UserGuide/Reference/ConfigNode-Config-Manual.md
+++ b/docs/UserGuide/Reference/ConfigNode-Config-Manual.md
@@ -202,4 +202,22 @@ The global configuration of cluster is in ConfigNode.
| Default | 1 |
| Effective | After restarting system |
+* cn\_core\_client\_count\_for\_each\_node\_in\_client\_manager
+
+| Name | cn\_core\_client\_count\_for\_each\_node\_in\_client\_manager |
+|:------------:|:---------------------------------------------------------------|
+| Description | Number of core clients routed to each node in a ClientManager |
+| Type | int |
+| Default | 200 |
+| Effective | After restarting system |
+
+* cn\_max\_client\_count\_for\_each\_node\_in\_client\_manager
+
+| Name | cn\_max\_client\_count\_for\_each\_node\_in\_client\_manager |
+|:--------------:|:-------------------------------------------------------------|
+| Description | Maximum number of clients routed to each node in a ClientManager |
+| Type | int |
+| Default | 300 |
+| Effective | After restarting system |
+
### Metric Configuration
diff --git a/docs/UserGuide/Reference/DataNode-Config-Manual.md b/docs/UserGuide/Reference/DataNode-Config-Manual.md
index 34dd38a75e..e8a8770045 100644
--- a/docs/UserGuide/Reference/DataNode-Config-Manual.md
+++ b/docs/UserGuide/Reference/DataNode-Config-Manual.md
@@ -255,6 +255,24 @@ The permission definitions are in ${IOTDB\_CONF}/conf/jmx.access.
|Default| 1024 |
|Effective|After restarting system|
+* dn\_core\_client\_count\_for\_each\_node\_in\_client\_manager
+
+| Name | dn\_core\_client\_count\_for\_each\_node\_in\_client\_manager |
+|:------------:|:--------------------------------------------------------------|
+| Description | Number of core clients routed to each node in a ClientManager |
+| Type | int |
+| Default | 200 |
+| Effective | After restarting system |
+
+* dn\_max\_client\_count\_for\_each\_node\_in\_client\_manager
+
+| Name | dn\_max\_client\_count\_for\_each\_node\_in\_client\_manager |
+|:--------------:|:-------------------------------------------------------------|
+| Description | Maximum number of clients routed to each node in a ClientManager |
+| Type | int |
+| Default | 300 |
+| Effective | After restarting system |
+
### Directory Configuration
* dn\_system\_dir
diff --git a/docs/zh/UserGuide/Reference/ConfigNode-Config-Manual.md b/docs/zh/UserGuide/Reference/ConfigNode-Config-Manual.md
index c3cf639c71..98fcca5eea 100644
--- a/docs/zh/UserGuide/Reference/ConfigNode-Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/ConfigNode-Config-Manual.md
@@ -58,7 +58,6 @@ IoTDB ConfigNode 配置文件均位于 IoTDB 安装目录:`conf`文件夹下
|默认值|默认与最大堆内存相等|
|改后生效方式|重启服务生效|
-
## 系统配置项(iotdb-confignode.properties)
IoTDB 集群的全局配置通过 ConfigNode 配置。
@@ -105,7 +104,6 @@ IoTDB 集群的全局配置通过 ConfigNode 配置。
| 默认值 | 127.0.0.1:10710 |
| 改后生效方式 | 重启服务生效 |
-
### 数据目录
* cn\_system\_dir
@@ -191,4 +189,22 @@ IoTDB 集群的全局配置通过 ConfigNode 配置。
| 默认值 | 1 |
| 改后生效方式 | 重启服务生效 |
+* cn\_core\_client\_count\_for\_each\_node\_in\_client\_manager
+
+| 名字 | cn\_core\_client\_count\_for\_each\_node\_in\_client\_manager |
+|:------:|:--------------------------------------------------------------|
+| 描述 | 单 ClientManager 中路由到每个节点的核心 Client 个数 |
+| 类型 | int |
+| 默认值 | 200 |
+| 改后生效方式 | 重启服务生效 |
+
+* cn\_max\_client\_count\_for\_each\_node\_in\_client\_manager
+
+| 名字 | cn\_max\_client\_count\_for\_each\_node\_in\_client\_manager |
+|:------:|:-------------------------------------------------------------|
+| 描述 | 单 ClientManager 中路由到每个节点的最大 Client 个数 |
+| 类型 | int |
+| 默认值 | 300 |
+| 改后生效方式 | 重启服务生效 |
+
### Metric 监控配置
\ No newline at end of file
diff --git a/docs/zh/UserGuide/Reference/DataNode-Config-Manual.md b/docs/zh/UserGuide/Reference/DataNode-Config-Manual.md
index fad04934f0..9f15e54720 100644
--- a/docs/zh/UserGuide/Reference/DataNode-Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/DataNode-Config-Manual.md
@@ -247,6 +247,24 @@ IoTDB DataNode 与 Standalone 模式共用一套配置文件,均位于 IoTDB
|默认值| 1024 |
|改后生效方式|重启服务生效|
+* dn\_core\_client\_count\_for\_each\_node\_in\_client\_manager
+
+| 名字 | dn\_core\_client\_count\_for\_each\_node\_in\_client\_manager |
+|:------:|:--------------------------------------------------------------|
+| 描述 | 单 ClientManager 中路由到每个节点的核心 Client 个数 |
+| 类型 | int |
+| 默认值 | 200 |
+| 改后生效方式 | 重启服务生效 |
+
+* dn\_max\_client\_count\_for\_each\_node\_in\_client\_manager
+
+| 名字 | dn\_max\_client\_count\_for\_each\_node\_in\_client\_manager |
+|:------:|:-------------------------------------------------------------|
+| 描述 | 单 ClientManager 中路由到每个节点的最大 Client 个数 |
+| 类型 | int |
+| 默认值 | 300 |
+| 改后生效方式 | 重启服务生效 |
+
### 目录配置
* dn\_system\_dir
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index dc1f5fc83f..770c729bbe 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -68,7 +68,7 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
try {
pool.returnObject(node, client);
} catch (Exception e) {
- logger.error(
+ logger.warn(
String.format("Return client %s for node %s to pool failed.", client, node), e);
}
});
@@ -82,8 +82,7 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
try {
pool.clear(node);
} catch (Exception e) {
- logger.error(
- String.format("Clear all client in pool for node %s failed.", node), e);
+ logger.warn(String.format("Clear all client in pool for node %s failed.", node), e);
}
});
}
@@ -93,7 +92,7 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
try {
pool.close();
} catch (Exception e) {
- logger.error("Close client pool failed", e);
+ logger.warn("Close client pool failed", e);
}
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index a795b20199..b30620f8be 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -56,7 +56,11 @@ public class ClientPoolFactory {
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.build()),
- new ClientPoolProperty.Builder<SyncConfigNodeIServiceClient>().build().getConfig());
+ new ClientPoolProperty.Builder<SyncConfigNodeIServiceClient>()
+ .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
+ .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
+ .build()
+ .getConfig());
}
}
@@ -76,8 +80,8 @@ public class ClientPoolFactory {
.build(),
ThreadName.ASYNC_CONFIGNODE_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>()
- .setMaxIdleClientForEachNode(conf.getMaxIdleClientForEachNode())
- .setMaxTotalClientForEachNode(conf.getMaxTotalClientForEachNode())
+ .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
+ .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
}
@@ -96,7 +100,11 @@ public class ClientPoolFactory {
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.build()),
- new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>().build().getConfig());
+ new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
+ .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
+ .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
+ .build()
+ .getConfig());
}
}
@@ -115,7 +123,11 @@ public class ClientPoolFactory {
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
.build(),
ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
- new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>().build().getConfig());
+ new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>()
+ .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
+ .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
+ .build()
+ .getConfig());
}
}
@@ -135,6 +147,8 @@ public class ClientPoolFactory {
.build(),
ThreadName.ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncConfigNodeHeartbeatServiceClient>()
+ .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
+ .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
}
@@ -156,6 +170,8 @@ public class ClientPoolFactory {
.build(),
ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncDataNodeHeartbeatServiceClient>()
+ .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
+ .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
}
@@ -175,6 +191,8 @@ public class ClientPoolFactory {
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.build()),
new ClientPoolProperty.Builder<SyncDataNodeMPPDataExchangeServiceClient>()
+ .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
+ .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
}
@@ -196,6 +214,8 @@ public class ClientPoolFactory {
.build(),
ThreadName.ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncDataNodeMPPDataExchangeServiceClient>()
+ .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
+ .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
index fc9ae843f9..907609658d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
@@ -39,40 +39,40 @@ public class ClientPoolProperty<V> {
public static class Builder<V> {
/**
- * when the number of the client to a single node exceeds maxTotalConnectionForEachNode, the
- * current thread will block waitClientTimeoutMS, ClientManager throws ClientManagerException if
- * there are no clients after the block time.
+ * When the number of clients to a single node exceeds maxClientNumForEachNode, the thread
+ * requesting a client will be blocked for waitClientTimeoutMs, then ClientManager will
+ * throw ClientManagerException if no client becomes available within the block time.
*/
- private long waitClientTimeoutMS = DefaultProperty.WAIT_CLIENT_TIMEOUT_MS;
+ private long waitClientTimeoutMs = DefaultProperty.WAIT_CLIENT_TIMEOUT_MS;
/** the maximum number of clients that can be allocated for a node. */
- private int maxTotalClientForEachNode = DefaultProperty.MAX_TOTAL_CLIENT_FOR_EACH_NODE;
+ private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
/**
* the maximum number of clients that can be idle for a node. When the number of idle clients on
* a node exceeds this number, newly returned clients will be released.
*/
- private int maxIdleClientForEachNode = DefaultProperty.MAX_IDLE_CLIENT_FOR_EACH_NODE;
+ private int coreClientNumForEachNode = DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
- public Builder<V> setWaitClientTimeoutMS(long waitClientTimeoutMS) {
- this.waitClientTimeoutMS = waitClientTimeoutMS;
+ public Builder<V> setWaitClientTimeoutMs(long waitClientTimeoutMs) {
+ this.waitClientTimeoutMs = waitClientTimeoutMs;
return this;
}
- public Builder<V> setMaxTotalClientForEachNode(int maxTotalClientForEachNode) {
- this.maxTotalClientForEachNode = maxTotalClientForEachNode;
+ public Builder<V> setMaxClientNumForEachNode(int maxClientNumForEachNode) {
+ this.maxClientNumForEachNode = maxClientNumForEachNode;
return this;
}
- public Builder<V> setMaxIdleClientForEachNode(int maxIdleClientForEachNode) {
- this.maxIdleClientForEachNode = maxIdleClientForEachNode;
+ public Builder<V> setCoreClientNumForEachNode(int coreClientNumForEachNode) {
+ this.coreClientNumForEachNode = coreClientNumForEachNode;
return this;
}
public ClientPoolProperty<V> build() {
GenericKeyedObjectPoolConfig<V> poolConfig = new GenericKeyedObjectPoolConfig<>();
- poolConfig.setMaxTotalPerKey(maxTotalClientForEachNode);
- poolConfig.setMaxIdlePerKey(maxIdleClientForEachNode);
- poolConfig.setMaxWait(Duration.ofMillis(waitClientTimeoutMS));
+ poolConfig.setMaxTotalPerKey(maxClientNumForEachNode);
+ poolConfig.setMaxIdlePerKey(coreClientNumForEachNode);
+ poolConfig.setMaxWait(Duration.ofMillis(waitClientTimeoutMs));
poolConfig.setTestOnReturn(true);
poolConfig.setTestOnBorrow(true);
return new ClientPoolProperty<>(poolConfig);
@@ -84,7 +84,7 @@ public class ClientPoolProperty<V> {
private DefaultProperty() {}
public static final long WAIT_CLIENT_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);
- public static final int MAX_TOTAL_CLIENT_FOR_EACH_NODE = 300;
- public static final int MAX_IDLE_CLIENT_FOR_EACH_NODE = 200;
+ public static final int MAX_CLIENT_NUM_FOR_EACH_NODE = 300;
+ public static final int CORE_CLIENT_NUM_FOR_EACH_NODE = 200;
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 2d23373b99..f0d3bac43a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -61,7 +61,9 @@ public enum ThreadName {
ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL("AsyncConfigNodeHeartbeatServiceClientPool"),
ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL("AsyncDataNodeHeartbeatServiceClientPool"),
ASYNC_CONFIGNODE_CLIENT_POOL("AsyncConfigNodeIServiceClientPool"),
- ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL("AsyncDataNodeMPPDataExchangeServiceClientPool");
+ ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL("AsyncDataNodeMPPDataExchangeServiceClientPool"),
+
+ ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL("AsyncDataNodeMPPDataExchangeServiceClientPool");
private final String name;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 212498e522..5fcf500672 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.commons.conf;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
import org.apache.iotdb.tsfile.fileSystem.FSType;
@@ -104,9 +105,8 @@ public class CommonConfig {
/** whether to use thrift compression. */
private boolean isRpcThriftCompressionEnabled = false;
- private int maxTotalClientForEachNode = 300;
-
- private int maxIdleClientForEachNode = 200;
+ private int coreClientNumForEachNode = DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
+ private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
/** What will the system do when unrecoverable error occurs. */
private HandleSystemErrorStrategy handleSystemErrorStrategy =
@@ -271,20 +271,20 @@ public class CommonConfig {
isRpcThriftCompressionEnabled = rpcThriftCompressionEnabled;
}
- public int getMaxTotalClientForEachNode() {
- return maxTotalClientForEachNode;
+ public int getMaxClientNumForEachNode() {
+ return maxClientNumForEachNode;
}
- public void setMaxTotalClientForEachNode(int maxTotalClientForEachNode) {
- this.maxTotalClientForEachNode = maxTotalClientForEachNode;
+ public void setMaxClientNumForEachNode(int maxClientNumForEachNode) {
+ this.maxClientNumForEachNode = maxClientNumForEachNode;
}
- public int getMaxIdleClientForEachNode() {
- return maxIdleClientForEachNode;
+ public int getCoreClientNumForEachNode() {
+ return coreClientNumForEachNode;
}
- public void setMaxIdleClientForEachNode(int maxIdleClientForEachNode) {
- this.maxIdleClientForEachNode = maxIdleClientForEachNode;
+ public void setCoreClientNumForEachNode(int coreClientNumForEachNode) {
+ this.coreClientNumForEachNode = coreClientNumForEachNode;
}
HandleSystemErrorStrategy getHandleSystemErrorStrategy() {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index f53521f77b..d0b932b53e 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -116,6 +116,22 @@ public class CommonDescriptor {
String.valueOf(config.getSelectorNumOfClientManager()))
.trim()));
+ config.setCoreClientNumForEachNode(
+ Integer.parseInt(
+ properties
+ .getProperty(
+ "cn_core_client_count_for_each_node_in_client_manager",
+ String.valueOf(config.getCoreClientNumForEachNode()))
+ .trim()));
+
+ config.setMaxClientNumForEachNode(
+ Integer.parseInt(
+ properties
+ .getProperty(
+ "cn_max_client_count_for_each_node_in_client_manager",
+ String.valueOf(config.getMaxClientNumForEachNode()))
+ .trim()));
+
config.setConnectionTimeoutInMS(
Integer.parseInt(
properties
@@ -139,20 +155,20 @@ public class CommonDescriptor {
String.valueOf(config.getSelectorNumOfClientManager()))
.trim()));
- config.setMaxTotalClientForEachNode(
+ config.setCoreClientNumForEachNode(
Integer.parseInt(
properties
.getProperty(
- "dn_max_connection_for_internal_service",
- String.valueOf(config.getMaxTotalClientForEachNode()))
+ "dn_core_client_count_for_each_node_in_client_manager",
+ String.valueOf(config.getCoreClientNumForEachNode()))
.trim()));
- config.setMaxIdleClientForEachNode(
+ config.setMaxClientNumForEachNode(
Integer.parseInt(
properties
.getProperty(
- "dn_core_connection_for_internal_service",
- String.valueOf(config.getMaxIdleClientForEachNode()))
+ "dn_max_client_count_for_each_node_in_client_manager",
+ String.valueOf(config.getMaxClientNumForEachNode()))
.trim()));
config.setHandleSystemErrorStrategy(
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
index 061fd0f775..94f7454cef 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
@@ -181,7 +181,7 @@ public class ClientManagerTest {
new SyncDataNodeInternalServiceClient.Factory(
manager, new ThriftClientProperty.Builder().build()),
new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
- .setMaxIdleClientForEachNode(maxIdleClientForEachNode)
+ .setCoreClientNumForEachNode(maxIdleClientForEachNode)
.build()
.getConfig());
}
@@ -241,8 +241,8 @@ public class ClientManagerTest {
new SyncDataNodeInternalServiceClient.Factory(
manager, new ThriftClientProperty.Builder().build()),
new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
- .setMaxTotalClientForEachNode(maxTotalClientForEachNode)
- .setWaitClientTimeoutMS(waitClientTimeoutMs)
+ .setMaxClientNumForEachNode(maxTotalClientForEachNode)
+ .setWaitClientTimeoutMs(waitClientTimeoutMs)
.build()
.getConfig());
}
@@ -315,8 +315,8 @@ public class ClientManagerTest {
new SyncDataNodeInternalServiceClient.Factory(
manager, new ThriftClientProperty.Builder().build()),
new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
- .setWaitClientTimeoutMS(waitClientTimeoutMS)
- .setMaxTotalClientForEachNode(maxTotalClientForEachNode)
+ .setWaitClientTimeoutMs(waitClientTimeoutMS)
+ .setMaxClientNumForEachNode(maxTotalClientForEachNode)
.build()
.getConfig());
}
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 03ea81c3a4..f28aba76c8 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -16,116 +16,92 @@
# specific language governing permissions and limitations
# under the License.
#
-
####################
### Data Node RPC Configuration
####################
-
# Used for connection of IoTDB native clients(Session)
# Could set 127.0.0.1(for local test) or ipv4 address
# Datatype: String
dn_rpc_address=127.0.0.1
-
# Used for connection of IoTDB native clients(Session)
# Bind with dn_rpc_address
# Datatype: int
dn_rpc_port=6667
-
# Used for communication inside cluster.
# could set 127.0.0.1(for local test) or ipv4 address.
# Datatype: String
dn_internal_address=127.0.0.1
-
# Used for communication inside cluster.
# Bind with dn_internal_address
# Datatype: int
dn_internal_port=10730
-
# Port for data exchange among DataNodes inside cluster
# Bind with dn_internal_address
# Datatype: int
dn_mpp_data_exchange_port=10740
-
# port for consensus's communication for schema region inside cluster.
# Bind with dn_internal_address
# Datatype: int
dn_schema_region_consensus_port=10750
-
# port for consensus's communication for data region inside cluster.
# Bind with dn_internal_address
# Datatype: int
dn_data_region_consensus_port=10760
-
# Datatype: long
# The time of data node waiting for the next retry to join into the cluster.
# dn_join_cluster_retry_interval_ms=5000
-
####################
### Target Config Nodes
####################
-
# For the first ConfigNode to start, cn_target_config_node_list points to its own cn_internal_address:cn_internal_port.
# For other ConfigNodes that are to join the cluster, target_config_node_list points to any running ConfigNode's cn_internal_address:cn_internal_port.
# Format: address:port(,address:port)* e.g. 127.0.0.1:10710,127.0.0.1:10711
# Datatype: String
dn_target_config_node_list=127.0.0.1:10710
-
####################
### Connection Configuration
####################
-
# The maximum session idle time. unit: ms
# Idle sessions are those that perform neither query nor non-query operations for a period of time
# Set to 0 to disable session timeout
# Datatype: int
# dn_session_timeout_threshold=0
-
# Datatype: boolean
# dn_rpc_thrift_compression_enable=false
-
# if true, a snappy based compression method will be called before sending data by the network
# Datatype: boolean
# this feature is under development, set this as false before it is done.
# dn_rpc_advanced_compression_enable=false
-
# Datatype: int
# dn_rpc_selector_thread_count=1
-
# Datatype: int
# dn_rpc_min_concurrent_client_num=1
-
# Datatype: int
# dn_rpc_max_concurrent_client_num=65535
-
# thrift max frame size, 512MB by default
# Datatype: int
# dn_thrift_max_frame_size=536870912
-
# thrift init buffer size
# Datatype: int
# dn_thrift_init_buffer_size=1024
-
# Thrift socket and connection timeout between raft nodes, in milliseconds.
# Datatype: int
# dn_connection_timeout_ms=20000
-
-# The maximum number of clients that can be idle for a node's InternalService.
-# When the number of idle clients on a node exceeds this number, newly returned clients will be released
-# Datatype: int
-# dn_core_connection_for_internal_service=100
-
-# The maximum number of clients that can be applied for a node's InternalService
-# Datatype: int
-# dn_max_connection_for_internal_service=100
-
# selector thread (TAsyncClientManager) nums for async thread in a clientManager
# Datatype: int
# dn_selector_thread_count_of_client_manager=1
-
+# The maximum number of clients that can be idle for a node in a clientManager.
+# When the number of idle clients on a node exceeds this number, newly returned clients will be released
+# Datatype: int
+# dn_core_client_count_for_each_node_in_client_manager=200
+# The maximum number of clients that can be allocated for a node in a clientManager.
+# When the number of clients to a single node exceeds this number, the thread requesting a client will be blocked
+# for a while, then ClientManager will throw ClientManagerException if no client becomes available within the block time.
+# Datatype: int
+# dn_max_client_count_for_each_node_in_client_manager=300
####################
### Directory Configuration
####################
-
# system dir
# If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/data/datanode/system).
# If it is absolute, system will save the data in exact location it points to.
@@ -136,8 +112,6 @@ dn_target_config_node_list=127.0.0.1:10710
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# dn_system_dir=data/datanode/system
-
-
# data dirs
# If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/data/datanode/data).
# If it is absolute, system will save the data in exact location it points to.
@@ -150,8 +124,6 @@ dn_target_config_node_list=127.0.0.1:10710
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# dn_data_dirs=data/datanode/data
-
-
# multi_dir_strategy
# The strategy is used to choose a directory from data_dirs for the system to store a new tsfile.
# System provides four strategies to choose from, or user can create his own strategy by extending org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategy.
@@ -164,7 +136,6 @@ dn_target_config_node_list=127.0.0.1:10710
# If this property is unset, system will use MaxDiskUsableSpaceFirstStrategy as default strategy.
# For this property, fully-qualified class name (include package name) and simple class name are both acceptable.
# dn_multi_dir_strategy=MaxDiskUsableSpaceFirstStrategy
-
# consensus dir
# If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/data/datanode).
# If it is absolute, system will save the data in the exact location it points to.
@@ -176,7 +147,6 @@ dn_target_config_node_list=127.0.0.1:10710
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# dn_consensus_dir=data/datanode/consensus
-
# wal dirs
# If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/data/datanode).
# If it is absolute, system will save the data in the exact location it points to.
@@ -189,7 +159,6 @@ dn_target_config_node_list=127.0.0.1:10710
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# dn_wal_dirs=data/datanode/wal
-
# tracing dir
# Uncomment following fields to configure the tracing root directory.
# For Windows platform, the index is as follows:
@@ -197,7 +166,6 @@ dn_target_config_node_list=127.0.0.1:10710
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# dn_tracing_dir=datanode/tracing
-
# sync dir
# If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/data/datanode).
# If it is absolute, system will save the data in the exact location it points to.
@@ -209,65 +177,51 @@ dn_target_config_node_list=127.0.0.1:10710
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# dn_sync_dir=data/datanode/sync
-
####################
### Metric Configuration
####################
-
# The reporters of metric module to report metrics
# If there are more than one reporter, please separate them by commas ",".
# Options: [JMX, PROMETHEUS, IOTDB]
# Datatype: String
# dn_metric_reporter_list=
-
# The type of metric framework which manage metrics
# Options: [MICROMETER, DROPWIZARD]
# Datatype: String
# dn_metric_frame_type=MICROMETER
-
# The level of metric module
# Options: [CORE, IMPORTANT, NORMAL, ALL]
# Datatype: String
# dn_metric_level=CORE
-
# The period of async collection of some metrics in second
# Datatype: int
# dn_metric_async_collect_period=5
-
# The port of prometheus reporter of metric module
# Datatype: int
# dn_metric_prometheus_reporter_port=9091
-
# The host of IoTDB reporter of metric module
# Could set 127.0.0.1(for local test) or ipv4 address
# Datatype: String
# dn_metric_iotdb_reporter_host=127.0.0.1
-
# The port of IoTDB reporter of metric module
# Datatype: int
# dn_metric_iotdb_reporter_port=6667
-
# The username of IoTDB reporter of metric module
# Datatype: String
# dn_metric_iotdb_reporter_username=root
-
# The password of IoTDB reporter of metric module
# Datatype: String
# dn_metric_iotdb_reporter_password=root
-
# The max connection number of IoTDB reporter of metric module
# Datatype: int
# dn_metric_iotdb_reporter_max_connection_number=3
-
# The location of IoTDB reporter of metric module
# The metrics will write into root.__system.${location}
# Datatype: String
# dn_metric_iotdb_reporter_location=metric
-
# The push period of IoTDB reporter of metric module in second
# Datatype: int
# dn_metric_iotdb_reporter_push_period=15
-
# The type of internal reporter in metric module
# Options: [MEMORY, IOTDB]
# Datatype: String
diff --git a/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java b/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java
index b1451c9337..52114a1415 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java
@@ -50,8 +50,8 @@ public class DataNodeClientPoolFactory {
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
.build()),
new ClientPoolProperty.Builder<ConfigNodeClient>()
- .setMaxIdleClientForEachNode(conf.getCoreConnectionForInternalService())
- .setMaxTotalClientForEachNode(conf.getMaxConnectionForInternalService())
+ .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
+ .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
}
@@ -74,7 +74,11 @@ public class DataNodeClientPoolFactory {
? conf.getSelectorNumOfClientManager() / 10
: 1)
.build()),
- new ClientPoolProperty.Builder<ConfigNodeClient>().build().getConfig());
+ new ClientPoolProperty.Builder<ConfigNodeClient>()
+ .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
+ .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
+ .build()
+ .getConfig());
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index d562199db4..7eb63a2029 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.conf;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
@@ -918,15 +919,6 @@ public class IoTDBConfig {
/** Thrift socket and connection timeout between data node and config node. */
private int connectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(20);
- /** the maximum number of clients that can be applied for a node's InternalService */
- private int maxConnectionForInternalService = 100;
-
- /**
- * the maximum number of clients that can be idle for a node's InternalService. When the number of
- * idle clients on a node exceeds this number, newly returned clients will be released
- */
- private int coreConnectionForInternalService = 100;
-
/**
* ClientManager will have so many selector threads (TAsyncClientManager) to distribute to its
* clients.
@@ -936,6 +928,20 @@ public class IoTDBConfig {
? Runtime.getRuntime().availableProcessors() / 4
: 1;
+ /**
+ * The maximum number of clients that can be idle for a node in a clientManager. When the number
+ * of idle clients on a node exceeds this number, newly returned clients will be released
+ */
+ private int coreClientNumForEachNode = DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
+
+ /**
+ * The maximum number of clients that can be allocated for a node in a clientManager. When the
+ * number of clients for a single node exceeds this number, a thread applying for a client is
+ * blocked for a while, and ClientManager throws ClientManagerException if no client becomes
+ * available within the block time.
+ */
+ private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
+
/**
* Cache size of partition cache in {@link
* org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher}
@@ -3041,20 +3047,20 @@ public class IoTDBConfig {
this.connectionTimeoutInMS = connectionTimeoutInMS;
}
- public int getMaxConnectionForInternalService() {
- return maxConnectionForInternalService;
+ public int getMaxClientNumForEachNode() {
+ return maxClientNumForEachNode;
}
- public void setMaxConnectionForInternalService(int maxConnectionForInternalService) {
- this.maxConnectionForInternalService = maxConnectionForInternalService;
+ public void setMaxClientNumForEachNode(int maxClientNumForEachNode) {
+ this.maxClientNumForEachNode = maxClientNumForEachNode;
}
- public int getCoreConnectionForInternalService() {
- return coreConnectionForInternalService;
+ public int getCoreClientNumForEachNode() {
+ return coreClientNumForEachNode;
}
- public void setCoreConnectionForInternalService(int coreConnectionForInternalService) {
- this.coreConnectionForInternalService = coreConnectionForInternalService;
+ public void setCoreClientNumForEachNode(int coreClientNumForEachNode) {
+ this.coreClientNumForEachNode = coreClientNumForEachNode;
}
public int getSelectorNumOfClientManager() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 43f5da3278..8b8f32de4b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -228,20 +228,20 @@ public class IoTDBDescriptor {
"dn_connection_timeout_ms", String.valueOf(conf.getConnectionTimeoutInMS()))
.trim()));
- conf.setMaxConnectionForInternalService(
+ conf.setCoreClientNumForEachNode(
Integer.parseInt(
properties
.getProperty(
- "dn_max_connection_for_internal_service",
- String.valueOf(conf.getMaxConnectionForInternalService()))
+ "dn_core_client_count_for_each_node_in_client_manager",
+ String.valueOf(conf.getCoreClientNumForEachNode()))
.trim()));
- conf.setCoreConnectionForInternalService(
+ conf.setMaxClientNumForEachNode(
Integer.parseInt(
properties
.getProperty(
- "dn_core_connection_for_internal_service",
- String.valueOf(conf.getCoreConnectionForInternalService()))
+ "dn_max_client_count_for_each_node_in_client_manager",
+ String.valueOf(conf.getMaxClientNumForEachNode()))
.trim()));
conf.setSelectorNumOfClientManager(
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index abc29d55af..104fbc087f 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit;
* dataRegion's reading and writing
*/
public class DataRegionConsensusImpl {
+
private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
private static IConsensus INSTANCE = null;
@@ -82,8 +83,9 @@ public class DataRegionConsensusImpl {
.setThriftServerAwaitTimeForStopService(
conf.getThriftServerAwaitTimeForStopService())
.setThriftMaxFrameSize(conf.getThriftMaxFrameSize())
- .setMaxConnectionForInternalService(
- conf.getMaxConnectionForInternalService())
+ .setCoreClientNumForEachNode(
+ conf.getCoreClientNumForEachNode())
+ .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build())
.setReplication(
IoTConsensusConfig.Replication.newBuilder()
@@ -144,13 +146,8 @@ public class DataRegionConsensusImpl {
conf.getRatisFirstElectionTimeoutMaxMs(),
TimeUnit.MILLISECONDS))
.build())
- .setLeaderLogAppender(
- RatisConfig.LeaderLogAppender.newBuilder()
- .setBufferByteLimit(
- conf.getDataRatisConsensusLogAppenderBufferSizeMax())
- .build())
- .setRatisConsensus(
- RatisConfig.Impl.newBuilder()
+ .setClient(
+ RatisConfig.Client.newBuilder()
.setClientRequestTimeoutMillis(
conf.getDataRatisConsensusRequestTimeoutMs())
.setClientMaxRetryAttempt(
@@ -159,8 +156,19 @@ public class DataRegionConsensusImpl {
conf.getDataRatisConsensusInitialSleepTimeMs())
.setClientRetryMaxSleepTimeMs(
conf.getDataRatisConsensusMaxSleepTimeMs())
+ .setCoreClientNumForEachNode(
+ conf.getCoreClientNumForEachNode())
+ .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
+ .build())
+ .setImpl(
+ RatisConfig.Impl.newBuilder()
.setTriggerSnapshotFileSize(conf.getDataRatisLogMax())
.build())
+ .setLeaderLogAppender(
+ RatisConfig.LeaderLogAppender.newBuilder()
+ .setBufferByteLimit(
+ conf.getDataRatisConsensusLogAppenderBufferSizeMax())
+ .build())
.build())
.build(),
gid ->
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index 38a94598a1..eec582234d 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -110,23 +110,29 @@ public class SchemaRegionConsensusImpl {
conf.getRatisFirstElectionTimeoutMaxMs(),
TimeUnit.MILLISECONDS))
.build())
- .setLeaderLogAppender(
- RatisConfig.LeaderLogAppender.newBuilder()
- .setBufferByteLimit(
- conf.getSchemaRatisConsensusLogAppenderBufferSizeMax())
- .build())
- .setRatisConsensus(
- RatisConfig.Impl.newBuilder()
+ .setClient(
+ RatisConfig.Client.newBuilder()
.setClientRequestTimeoutMillis(
conf.getSchemaRatisConsensusRequestTimeoutMs())
.setClientMaxRetryAttempt(
conf.getSchemaRatisConsensusMaxRetryAttempts())
.setClientRetryInitialSleepTimeMs(
conf.getSchemaRatisConsensusInitialSleepTimeMs())
.setClientRetryMaxSleepTimeMs(
conf.getSchemaRatisConsensusMaxSleepTimeMs())
+ .setCoreClientNumForEachNode(
+ conf.getCoreClientNumForEachNode())
+ .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
+ .build())
+ .setImpl(
+ RatisConfig.Impl.newBuilder()
.setTriggerSnapshotFileSize(conf.getSchemaRatisLogMax())
.build())
+ .setLeaderLogAppender(
+ RatisConfig.LeaderLogAppender.newBuilder()
+ .setBufferByteLimit(
+ conf.getSchemaRatisConsensusLogAppenderBufferSizeMax())
+ .build())
.build())
.setStorageDir(conf.getSchemaRegionConsensusDir())
.build(),