You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2021/09/12 08:24:54 UTC

[rocketmq] branch develop updated: [ISSUE #3314] Make mqClientApi request timeout settable

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3604100  [ISSUE #3314] Make mqClientApi request timeout settable
3604100 is described below

commit 36041006aaa92867077849f0dc060d4da295712e
Author: lizhiboo <li...@yeah.net>
AuthorDate: Sun Sep 12 16:24:37 2021 +0800

    [ISSUE #3314] Make mqClientApi request timeout settable
---
 .../main/java/org/apache/rocketmq/client/ClientConfig.java   | 11 ++++++++++-
 .../rocketmq/client/impl/factory/MQClientInstance.java       | 12 ++++++------
 2 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index beeeb2f..b2c043e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -60,6 +60,8 @@ public class ClientConfig {
 
     private boolean useTLS = TlsSystemConfig.tlsEnable;
 
+    private int mqClientApiTimeout = 3 * 1000;
+
     private LanguageCode language = LanguageCode.JAVA;
 
     public String buildMQClientId() {
@@ -298,6 +300,13 @@ public class ClientConfig {
         this.accessChannel = accessChannel;
     }
 
+    public int getMqClientApiTimeout() {
+        return mqClientApiTimeout;
+    }
+
+    public void setMqClientApiTimeout(int mqClientApiTimeout) {
+        this.mqClientApiTimeout = mqClientApiTimeout;
+    }
 
     @Override
     public String toString() {
@@ -305,6 +314,6 @@ public class ClientConfig {
             + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
             + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval
             + ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
-            + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + "]";
+            + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout + "]";
     }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index d30534f..e897d49 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -447,7 +447,7 @@ public class MQClientInstance {
                 if (addr != null) {
                     try {
                         this.getMQClientAPIImpl().checkClientInBroker(
-                            addr, entry.getKey(), this.clientId, subscriptionData, 3 * 1000
+                            addr, entry.getKey(), this.clientId, subscriptionData, clientConfig.getMqClientApiTimeout()
                         );
                     } catch (Exception e) {
                         if (e instanceof MQClientException) {
@@ -554,7 +554,7 @@ public class MQClientInstance {
                             }
 
                             try {
-                                int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
+                                int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
                                 if (!this.brokerVersionTable.containsKey(brokerName)) {
                                     this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                                 }
@@ -610,7 +610,7 @@ public class MQClientInstance {
                     TopicRouteData topicRouteData;
                     if (isDefault && defaultMQProducer != null) {
                         topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
-                            1000 * 3);
+                            clientConfig.getMqClientApiTimeout());
                         if (topicRouteData != null) {
                             for (QueueData data : topicRouteData.getQueueDatas()) {
                                 int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
@@ -619,7 +619,7 @@ public class MQClientInstance {
                             }
                         }
                     } else {
-                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
+                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
                     }
                     if (topicRouteData != null) {
                         TopicRouteData old = this.topicRouteTable.get(topic);
@@ -894,7 +894,7 @@ public class MQClientInstance {
                     String addr = entry1.getValue();
                     if (addr != null) {
                         try {
-                            this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);
+                            this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, clientConfig.getMqClientApiTimeout());
                             log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);
                         } catch (RemotingException e) {
                             log.error("unregister client exception from broker: " + addr, e);
@@ -1064,7 +1064,7 @@ public class MQClientInstance {
 
         if (null != brokerAddr) {
             try {
-                return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
+                return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, clientConfig.getMqClientApiTimeout());
             } catch (Exception e) {
                 log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
             }