You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:54:02 UTC

[rocketmq] 21/26: [ISSUE #5485] Fix GrpcBaseIT

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 23c8efd801e60d91f1a2ab0a7d74a835c56efbeb
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Mon Nov 28 15:54:06 2022 +0800

    [ISSUE #5485] Fix GrpcBaseIT
---
 .../apache/rocketmq/proxy/config/ProxyConfig.java  | 26 +++++++++++++++-------
 .../sysmessage/AbstractSystemMessageSyncer.java    | 16 ++++---------
 .../apache/rocketmq/test/grpc/v2/GrpcBaseIT.java   |  1 +
 3 files changed, 23 insertions(+), 20 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index e0f971202..4d5084cfd 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -54,16 +54,18 @@ public class ProxyConfig implements ConfigFile {
         }
     }
 
-    private String rocketMQClusterName = "";
+    private String rocketMQClusterName = DEFAULT_CLUSTER_NAME;
     private String proxyClusterName = DEFAULT_CLUSTER_NAME;
     private String proxyName = StringUtils.isEmpty(localHostName) ? "DEFAULT_PROXY" : localHostName;
 
     private String localServeAddr = "";
 
-    private String systemTopicClusterName = "";
+    private String heartbeatSyncerTopicClusterName = "";
     private int heartbeatSyncerThreadPoolNums = 4;
     private int heartbeatSyncerThreadPoolQueueCapacity = 100;
 
+    private String heartbeatSyncerTopicName = "DefaultHeartBeatSyncerTopic";
+
     /**
      * configuration for ThreadPoolMonitor
      */
@@ -250,8 +252,8 @@ public class ProxyConfig implements ConfigFile {
         if (StringUtils.isBlank(remotingAccessAddr)) {
             this.remotingAccessAddr = this.localServeAddr;
         }
-        if (StringUtils.isBlank(systemTopicClusterName)) {
-            this.systemTopicClusterName = this.rocketMQClusterName;
+        if (StringUtils.isBlank(heartbeatSyncerTopicClusterName)) {
+            this.heartbeatSyncerTopicClusterName = this.rocketMQClusterName;
         }
     }
 
@@ -324,12 +326,12 @@ public class ProxyConfig implements ConfigFile {
         this.localServeAddr = localServeAddr;
     }
 
-    public String getSystemTopicClusterName() {
-        return systemTopicClusterName;
+    public String getHeartbeatSyncerTopicClusterName() {
+        return heartbeatSyncerTopicClusterName;
     }
 
-    public void setSystemTopicClusterName(String systemTopicClusterName) {
-        this.systemTopicClusterName = systemTopicClusterName;
+    public void setHeartbeatSyncerTopicClusterName(String heartbeatSyncerTopicClusterName) {
+        this.heartbeatSyncerTopicClusterName = heartbeatSyncerTopicClusterName;
     }
 
     public int getHeartbeatSyncerThreadPoolNums() {
@@ -348,6 +350,14 @@ public class ProxyConfig implements ConfigFile {
         this.heartbeatSyncerThreadPoolQueueCapacity = heartbeatSyncerThreadPoolQueueCapacity;
     }
 
+    public String getHeartbeatSyncerTopicName() {
+        return heartbeatSyncerTopicName;
+    }
+
+    public void setHeartbeatSyncerTopicName(String heartbeatSyncerTopicName) {
+        this.heartbeatSyncerTopicName = heartbeatSyncerTopicName;
+    }
+
     public boolean isEnablePrintJstack() {
         return enablePrintJstack;
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
index e0a9fd702..d08d5dfb1 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
@@ -49,7 +49,6 @@ public abstract class AbstractSystemMessageSyncer implements StartAndShutdown, M
     protected static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
     protected final TopicRouteService topicRouteService;
     protected final AdminService adminService;
-    protected final String systemResourceName;
     protected final MQClientAPIFactory mqClientAPIFactory;
     protected DefaultMQPushConsumer defaultMQPushConsumer;
 
@@ -57,25 +56,18 @@ public abstract class AbstractSystemMessageSyncer implements StartAndShutdown, M
         this.topicRouteService = topicRouteService;
         this.adminService = adminService;
         this.mqClientAPIFactory = mqClientAPIFactory;
-
-        this.systemResourceName = this.getSystemResourceName();
-    }
-
-    protected String getSystemResourceName() {
-        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
-        return TopicValidator.SYSTEM_TOPIC_PREFIX + "proxy_" + this.getClass().getSimpleName() + "_" + proxyConfig.getProxyClusterName();
     }
 
     protected String getSystemMessageProducerId() {
-        return "PID_" + this.systemResourceName;
+        return "PID_" + getBroadcastTopicName();
     }
 
     protected String getSystemMessageConsumerId() {
-        return "CID_" + this.systemResourceName;
+        return "CID_" + getBroadcastTopicName();
     }
 
     protected String getBroadcastTopicName() {
-        return this.systemResourceName;
+        return ConfigurationManager.getProxyConfig().getHeartbeatSyncerTopicName();
     }
 
     protected String getSubTag() {
@@ -84,7 +76,7 @@ public abstract class AbstractSystemMessageSyncer implements StartAndShutdown, M
 
     protected String getBroadcastTopicClusterName() {
         ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
-        return proxyConfig.getSystemTopicClusterName();
+        return proxyConfig.getHeartbeatSyncerTopicClusterName();
     }
 
     protected int getBroadcastTopicQueueNum() {
diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
index 3fb955a0c..95810b97c 100644
--- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
@@ -156,6 +156,7 @@ public class GrpcBaseIT extends BaseConf {
         // Set LongPollingReserveTimeInMillis to 500ms to reserve more time for IT
         ConfigurationManager.getProxyConfig().setLongPollingReserveTimeInMillis(500);
         ConfigurationManager.getProxyConfig().setRocketMQClusterName(brokerController1.getBrokerConfig().getBrokerClusterName());
+        ConfigurationManager.getProxyConfig().setHeartbeatSyncerTopicClusterName(brokerController1.getBrokerConfig().getBrokerClusterName());
         ConfigurationManager.getProxyConfig().setMinInvisibleTimeMillsForRecv(3);
     }