You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:53:53 UTC

[rocketmq] 12/26: [ISSUE #5485] polish channel management which is been synced from other proxy

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 6351e1957ff6ac39e0e17c899fd0f82ec4ab600f
Author: kaiyi.lk <ka...@alibaba-inc.com>
AuthorDate: Fri Nov 11 17:53:00 2022 +0800

    [ISSUE #5485] polish channel management which is been synced from other proxy
---
 .../proxy/grpc/v2/DefaultGrpcMessingActivity.java  |  1 +
 .../proxy/grpc/v2/client/ClientActivity.java       | 30 +++-----------
 .../grpc/v2/common/GrpcClientSettingsManager.java  | 48 +++++++++++++++++++++-
 .../proxy/processor/ReceiptHandleProcessor.java    |  1 +
 .../proxy/processor/channel/RemoteChannel.java     | 18 ++++++++
 .../remoting/activity/ClientManagerActivity.java   |  1 +
 .../proxy/remoting/channel/RemotingChannel.java    | 12 ++++++
 .../proxy/service/sysmessage/HeartbeatSyncer.java  | 16 ++++----
 .../service/sysmessage/HeartbeatSyncerData.java    | 30 ++++++++++----
 9 files changed, 113 insertions(+), 44 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
index f30519d74..194b9204f 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
@@ -91,6 +91,7 @@ public class DefaultGrpcMessingActivity extends AbstractStartAndShutdown impleme
         this.clientActivity = new ClientActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager);
 
         this.appendStartAndShutdown(this.receiptHandleProcessor);
+        this.appendStartAndShutdown(this.grpcClientSettingsManager);
     }
 
     @Override
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
index 00cc862a4..de8fba4a6 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
@@ -40,7 +40,6 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
-import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
 import org.apache.rocketmq.broker.client.ProducerChangeListener;
 import org.apache.rocketmq.broker.client.ProducerGroupEvent;
@@ -436,32 +435,12 @@ public class ClientActivity extends AbstractMessingActivity {
             }
             if (args[0] instanceof ClientChannelInfo) {
                 ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
-                String clientId = clientChannelInfo.getClientId();
                 if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
-                    grpcClientSettingsManager.computeIfPresent(clientId, orgSettings -> {
-                        if (grpcChannelManager.getChannel(clientId) == null) {
-                            // if there is no channel connect directly to this proxy
-                            return null;
-                        }
-                        return orgSettings;
-                    });
-                } else {
-                    grpcChannelManager.removeChannel(clientId);
-                    grpcClientSettingsManager.computeIfPresent(clientId, orgSettings -> {
-                        ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(group);
-                        if (consumerGroupInfo == null) {
-                            return null;
-                        }
-                        List<Channel> allChannels = consumerGroupInfo.getAllChannel();
-                        if (allChannels == null || allChannels.isEmpty() || allChannels.size() == 1) {
-                            // if there is only one channel of this clientId or
-                            // there is no channel if this clientId
-                            // remove settings of this client
-                            return null;
-                        }
-                        return orgSettings;
-                    });
+                    return;
                 }
+                GrpcClientChannel removedChannel = grpcChannelManager.removeChannel(clientChannelInfo.getClientId());
+                log.info("remove grpc channel when client unregister. group:{}, clientChannelInfo:{}, removed:{}",
+                    group, clientChannelInfo, removedChannel != null);
             }
         }
 
@@ -475,6 +454,7 @@ public class ClientActivity extends AbstractMessingActivity {
                 if (ChannelHelper.isRemote(channel)) {
                     // save settings from channel sync from other proxy
                     Settings settings = GrpcClientChannel.parseChannelExtendAttribute(channel);
+                    log.debug("save client settings sync from other proxy. group:{}, channelInfo:{}, settings:{}", group, clientChannelInfo, settings);
                     if (settings == null) {
                         return;
                     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
index 21c3395ae..b5b82fbdc 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.grpc.v2.common;
 
 import apache.rocketmq.v2.Address;
 import apache.rocketmq.v2.AddressScheme;
+import apache.rocketmq.v2.ClientType;
 import apache.rocketmq.v2.CustomizedBackoff;
 import apache.rocketmq.v2.Endpoints;
 import apache.rocketmq.v2.ExponentialBackoff;
@@ -29,10 +30,18 @@ import com.google.protobuf.util.Durations;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.StartAndShutdown;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.MetricCollectorMode;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
@@ -43,8 +52,8 @@ import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicy;
 import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicyType;
 import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 
-public class GrpcClientSettingsManager {
-
+public class GrpcClientSettingsManager extends ServiceThread implements StartAndShutdown {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
     protected static final Map<String, Settings> CLIENT_SETTINGS_MAP = new ConcurrentHashMap<>();
 
     private final MessagingProcessor messagingProcessor;
@@ -205,4 +214,39 @@ public class GrpcClientSettingsManager {
         }
         return mergeMetric(settings);
     }
+
+    @Override
+    public String getServiceName() {
+        return "GrpcClientSettingsManagerCleaner";
+    }
+
+    @Override
+    public void run() {
+        while (!this.isStopped()) {
+            this.waitForRunning(TimeUnit.SECONDS.toMillis(5));
+        }
+    }
+
+    @Override
+    protected void onWaitEnd() {
+        Set<String> clientIdSet = CLIENT_SETTINGS_MAP.keySet();
+        for (String clientId : clientIdSet) {
+            try {
+                CLIENT_SETTINGS_MAP.computeIfPresent(clientId, (clientIdKey, settings) -> {
+                    if (!settings.getClientType().equals(ClientType.PUSH_CONSUMER) && !settings.getClientType().equals(ClientType.SIMPLE_CONSUMER)) {
+                        return settings;
+                    }
+                    String consumerGroup = GrpcConverter.getInstance().wrapResourceWithNamespace(settings.getSubscription().getGroup());
+                    ConsumerGroupInfo consumerGroupInfo = this.messagingProcessor.getConsumerGroupInfo(consumerGroup);
+                    if (consumerGroupInfo == null || consumerGroupInfo.findChannel(clientId) == null) {
+                        log.info("remove unused grpc client settings. group:{}, settings:{}", consumerGroupInfo, settings);
+                        return null;
+                    }
+                    return settings;
+                });
+            } catch (Throwable t) {
+                log.error("check expired grpc client settings failed. clientId:{}", clientId, t);
+            }
+        }
+    }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 9d000bfe9..5e096bc6b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -111,6 +111,7 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
                             return;
                         }
                         clearGroup(new ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group));
+                        log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo);
                     }
                 }
             }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java
index 5d9c9afcc..fb9666afc 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.proxy.processor.channel;
 
+import com.google.common.base.MoreObjects;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelId;
 import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
@@ -57,6 +58,13 @@ public class RemoteChannel extends SimpleChannel implements ChannelExtendAttribu
         public int compareTo(ChannelId o) {
             return this.id.compareTo(o.asLongText());
         }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                .add("id", id)
+                .toString();
+        }
     }
 
     @Override
@@ -95,4 +103,14 @@ public class RemoteChannel extends SimpleChannel implements ChannelExtendAttribu
     public String getChannelExtendAttribute() {
         return this.extendAttribute;
     }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("channelId", id())
+            .add("type", type)
+            .add("remoteProxyIp", remoteProxyIp)
+            .add("extendAttribute", extendAttribute)
+            .toString();
+    }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
index 62400e033..10f7fa324 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
@@ -156,6 +156,7 @@ public class ClientManagerActivity extends AbstractRemotingActivity {
                 if (args[0] instanceof ClientChannelInfo) {
                     ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
                     remotingChannelManager.removeConsumerChannel(group, clientChannelInfo.getChannel());
+                    log.info("remove remoting channel when client unregister. clientChannelInfo:{}", clientChannelInfo);
                 }
             }
         }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
index 2b2cfca79..8b4832cad 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.remoting.channel;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.TypeReference;
+import com.google.common.base.MoreObjects;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFutureListener;
 import java.time.Duration;
@@ -221,4 +222,15 @@ public class RemotingChannel extends ProxyChannel implements RemoteChannelConver
             ChannelProtocolType.REMOTING,
             this.getChannelExtendAttribute());
     }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("parent", parent())
+            .add("clientId", clientId)
+            .add("remoteAddress", remoteAddress)
+            .add("localAddress", localAddress)
+            .add("subscriptionData", subscriptionData)
+            .toString();
+    }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
index ce3403766..12504a2f0 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
@@ -119,11 +119,11 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
                         messageModel,
                         consumeFromWhere,
                         proxyConfig.getLocalServeAddr(),
-                        remoteChannel.encode(),
-                        remoteChannel.getChannelExtendAttribute()
+                        remoteChannel.encode()
                     );
                     data.setSubscriptionDataSet(subList);
 
+                    log.debug("sync register heart beat. topic:{}, data:{}", this.getBroadcastTopicName(), data);
                     this.sendSystemMessage(data);
                 } catch (Throwable t) {
                     log.error("heartbeat register broadcast failed. group:{}, clientChannelInfo:{}, consumeType:{}, messageModel:{}, consumeFromWhere:{}, subList:{}",
@@ -158,10 +158,10 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
                         null,
                         null,
                         proxyConfig.getLocalServeAddr(),
-                        remoteChannel.encode(),
-                        remoteChannel.getChannelExtendAttribute()
+                        remoteChannel.encode()
                     );
 
+                    log.debug("sync unregister heart beat. topic:{}, data:{}", this.getBroadcastTopicName(), data);
                     this.sendSystemMessage(data);
                 } catch (Throwable t) {
                     log.error("heartbeat unregister broadcast failed. group:{}, clientChannelInfo:{}, consumeType:{}",
@@ -187,16 +187,16 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
                     continue;
                 }
 
-                RemoteChannel channel = RemoteChannel.decode(data.getChannelData());
-                RemoteChannel finalChannel = channel;
-                channel = remoteChannelMap.computeIfAbsent(channel.id().asLongText(), key -> finalChannel);
-                channel.setExtendAttribute(data.getChannelExtendAttribute());
+                RemoteChannel decodedChannel = RemoteChannel.decode(data.getChannelData());
+                RemoteChannel channel = remoteChannelMap.computeIfAbsent(decodedChannel.id().asLongText(), key -> decodedChannel);
+                channel.setExtendAttribute(decodedChannel.getChannelExtendAttribute());
                 ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
                     channel,
                     data.getClientId(),
                     data.getLanguage(),
                     data.getVersion()
                 );
+                log.debug("start process remote channel. data:{}, clientChannelInfo:{}", data, clientChannelInfo);
                 if (data.getHeartbeatType().equals(HeartbeatType.REGISTER)) {
                     this.consumerManager.registerConsumer(
                         data.getGroup(),
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
index 20fee7aac..f3b96ac9a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.proxy.service.sysmessage;
 
+import com.google.common.base.MoreObjects;
 import java.util.Set;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
@@ -37,13 +38,15 @@ public class HeartbeatSyncerData {
     private ConsumeFromWhere consumeFromWhere;
     private String connectProxyIp;
     private String channelData;
-    private String channelExtendAttribute;
+
+    public HeartbeatSyncerData() {
+    }
 
     public HeartbeatSyncerData(HeartbeatType heartbeatType, String clientId,
         LanguageCode language, int version, String group,
         ConsumeType consumeType, MessageModel messageModel,
         ConsumeFromWhere consumeFromWhere, String connectProxyIp,
-        String channelData, String channelExtendAttribute) {
+        String channelData) {
         this.heartbeatType = heartbeatType;
         this.clientId = clientId;
         this.language = language;
@@ -54,7 +57,6 @@ public class HeartbeatSyncerData {
         this.consumeFromWhere = consumeFromWhere;
         this.connectProxyIp = connectProxyIp;
         this.channelData = channelData;
-        this.channelExtendAttribute = channelExtendAttribute;
     }
 
     public HeartbeatType getHeartbeatType() {
@@ -154,11 +156,21 @@ public class HeartbeatSyncerData {
         this.channelData = channelData;
     }
 
-    public String getChannelExtendAttribute() {
-        return channelExtendAttribute;
-    }
-
-    public void setChannelExtendAttribute(String channelExtendAttribute) {
-        this.channelExtendAttribute = channelExtendAttribute;
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("heartbeatType", heartbeatType)
+            .add("clientId", clientId)
+            .add("language", language)
+            .add("version", version)
+            .add("lastUpdateTimestamp", lastUpdateTimestamp)
+            .add("subscriptionDataSet", subscriptionDataSet)
+            .add("group", group)
+            .add("consumeType", consumeType)
+            .add("messageModel", messageModel)
+            .add("consumeFromWhere", consumeFromWhere)
+            .add("connectProxyIp", connectProxyIp)
+            .add("channelData", channelData)
+            .toString();
     }
 }