You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2021/09/12 08:24:54 UTC
[rocketmq] branch develop updated: [ISSUE #3314] Make mqClientApi
request timeout settable
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3604100 [ISSUE #3314] Make mqClientApi request timeout settable
3604100 is described below
commit 36041006aaa92867077849f0dc060d4da295712e
Author: lizhiboo <li...@yeah.net>
AuthorDate: Sun Sep 12 16:24:37 2021 +0800
[ISSUE #3314] Make mqClientApi request timeout settable
---
.../main/java/org/apache/rocketmq/client/ClientConfig.java | 11 ++++++++++-
.../rocketmq/client/impl/factory/MQClientInstance.java | 12 ++++++------
2 files changed, 16 insertions(+), 7 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index beeeb2f..b2c043e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -60,6 +60,8 @@ public class ClientConfig {
private boolean useTLS = TlsSystemConfig.tlsEnable;
+ private int mqClientApiTimeout = 3 * 1000;
+
private LanguageCode language = LanguageCode.JAVA;
public String buildMQClientId() {
@@ -298,6 +300,13 @@ public class ClientConfig {
this.accessChannel = accessChannel;
}
+ public int getMqClientApiTimeout() {
+ return mqClientApiTimeout;
+ }
+
+ public void setMqClientApiTimeout(int mqClientApiTimeout) {
+ this.mqClientApiTimeout = mqClientApiTimeout;
+ }
@Override
public String toString() {
@@ -305,6 +314,6 @@ public class ClientConfig {
+ ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
+ ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval
+ ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
- + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + "]";
+ + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout + "]";
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index d30534f..e897d49 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -447,7 +447,7 @@ public class MQClientInstance {
if (addr != null) {
try {
this.getMQClientAPIImpl().checkClientInBroker(
- addr, entry.getKey(), this.clientId, subscriptionData, 3 * 1000
+ addr, entry.getKey(), this.clientId, subscriptionData, clientConfig.getMqClientApiTimeout()
);
} catch (Exception e) {
if (e instanceof MQClientException) {
@@ -554,7 +554,7 @@ public class MQClientInstance {
}
try {
- int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
+ int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
}
@@ -610,7 +610,7 @@ public class MQClientInstance {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
- 1000 * 3);
+ clientConfig.getMqClientApiTimeout());
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
@@ -619,7 +619,7 @@ public class MQClientInstance {
}
}
} else {
- topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
+ topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
@@ -894,7 +894,7 @@ public class MQClientInstance {
String addr = entry1.getValue();
if (addr != null) {
try {
- this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);
+ this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, clientConfig.getMqClientApiTimeout());
log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);
} catch (RemotingException e) {
log.error("unregister client exception from broker: " + addr, e);
@@ -1064,7 +1064,7 @@ public class MQClientInstance {
if (null != brokerAddr) {
try {
- return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
+ return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, clientConfig.getMqClientApiTimeout());
} catch (Exception e) {
log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
}