You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:54:05 UTC
[rocketmq] 24/26: [ISSUE #5485] Use local address, remoting port and grpc port to build unique local proxy Id
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 4b311ff4dd36cb2ad6ef160efae9f07d6985b286
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Thu Dec 8 19:44:43 2022 +0800
[ISSUE #5485] Use local address, remoting port and grpc port to build unique local proxy Id
---
.../proxy/service/sysmessage/HeartbeatSyncer.java | 16 +++++++++++-----
.../proxy/service/sysmessage/HeartbeatSyncerData.java | 16 ++++++++--------
.../proxy/service/sysmessage/HeartbeatSyncerTest.java | 8 ++++----
3 files changed, 23 insertions(+), 17 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
index 9e333902e..51af02170 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
@@ -50,11 +50,13 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
protected ThreadPoolExecutor threadPoolExecutor;
protected ConsumerManager consumerManager;
protected final Map<String /* channelId as longText */, RemoteChannel> remoteChannelMap = new ConcurrentHashMap<>();
+ protected String localProxyId;
public HeartbeatSyncer(TopicRouteService topicRouteService, AdminService adminService,
ConsumerManager consumerManager, MQClientAPIFactory mqClientAPIFactory) {
super(topicRouteService, adminService, mqClientAPIFactory);
this.consumerManager = consumerManager;
+ this.localProxyId = buildLocalProxyId();
this.init();
}
@@ -104,7 +106,6 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
try {
this.threadPoolExecutor.submit(() -> {
try {
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
RemoteChannel remoteChannel = RemoteChannel.create(clientChannelInfo.getChannel());
if (remoteChannel == null) {
return;
@@ -118,7 +119,7 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
consumeType,
messageModel,
consumeFromWhere,
- proxyConfig.getLocalServeAddr(),
+ localProxyId,
remoteChannel.encode()
);
data.setSubscriptionDataSet(subList);
@@ -143,7 +144,6 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
try {
this.threadPoolExecutor.submit(() -> {
try {
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
RemoteChannel remoteChannel = RemoteChannel.create(clientChannelInfo.getChannel());
if (remoteChannel == null) {
return;
@@ -157,7 +157,7 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
null,
null,
null,
- proxyConfig.getLocalServeAddr(),
+ localProxyId,
remoteChannel.encode()
);
@@ -183,7 +183,7 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
for (MessageExt msg : msgs) {
try {
HeartbeatSyncerData data = JSON.parseObject(new String(msg.getBody(), StandardCharsets.UTF_8), HeartbeatSyncerData.class);
- if (data.getConnectProxyIp().equals(ConfigurationManager.getProxyConfig().getLocalServeAddr())) {
+ if (data.getLocalProxyId().equals(localProxyId)) {
continue;
}
@@ -221,4 +221,10 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
+
+ private String buildLocalProxyId() {
+ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+ // Join the local serve address, remoting listen port and gRPC server port with '%' so the id also distinguishes proxies that share an address but listen on different ports
+ return proxyConfig.getLocalServeAddr() + "%" + proxyConfig.getRemotingListenPort() + "%" + proxyConfig.getGrpcServerPort();
+ }
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
index 10c6f1206..97760506f 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
@@ -36,7 +36,7 @@ public class HeartbeatSyncerData {
private ConsumeType consumeType;
private MessageModel messageModel;
private ConsumeFromWhere consumeFromWhere;
- private String connectProxyIp;
+ private String localProxyId;
private String channelData;
public HeartbeatSyncerData() {
@@ -45,7 +45,7 @@ public class HeartbeatSyncerData {
public HeartbeatSyncerData(HeartbeatType heartbeatType, String clientId,
LanguageCode language, int version, String group,
ConsumeType consumeType, MessageModel messageModel,
- ConsumeFromWhere consumeFromWhere, String connectProxyIp,
+ ConsumeFromWhere consumeFromWhere, String localProxyId,
String channelData) {
this.heartbeatType = heartbeatType;
this.clientId = clientId;
@@ -55,7 +55,7 @@ public class HeartbeatSyncerData {
this.consumeType = consumeType;
this.messageModel = messageModel;
this.consumeFromWhere = consumeFromWhere;
- this.connectProxyIp = connectProxyIp;
+ this.localProxyId = localProxyId;
this.channelData = channelData;
}
@@ -140,12 +140,12 @@ public class HeartbeatSyncerData {
this.consumeFromWhere = consumeFromWhere;
}
- public String getConnectProxyIp() {
- return connectProxyIp;
+ public String getLocalProxyId() {
+ return localProxyId;
}
- public void setConnectProxyIp(String connectProxyIp) {
- this.connectProxyIp = connectProxyIp;
+ public void setLocalProxyId(String localProxyId) {
+ this.localProxyId = localProxyId;
}
public String getChannelData() {
@@ -169,7 +169,7 @@ public class HeartbeatSyncerData {
.add("consumeType", consumeType)
.add("messageModel", messageModel)
.add("consumeFromWhere", consumeFromWhere)
- .add("connectProxyIp", connectProxyIp)
+ .add("connectProxyIp", localProxyId)
.add("channelData", channelData)
.toString();
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
index 95152186d..df98f31dc 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
@@ -185,7 +185,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
String localServeAddr = ConfigurationManager.getProxyConfig().getLocalServeAddr();
// change local serve addr, to simulate other proxy receive messages
- ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10));
+ heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
doReturn(true).when(consumerManager).registerConsumer(anyString(), syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(), anyBoolean());
@@ -207,7 +207,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
doNothing().when(consumerManager).unregisterConsumer(anyString(), syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean());
// change local serve addr, to simulate other proxy receive messages
- ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10));
+ heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getAllValues().get(1))), null);
assertSame(channelInfoList.get(0).getChannel(), syncUnRegisterChannelInfoArgumentCaptor.getValue().getChannel());
}
@@ -248,7 +248,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
String localServeAddr = ConfigurationManager.getProxyConfig().getLocalServeAddr();
// change local serve addr, to simulate other proxy receive messages
- ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10));
+ heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
doReturn(true).when(consumerManager).registerConsumer(anyString(), syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(), anyBoolean());
@@ -270,7 +270,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
doNothing().when(consumerManager).unregisterConsumer(anyString(), syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean());
// change local serve addr, to simulate other proxy receive messages
- ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10));
+ heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getAllValues().get(1))), null);
assertSame(channelInfoList.get(0).getChannel(), syncUnRegisterChannelInfoArgumentCaptor.getValue().getChannel());
}