You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:54:05 UTC

[rocketmq] 24/26: [ISSUE #5485] Use local address, remoting port and grpc port to build unique local proxy Id

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 4b311ff4dd36cb2ad6ef160efae9f07d6985b286
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Thu Dec 8 19:44:43 2022 +0800

    [ISSUE #5485] Use local address, remoting port and grpc port to build unique local proxy Id
---
 .../proxy/service/sysmessage/HeartbeatSyncer.java        | 16 +++++++++++-----
 .../proxy/service/sysmessage/HeartbeatSyncerData.java    | 16 ++++++++--------
 .../proxy/service/sysmessage/HeartbeatSyncerTest.java    |  8 ++++----
 3 files changed, 23 insertions(+), 17 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
index 9e333902e..51af02170 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
@@ -50,11 +50,13 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
     protected ThreadPoolExecutor threadPoolExecutor;
     protected ConsumerManager consumerManager;
     protected final Map<String /* channelId as longText */, RemoteChannel> remoteChannelMap = new ConcurrentHashMap<>();
+    protected String localProxyId;
 
     public HeartbeatSyncer(TopicRouteService topicRouteService, AdminService adminService,
         ConsumerManager consumerManager, MQClientAPIFactory mqClientAPIFactory) {
         super(topicRouteService, adminService, mqClientAPIFactory);
         this.consumerManager = consumerManager;
+        this.localProxyId = buildLocalProxyId();
         this.init();
     }
 
@@ -104,7 +106,6 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
         try {
             this.threadPoolExecutor.submit(() -> {
                 try {
-                    ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
                     RemoteChannel remoteChannel = RemoteChannel.create(clientChannelInfo.getChannel());
                     if (remoteChannel == null) {
                         return;
@@ -118,7 +119,7 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
                         consumeType,
                         messageModel,
                         consumeFromWhere,
-                        proxyConfig.getLocalServeAddr(),
+                        localProxyId,
                         remoteChannel.encode()
                     );
                     data.setSubscriptionDataSet(subList);
@@ -143,7 +144,6 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
         try {
             this.threadPoolExecutor.submit(() -> {
                 try {
-                    ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
                     RemoteChannel remoteChannel = RemoteChannel.create(clientChannelInfo.getChannel());
                     if (remoteChannel == null) {
                         return;
@@ -157,7 +157,7 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
                         null,
                         null,
                         null,
-                        proxyConfig.getLocalServeAddr(),
+                        localProxyId,
                         remoteChannel.encode()
                     );
 
@@ -183,7 +183,7 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
         for (MessageExt msg : msgs) {
             try {
                 HeartbeatSyncerData data = JSON.parseObject(new String(msg.getBody(), StandardCharsets.UTF_8), HeartbeatSyncerData.class);
-                if (data.getConnectProxyIp().equals(ConfigurationManager.getProxyConfig().getLocalServeAddr())) {
+                if (data.getLocalProxyId().equals(localProxyId)) {
                     continue;
                 }
 
@@ -221,4 +221,10 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
 
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
+
+    private String buildLocalProxyId() {
+        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+        // Join the local serve address, remoting listen port and gRPC server port with '%'; the result is intended to identify this proxy instance uniquely
+        return proxyConfig.getLocalServeAddr() + "%" + proxyConfig.getRemotingListenPort() + "%" + proxyConfig.getGrpcServerPort();
+    }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
index 10c6f1206..97760506f 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
@@ -36,7 +36,7 @@ public class HeartbeatSyncerData {
     private ConsumeType consumeType;
     private MessageModel messageModel;
     private ConsumeFromWhere consumeFromWhere;
-    private String connectProxyIp;
+    private String localProxyId;
     private String channelData;
 
     public HeartbeatSyncerData() {
@@ -45,7 +45,7 @@ public class HeartbeatSyncerData {
     public HeartbeatSyncerData(HeartbeatType heartbeatType, String clientId,
         LanguageCode language, int version, String group,
         ConsumeType consumeType, MessageModel messageModel,
-        ConsumeFromWhere consumeFromWhere, String connectProxyIp,
+        ConsumeFromWhere consumeFromWhere, String localProxyId,
         String channelData) {
         this.heartbeatType = heartbeatType;
         this.clientId = clientId;
@@ -55,7 +55,7 @@ public class HeartbeatSyncerData {
         this.consumeType = consumeType;
         this.messageModel = messageModel;
         this.consumeFromWhere = consumeFromWhere;
-        this.connectProxyIp = connectProxyIp;
+        this.localProxyId = localProxyId;
         this.channelData = channelData;
     }
 
@@ -140,12 +140,12 @@ public class HeartbeatSyncerData {
         this.consumeFromWhere = consumeFromWhere;
     }
 
-    public String getConnectProxyIp() {
-        return connectProxyIp;
+    public String getLocalProxyId() {
+        return localProxyId;
     }
 
-    public void setConnectProxyIp(String connectProxyIp) {
-        this.connectProxyIp = connectProxyIp;
+    public void setLocalProxyId(String localProxyId) {
+        this.localProxyId = localProxyId;
     }
 
     public String getChannelData() {
@@ -169,7 +169,7 @@ public class HeartbeatSyncerData {
             .add("consumeType", consumeType)
             .add("messageModel", messageModel)
             .add("consumeFromWhere", consumeFromWhere)
-            .add("connectProxyIp", connectProxyIp)
+            .add("connectProxyIp", localProxyId)
             .add("channelData", channelData)
             .toString();
     }
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
index 95152186d..df98f31dc 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
@@ -185,7 +185,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
 
         String localServeAddr = ConfigurationManager.getProxyConfig().getLocalServeAddr();
         // change local serve addr, to simulate other proxy receive messages
-        ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10));
+        heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
         ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
         doReturn(true).when(consumerManager).registerConsumer(anyString(), syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(), anyBoolean());
 
@@ -207,7 +207,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
         doNothing().when(consumerManager).unregisterConsumer(anyString(), syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean());
 
         // change local serve addr, to simulate other proxy receive messages
-        ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10));
+        heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
         heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getAllValues().get(1))), null);
         assertSame(channelInfoList.get(0).getChannel(), syncUnRegisterChannelInfoArgumentCaptor.getValue().getChannel());
     }
@@ -248,7 +248,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
 
         String localServeAddr = ConfigurationManager.getProxyConfig().getLocalServeAddr();
         // change local serve addr, to simulate other proxy receive messages
-        ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10));
+        heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
         ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
         doReturn(true).when(consumerManager).registerConsumer(anyString(), syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(), anyBoolean());
 
@@ -270,7 +270,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
         doNothing().when(consumerManager).unregisterConsumer(anyString(), syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean());
 
         // change local serve addr, to simulate other proxy receive messages
-        ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10));
+        heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
         heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getAllValues().get(1))), null);
         assertSame(channelInfoList.get(0).getChannel(), syncUnRegisterChannelInfoArgumentCaptor.getValue().getChannel());
     }