You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:54:02 UTC
[rocketmq] 21/26: [ISSUE #5485] Fix GrpcBaseIT
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 23c8efd801e60d91f1a2ab0a7d74a835c56efbeb
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Mon Nov 28 15:54:06 2022 +0800
[ISSUE #5485] Fix GrpcBaseIT
---
.../apache/rocketmq/proxy/config/ProxyConfig.java | 26 +++++++++++++++-------
.../sysmessage/AbstractSystemMessageSyncer.java | 16 ++++---------
.../apache/rocketmq/test/grpc/v2/GrpcBaseIT.java | 1 +
3 files changed, 23 insertions(+), 20 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index e0f971202..4d5084cfd 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -54,16 +54,18 @@ public class ProxyConfig implements ConfigFile {
}
}
- private String rocketMQClusterName = "";
+ private String rocketMQClusterName = DEFAULT_CLUSTER_NAME;
private String proxyClusterName = DEFAULT_CLUSTER_NAME;
private String proxyName = StringUtils.isEmpty(localHostName) ? "DEFAULT_PROXY" : localHostName;
private String localServeAddr = "";
- private String systemTopicClusterName = "";
+ private String heartbeatSyncerTopicClusterName = "";
private int heartbeatSyncerThreadPoolNums = 4;
private int heartbeatSyncerThreadPoolQueueCapacity = 100;
+ private String heartbeatSyncerTopicName = "DefaultHeartBeatSyncerTopic";
+
/**
* configuration for ThreadPoolMonitor
*/
@@ -250,8 +252,8 @@ public class ProxyConfig implements ConfigFile {
if (StringUtils.isBlank(remotingAccessAddr)) {
this.remotingAccessAddr = this.localServeAddr;
}
- if (StringUtils.isBlank(systemTopicClusterName)) {
- this.systemTopicClusterName = this.rocketMQClusterName;
+ if (StringUtils.isBlank(heartbeatSyncerTopicClusterName)) {
+ this.heartbeatSyncerTopicClusterName = this.rocketMQClusterName;
}
}
@@ -324,12 +326,12 @@ public class ProxyConfig implements ConfigFile {
this.localServeAddr = localServeAddr;
}
- public String getSystemTopicClusterName() {
- return systemTopicClusterName;
+ public String getHeartbeatSyncerTopicClusterName() {
+ return heartbeatSyncerTopicClusterName;
}
- public void setSystemTopicClusterName(String systemTopicClusterName) {
- this.systemTopicClusterName = systemTopicClusterName;
+ public void setHeartbeatSyncerTopicClusterName(String heartbeatSyncerTopicClusterName) {
+ this.heartbeatSyncerTopicClusterName = heartbeatSyncerTopicClusterName;
}
public int getHeartbeatSyncerThreadPoolNums() {
@@ -348,6 +350,14 @@ public class ProxyConfig implements ConfigFile {
this.heartbeatSyncerThreadPoolQueueCapacity = heartbeatSyncerThreadPoolQueueCapacity;
}
+ public String getHeartbeatSyncerTopicName() {
+ return heartbeatSyncerTopicName;
+ }
+
+ public void setHeartbeatSyncerTopicName(String heartbeatSyncerTopicName) {
+ this.heartbeatSyncerTopicName = heartbeatSyncerTopicName;
+ }
+
public boolean isEnablePrintJstack() {
return enablePrintJstack;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
index e0a9fd702..d08d5dfb1 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
@@ -49,7 +49,6 @@ public abstract class AbstractSystemMessageSyncer implements StartAndShutdown, M
protected static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
protected final TopicRouteService topicRouteService;
protected final AdminService adminService;
- protected final String systemResourceName;
protected final MQClientAPIFactory mqClientAPIFactory;
protected DefaultMQPushConsumer defaultMQPushConsumer;
@@ -57,25 +56,18 @@ public abstract class AbstractSystemMessageSyncer implements StartAndShutdown, M
this.topicRouteService = topicRouteService;
this.adminService = adminService;
this.mqClientAPIFactory = mqClientAPIFactory;
-
- this.systemResourceName = this.getSystemResourceName();
- }
-
- protected String getSystemResourceName() {
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
- return TopicValidator.SYSTEM_TOPIC_PREFIX + "proxy_" + this.getClass().getSimpleName() + "_" + proxyConfig.getProxyClusterName();
}
protected String getSystemMessageProducerId() {
- return "PID_" + this.systemResourceName;
+ return "PID_" + getBroadcastTopicName();
}
protected String getSystemMessageConsumerId() {
- return "CID_" + this.systemResourceName;
+ return "CID_" + getBroadcastTopicName();
}
protected String getBroadcastTopicName() {
- return this.systemResourceName;
+ return ConfigurationManager.getProxyConfig().getHeartbeatSyncerTopicName();
}
protected String getSubTag() {
@@ -84,7 +76,7 @@ public abstract class AbstractSystemMessageSyncer implements StartAndShutdown, M
protected String getBroadcastTopicClusterName() {
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
- return proxyConfig.getSystemTopicClusterName();
+ return proxyConfig.getHeartbeatSyncerTopicClusterName();
}
protected int getBroadcastTopicQueueNum() {
diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
index 3fb955a0c..95810b97c 100644
--- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
@@ -156,6 +156,7 @@ public class GrpcBaseIT extends BaseConf {
// Set LongPollingReserveTimeInMillis to 500ms to reserve more time for IT
ConfigurationManager.getProxyConfig().setLongPollingReserveTimeInMillis(500);
ConfigurationManager.getProxyConfig().setRocketMQClusterName(brokerController1.getBrokerConfig().getBrokerClusterName());
+ ConfigurationManager.getProxyConfig().setHeartbeatSyncerTopicClusterName(brokerController1.getBrokerConfig().getBrokerClusterName());
ConfigurationManager.getProxyConfig().setMinInvisibleTimeMillsForRecv(3);
}