You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/12/22 03:11:56 UTC

[iotdb] branch master updated: [IOTDB-5261]support modify the dn_max_connection_for_internal_servic of IoTConsensus (#8565)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 87baa0fb32 [IOTDB-5261]support modify the dn_max_connection_for_internal_servic of IoTConsensus (#8565)
87baa0fb32 is described below

commit 87baa0fb324580f05e4b78fdd04b8b3323f7378d
Author: Houliang Qi <ne...@163.com>
AuthorDate: Thu Dec 22 11:11:51 2022 +0800

    [IOTDB-5261]support modify the dn_max_connection_for_internal_servic of IoTConsensus (#8565)
    
    * support modify the dn_max_connection_for_internal_servic of IoTConsensus
    
    * fix the comment
---
 .../iotdb/consensus/config/IoTConsensusConfig.java    | 19 +++++++++++++++++--
 .../consensus/iot/client/IoTConsensusClientPool.java  |  7 ++++++-
 .../iotdb/db/consensus/DataRegionConsensusImpl.java   |  2 ++
 3 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index d0600a1eee..5d7d27156d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -75,6 +75,7 @@ public class IoTConsensusConfig {
     private final int selectorNumOfClientManager;
     private final int connectionTimeoutInMs;
     private final int thriftMaxFrameSize;
+    private final int maxConnectionForInternalService;
 
     private RPC(
         int rpcSelectorThreadNum,
@@ -84,7 +85,8 @@ public class IoTConsensusConfig {
         boolean isRpcThriftCompressionEnabled,
         int selectorNumOfClientManager,
         int connectionTimeoutInMs,
-        int thriftMaxFrameSize) {
+        int thriftMaxFrameSize,
+        int maxConnectionForInternalService) {
       this.rpcSelectorThreadNum = rpcSelectorThreadNum;
       this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
       this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
@@ -93,6 +95,7 @@ public class IoTConsensusConfig {
       this.selectorNumOfClientManager = selectorNumOfClientManager;
       this.connectionTimeoutInMs = connectionTimeoutInMs;
       this.thriftMaxFrameSize = thriftMaxFrameSize;
+      this.maxConnectionForInternalService = maxConnectionForInternalService;
     }
 
     public int getRpcSelectorThreadNum() {
@@ -127,6 +130,10 @@ public class IoTConsensusConfig {
       return thriftMaxFrameSize;
     }
 
+    public int getMaxConnectionForInternalService() {
+      return maxConnectionForInternalService;
+    }
+
     public static RPC.Builder newBuilder() {
       return new RPC.Builder();
     }
@@ -142,6 +149,8 @@ public class IoTConsensusConfig {
       private int connectionTimeoutInMs = (int) TimeUnit.SECONDS.toMillis(20);
       private int thriftMaxFrameSize = 536870912;
 
+      private int maxConnectionForInternalService = 100;
+
       public RPC.Builder setRpcSelectorThreadNum(int rpcSelectorThreadNum) {
         this.rpcSelectorThreadNum = rpcSelectorThreadNum;
         return this;
@@ -183,6 +192,11 @@ public class IoTConsensusConfig {
         return this;
       }
 
+      public RPC.Builder setMaxConnectionForInternalService(int maxConnectionForInternalService) {
+        this.maxConnectionForInternalService = maxConnectionForInternalService;
+        return this;
+      }
+
       public RPC build() {
         return new RPC(
             rpcSelectorThreadNum,
@@ -192,7 +206,8 @@ public class IoTConsensusConfig {
             isRpcThriftCompressionEnabled,
             selectorNumOfClientManager,
             connectionTimeoutInMs,
-            thriftMaxFrameSize);
+            thriftMaxFrameSize,
+            maxConnectionForInternalService);
       }
     }
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
index 3c7f4a4b8d..babc9fb4f5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
@@ -35,6 +35,7 @@ public class IoTConsensusClientPool {
 
   public static class SyncIoTConsensusServiceClientPoolFactory
       implements IClientPoolFactory<TEndPoint, SyncIoTConsensusServiceClient> {
+
     private final IoTConsensusConfig config;
 
     public SyncIoTConsensusServiceClientPoolFactory(IoTConsensusConfig config) {
@@ -80,7 +81,11 @@ public class IoTConsensusClientPool {
                       config.getRpc().getSelectorNumOfClientManager())
                   .build(),
               IOT_CONSENSUS_CLIENT_POOL_THREAD_NAME),
-          new ClientPoolProperty.Builder<AsyncIoTConsensusServiceClient>().build().getConfig());
+          new ClientPoolProperty.Builder<AsyncIoTConsensusServiceClient>()
+              .setMaxIdleClientForEachNode(config.getRpc().getMaxConnectionForInternalService())
+              .setMaxTotalClientForEachNode(config.getRpc().getMaxConnectionForInternalService())
+              .build()
+              .getConfig());
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 53319ea7c9..abc29d55af 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -82,6 +82,8 @@ public class DataRegionConsensusImpl {
                                       .setThriftServerAwaitTimeForStopService(
                                           conf.getThriftServerAwaitTimeForStopService())
                                       .setThriftMaxFrameSize(conf.getThriftMaxFrameSize())
+                                      .setMaxConnectionForInternalService(
+                                          conf.getMaxConnectionForInternalService())
                                       .build())
                               .setReplication(
                                   IoTConsensusConfig.Replication.newBuilder()