You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:53:53 UTC
[rocketmq] 12/26: [ISSUE #5485] polish channel management which is been synced from other proxy
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 6351e1957ff6ac39e0e17c899fd0f82ec4ab600f
Author: kaiyi.lk <ka...@alibaba-inc.com>
AuthorDate: Fri Nov 11 17:53:00 2022 +0800
[ISSUE #5485] polish channel management which is been synced from other proxy
---
.../proxy/grpc/v2/DefaultGrpcMessingActivity.java | 1 +
.../proxy/grpc/v2/client/ClientActivity.java | 30 +++-----------
.../grpc/v2/common/GrpcClientSettingsManager.java | 48 +++++++++++++++++++++-
.../proxy/processor/ReceiptHandleProcessor.java | 1 +
.../proxy/processor/channel/RemoteChannel.java | 18 ++++++++
.../remoting/activity/ClientManagerActivity.java | 1 +
.../proxy/remoting/channel/RemotingChannel.java | 12 ++++++
.../proxy/service/sysmessage/HeartbeatSyncer.java | 16 ++++----
.../service/sysmessage/HeartbeatSyncerData.java | 30 ++++++++++----
9 files changed, 113 insertions(+), 44 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
index f30519d74..194b9204f 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
@@ -91,6 +91,7 @@ public class DefaultGrpcMessingActivity extends AbstractStartAndShutdown impleme
this.clientActivity = new ClientActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager);
this.appendStartAndShutdown(this.receiptHandleProcessor);
+ this.appendStartAndShutdown(this.grpcClientSettingsManager);
}
@Override
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
index 00cc862a4..de8fba4a6 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
@@ -40,7 +40,6 @@ import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
-import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ProducerChangeListener;
import org.apache.rocketmq.broker.client.ProducerGroupEvent;
@@ -436,32 +435,12 @@ public class ClientActivity extends AbstractMessingActivity {
}
if (args[0] instanceof ClientChannelInfo) {
ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
- String clientId = clientChannelInfo.getClientId();
if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
- grpcClientSettingsManager.computeIfPresent(clientId, orgSettings -> {
- if (grpcChannelManager.getChannel(clientId) == null) {
- // if there is no channel connect directly to this proxy
- return null;
- }
- return orgSettings;
- });
- } else {
- grpcChannelManager.removeChannel(clientId);
- grpcClientSettingsManager.computeIfPresent(clientId, orgSettings -> {
- ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(group);
- if (consumerGroupInfo == null) {
- return null;
- }
- List<Channel> allChannels = consumerGroupInfo.getAllChannel();
- if (allChannels == null || allChannels.isEmpty() || allChannels.size() == 1) {
- // if there is only one channel of this clientId or
- // there is no channel if this clientId
- // remove settings of this client
- return null;
- }
- return orgSettings;
- });
+ return;
}
+ GrpcClientChannel removedChannel = grpcChannelManager.removeChannel(clientChannelInfo.getClientId());
+ log.info("remove grpc channel when client unregister. group:{}, clientChannelInfo:{}, removed:{}",
+ group, clientChannelInfo, removedChannel != null);
}
}
@@ -475,6 +454,7 @@ public class ClientActivity extends AbstractMessingActivity {
if (ChannelHelper.isRemote(channel)) {
// save settings from channel sync from other proxy
Settings settings = GrpcClientChannel.parseChannelExtendAttribute(channel);
+ log.debug("save client settings sync from other proxy. group:{}, channelInfo:{}, settings:{}", group, clientChannelInfo, settings);
if (settings == null) {
return;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
index 21c3395ae..b5b82fbdc 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.grpc.v2.common;
import apache.rocketmq.v2.Address;
import apache.rocketmq.v2.AddressScheme;
+import apache.rocketmq.v2.ClientType;
import apache.rocketmq.v2.CustomizedBackoff;
import apache.rocketmq.v2.Endpoints;
import apache.rocketmq.v2.ExponentialBackoff;
@@ -29,10 +30,18 @@ import com.google.protobuf.util.Durations;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.StartAndShutdown;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.MetricCollectorMode;
import org.apache.rocketmq.proxy.config.ProxyConfig;
@@ -43,8 +52,8 @@ import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicy;
import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicyType;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
-public class GrpcClientSettingsManager {
-
+public class GrpcClientSettingsManager extends ServiceThread implements StartAndShutdown {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
protected static final Map<String, Settings> CLIENT_SETTINGS_MAP = new ConcurrentHashMap<>();
private final MessagingProcessor messagingProcessor;
@@ -205,4 +214,39 @@ public class GrpcClientSettingsManager {
}
return mergeMetric(settings);
}
+
+ @Override
+ public String getServiceName() {
+ return "GrpcClientSettingsManagerCleaner";
+ }
+
+ @Override
+ public void run() {
+ while (!this.isStopped()) {
+ this.waitForRunning(TimeUnit.SECONDS.toMillis(5));
+ }
+ }
+
+ @Override
+ protected void onWaitEnd() {
+ Set<String> clientIdSet = CLIENT_SETTINGS_MAP.keySet();
+ for (String clientId : clientIdSet) {
+ try {
+ CLIENT_SETTINGS_MAP.computeIfPresent(clientId, (clientIdKey, settings) -> {
+ if (!settings.getClientType().equals(ClientType.PUSH_CONSUMER) && !settings.getClientType().equals(ClientType.SIMPLE_CONSUMER)) {
+ return settings;
+ }
+ String consumerGroup = GrpcConverter.getInstance().wrapResourceWithNamespace(settings.getSubscription().getGroup());
+ ConsumerGroupInfo consumerGroupInfo = this.messagingProcessor.getConsumerGroupInfo(consumerGroup);
+ if (consumerGroupInfo == null || consumerGroupInfo.findChannel(clientId) == null) {
+ log.info("remove unused grpc client settings. group:{}, settings:{}", consumerGroupInfo, settings);
+ return null;
+ }
+ return settings;
+ });
+ } catch (Throwable t) {
+ log.error("check expired grpc client settings failed. clientId:{}", clientId, t);
+ }
+ }
+ }
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 9d000bfe9..5e096bc6b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -111,6 +111,7 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
return;
}
clearGroup(new ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group));
+ log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo);
}
}
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java
index 5d9c9afcc..fb9666afc 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.proxy.processor.channel;
+import com.google.common.base.MoreObjects;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
@@ -57,6 +58,13 @@ public class RemoteChannel extends SimpleChannel implements ChannelExtendAttribu
public int compareTo(ChannelId o) {
return this.id.compareTo(o.asLongText());
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("id", id)
+ .toString();
+ }
}
@Override
@@ -95,4 +103,14 @@ public class RemoteChannel extends SimpleChannel implements ChannelExtendAttribu
public String getChannelExtendAttribute() {
return this.extendAttribute;
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("channelId", id())
+ .add("type", type)
+ .add("remoteProxyIp", remoteProxyIp)
+ .add("extendAttribute", extendAttribute)
+ .toString();
+ }
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
index 62400e033..10f7fa324 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
@@ -156,6 +156,7 @@ public class ClientManagerActivity extends AbstractRemotingActivity {
if (args[0] instanceof ClientChannelInfo) {
ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
remotingChannelManager.removeConsumerChannel(group, clientChannelInfo.getChannel());
+ log.info("remove remoting channel when client unregister. clientChannelInfo:{}", clientChannelInfo);
}
}
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
index 2b2cfca79..8b4832cad 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.remoting.channel;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
+import com.google.common.base.MoreObjects;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import java.time.Duration;
@@ -221,4 +222,15 @@ public class RemotingChannel extends ProxyChannel implements RemoteChannelConver
ChannelProtocolType.REMOTING,
this.getChannelExtendAttribute());
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("parent", parent())
+ .add("clientId", clientId)
+ .add("remoteAddress", remoteAddress)
+ .add("localAddress", localAddress)
+ .add("subscriptionData", subscriptionData)
+ .toString();
+ }
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
index ce3403766..12504a2f0 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
@@ -119,11 +119,11 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
messageModel,
consumeFromWhere,
proxyConfig.getLocalServeAddr(),
- remoteChannel.encode(),
- remoteChannel.getChannelExtendAttribute()
+ remoteChannel.encode()
);
data.setSubscriptionDataSet(subList);
+ log.debug("sync register heart beat. topic:{}, data:{}", this.getBroadcastTopicName(), data);
this.sendSystemMessage(data);
} catch (Throwable t) {
log.error("heartbeat register broadcast failed. group:{}, clientChannelInfo:{}, consumeType:{}, messageModel:{}, consumeFromWhere:{}, subList:{}",
@@ -158,10 +158,10 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
null,
null,
proxyConfig.getLocalServeAddr(),
- remoteChannel.encode(),
- remoteChannel.getChannelExtendAttribute()
+ remoteChannel.encode()
);
+ log.debug("sync unregister heart beat. topic:{}, data:{}", this.getBroadcastTopicName(), data);
this.sendSystemMessage(data);
} catch (Throwable t) {
log.error("heartbeat unregister broadcast failed. group:{}, clientChannelInfo:{}, consumeType:{}",
@@ -187,16 +187,16 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
continue;
}
- RemoteChannel channel = RemoteChannel.decode(data.getChannelData());
- RemoteChannel finalChannel = channel;
- channel = remoteChannelMap.computeIfAbsent(channel.id().asLongText(), key -> finalChannel);
- channel.setExtendAttribute(data.getChannelExtendAttribute());
+ RemoteChannel decodedChannel = RemoteChannel.decode(data.getChannelData());
+ RemoteChannel channel = remoteChannelMap.computeIfAbsent(decodedChannel.id().asLongText(), key -> decodedChannel);
+ channel.setExtendAttribute(decodedChannel.getChannelExtendAttribute());
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
channel,
data.getClientId(),
data.getLanguage(),
data.getVersion()
);
+ log.debug("start process remote channel. data:{}, clientChannelInfo:{}", data, clientChannelInfo);
if (data.getHeartbeatType().equals(HeartbeatType.REGISTER)) {
this.consumerManager.registerConsumer(
data.getGroup(),
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
index 20fee7aac..f3b96ac9a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.proxy.service.sysmessage;
+import com.google.common.base.MoreObjects;
import java.util.Set;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
@@ -37,13 +38,15 @@ public class HeartbeatSyncerData {
private ConsumeFromWhere consumeFromWhere;
private String connectProxyIp;
private String channelData;
- private String channelExtendAttribute;
+
+ public HeartbeatSyncerData() {
+ }
public HeartbeatSyncerData(HeartbeatType heartbeatType, String clientId,
LanguageCode language, int version, String group,
ConsumeType consumeType, MessageModel messageModel,
ConsumeFromWhere consumeFromWhere, String connectProxyIp,
- String channelData, String channelExtendAttribute) {
+ String channelData) {
this.heartbeatType = heartbeatType;
this.clientId = clientId;
this.language = language;
@@ -54,7 +57,6 @@ public class HeartbeatSyncerData {
this.consumeFromWhere = consumeFromWhere;
this.connectProxyIp = connectProxyIp;
this.channelData = channelData;
- this.channelExtendAttribute = channelExtendAttribute;
}
public HeartbeatType getHeartbeatType() {
@@ -154,11 +156,21 @@ public class HeartbeatSyncerData {
this.channelData = channelData;
}
- public String getChannelExtendAttribute() {
- return channelExtendAttribute;
- }
-
- public void setChannelExtendAttribute(String channelExtendAttribute) {
- this.channelExtendAttribute = channelExtendAttribute;
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("heartbeatType", heartbeatType)
+ .add("clientId", clientId)
+ .add("language", language)
+ .add("version", version)
+ .add("lastUpdateTimestamp", lastUpdateTimestamp)
+ .add("subscriptionDataSet", subscriptionDataSet)
+ .add("group", group)
+ .add("consumeType", consumeType)
+ .add("messageModel", messageModel)
+ .add("consumeFromWhere", consumeFromWhere)
+ .add("connectProxyIp", connectProxyIp)
+ .add("channelData", channelData)
+ .toString();
}
}