You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/12/07 09:26:21 UTC

[rocketmq] branch develop updated: [ISSUSE #5589] record subscription from request (#5590)

This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7cab0a1aa [ISSUSE #5589] record subscription from request (#5590)
7cab0a1aa is described below

commit 7cab0a1aadc6336e95e2152fab8d188d33a43b78
Author: SSpirits <ad...@lv5.moe>
AuthorDate: Wed Dec 7 17:26:00 2022 +0800

    [ISSUSE #5589] record subscription from request (#5590)
---
 .../apache/rocketmq/broker/BrokerController.java   | 10 +--
 .../rocketmq/broker/client/ConsumerGroupInfo.java  |  4 ++
 .../rocketmq/broker/client/ConsumerManager.java    | 82 ++++++++++++++++++++--
 .../broker/processor/AdminBrokerProcessor.java     |  4 +-
 .../broker/processor/PopMessageProcessor.java      | 13 ++++
 .../broker/processor/PullMessageProcessor.java     | 18 ++++-
 .../broker/client/ConsumerManagerScannerTest.java  |  2 +-
 .../broker/client/ConsumerManagerTest.java         | 67 ++++++++++++++++--
 .../org/apache/rocketmq/common/BrokerConfig.java   | 19 +++++
 .../apache/rocketmq/proxy/config/ProxyConfig.java  | 12 +++-
 .../proxy/service/ClusterServiceManager.java       |  4 +-
 .../rocketmq/remoting/protocol/RequestSource.java  |  7 ++
 .../remoting/protocol/heartbeat/ConsumeType.java   |  4 +-
 .../remoting/protocol/RequestSourceTest.java       | 44 ++++++++++++
 14 files changed, 264 insertions(+), 26 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 0a5df7cb0..b584e8769 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -315,7 +315,7 @@ public class BrokerController {
         this.replyMessageProcessor = new ReplyMessageProcessor(this);
         this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService, this.popMessageProcessor, this.notificationProcessor);
         this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
-        this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener, this.brokerStatsManager);
+        this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener, this.brokerStatsManager, this.brokerConfig);
         this.producerManager = new ProducerManager(this.brokerStatsManager);
         this.consumerFilterManager = new ConsumerFilterManager(this);
         this.consumerOrderInfoManager = new ConsumerOrderInfoManager(this);
@@ -932,18 +932,18 @@ public class BrokerController {
             LOG.info("The broker dose not enable acl");
             return;
         }
-    
+
         List<AccessValidator> accessValidators = ServiceProvider.load(AccessValidator.class);
         if (accessValidators.isEmpty()) {
             LOG.info("ServiceProvider loaded no AccessValidator, using default org.apache.rocketmq.acl.plain.PlainAccessValidator");
             accessValidators.add(new PlainAccessValidator());
         }
-    
+
         for (AccessValidator accessValidator : accessValidators) {
             final AccessValidator validator = accessValidator;
             accessValidatorMap.put(validator.getClass(), validator);
             this.registerServerRPCHook(new RPCHook() {
-            
+
                 @Override
                 public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
                     //Do not catch the exception
@@ -959,7 +959,7 @@ public class BrokerController {
     }
 
     private void initialRpcHooks() {
-    
+
         List<RPCHook> rpcHooks = ServiceProvider.load(RPCHook.class);
         if (rpcHooks == null || rpcHooks.isEmpty()) {
             return;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
index f75c369b6..867b9c720 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
@@ -52,6 +52,10 @@ public class ConsumerGroupInfo {
         this.consumeFromWhere = consumeFromWhere;
     }
 
+    public ConsumerGroupInfo(String groupName) {
+        this.groupName = groupName;
+    }
+
     public ClientChannelInfo findChannel(final String clientId) {
         Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
         while (it.hasNext()) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index 1201037b6..5f95ac1af 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.broker.client;
 
 import io.netty.channel.Channel;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -26,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -38,21 +40,28 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
 public class ConsumerManager {
     private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-    private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
     private final ConcurrentMap<String, ConsumerGroupInfo> consumerTable =
         new ConcurrentHashMap<>(1024);
+    private final ConcurrentMap<String, ConsumerGroupInfo> consumerCompensationTable =
+        new ConcurrentHashMap<>(1024);
     private final List<ConsumerIdsChangeListener> consumerIdsChangeListenerList = new CopyOnWriteArrayList<>();
     protected final BrokerStatsManager brokerStatsManager;
+    private final long channelExpiredTimeout;
+    private final long subscriptionExpiredTimeout;
 
-    public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) {
+    public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener, long expiredTimeout) {
         this.consumerIdsChangeListenerList.add(consumerIdsChangeListener);
         this.brokerStatsManager = null;
+        this.channelExpiredTimeout = expiredTimeout;
+        this.subscriptionExpiredTimeout = expiredTimeout;
     }
 
     public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener,
-        final BrokerStatsManager brokerStatsManager) {
+        final BrokerStatsManager brokerStatsManager, BrokerConfig brokerConfig) {
         this.consumerIdsChangeListenerList.add(consumerIdsChangeListener);
         this.brokerStatsManager = brokerStatsManager;
+        this.channelExpiredTimeout = brokerConfig.getChannelExpiredTimeout();
+        this.subscriptionExpiredTimeout = brokerConfig.getSubscriptionExpiredTimeout();
     }
 
     public ClientChannelInfo findChannel(final String group, final String clientId) {
@@ -72,11 +81,25 @@ public class ConsumerManager {
     }
 
     public SubscriptionData findSubscriptionData(final String group, final String topic) {
-        ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
+        return findSubscriptionData(group, topic, true);
+    }
+
+    public SubscriptionData findSubscriptionData(final String group, final String topic,
+        boolean fromCompensationTable) {
+        ConsumerGroupInfo consumerGroupInfo = getConsumerGroupInfo(group, false);
         if (consumerGroupInfo != null) {
-            return consumerGroupInfo.findSubscriptionData(topic);
+            SubscriptionData subscriptionData = consumerGroupInfo.findSubscriptionData(topic);
+            if (subscriptionData != null) {
+                return subscriptionData;
+            }
         }
 
+        if (fromCompensationTable) {
+            ConsumerGroupInfo consumerGroupCompensationInfo = consumerCompensationTable.get(group);
+            if (consumerGroupCompensationInfo != null) {
+                return consumerGroupCompensationInfo.findSubscriptionData(topic);
+            }
+        }
         return null;
     }
 
@@ -85,7 +108,15 @@ public class ConsumerManager {
     }
 
     public ConsumerGroupInfo getConsumerGroupInfo(final String group) {
-        return this.consumerTable.get(group);
+        return getConsumerGroupInfo(group, false);
+    }
+
+    public ConsumerGroupInfo getConsumerGroupInfo(String group, boolean fromCompensationTable) {
+        ConsumerGroupInfo consumerGroupInfo = consumerTable.get(group);
+        if (consumerGroupInfo == null && fromCompensationTable) {
+            consumerGroupInfo = consumerCompensationTable.get(group);
+        }
+        return consumerGroupInfo;
     }
 
     public int findSubscriptionDataCount(final String group) {
@@ -121,6 +152,19 @@ public class ConsumerManager {
         return removed;
     }
 
+    // compensate consumer info for consumer without heartbeat
+    public void compensateBasicConsumerInfo(String group, ConsumeType consumeType, MessageModel messageModel) {
+        ConsumerGroupInfo consumerGroupInfo = consumerCompensationTable.computeIfAbsent(group, ConsumerGroupInfo::new);
+        consumerGroupInfo.setConsumeType(consumeType);
+        consumerGroupInfo.setMessageModel(messageModel);
+    }
+
+    // compensate subscription for pull consumer and consumer via proxy
+    public void compensateSubscribeData(String group, String topic, SubscriptionData subscriptionData) {
+        ConsumerGroupInfo consumerGroupInfo = consumerCompensationTable.computeIfAbsent(group, ConsumerGroupInfo::new);
+        consumerGroupInfo.getSubscriptionTable().put(topic, subscriptionData);
+    }
+
     public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
         ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
         final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
@@ -185,6 +229,29 @@ public class ConsumerManager {
         }
     }
 
+    public void removeExpireConsumerGroupInfo() {
+        List<String> removeList = new ArrayList<>();
+        consumerCompensationTable.forEach((group, consumerGroupInfo) -> {
+            List<String> removeTopicList = new ArrayList<>();
+            ConcurrentMap<String, SubscriptionData> subscriptionTable = consumerGroupInfo.getSubscriptionTable();
+            subscriptionTable.forEach((topic, subscriptionData) -> {
+                long diff = System.currentTimeMillis() - subscriptionData.getSubVersion();
+                if (diff > subscriptionExpiredTimeout) {
+                    removeTopicList.add(topic);
+                }
+            });
+            for (String topic : removeTopicList) {
+                subscriptionTable.remove(topic);
+                if (subscriptionTable.isEmpty()) {
+                    removeList.add(group);
+                }
+            }
+        });
+        for (String group : removeList) {
+            consumerCompensationTable.remove(group);
+        }
+    }
+
     public void scanNotActiveChannel() {
         Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
         while (it.hasNext()) {
@@ -199,7 +266,7 @@ public class ConsumerManager {
                 Entry<Channel, ClientChannelInfo> nextChannel = itChannel.next();
                 ClientChannelInfo clientChannelInfo = nextChannel.getValue();
                 long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp();
-                if (diff > CHANNEL_EXPIRED_TIMEOUT) {
+                if (diff > channelExpiredTimeout) {
                     LOGGER.warn(
                         "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
                         RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
@@ -216,6 +283,7 @@ public class ConsumerManager {
                 it.remove();
             }
         }
+        removeExpireConsumerGroupInfo();
     }
 
     public HashSet<String> queryTopicConsumeByWho(final String topic) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index ad86ab34a..12eab475b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1836,8 +1836,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         QuerySubscriptionByConsumerRequestHeader requestHeader =
             (QuerySubscriptionByConsumerRequestHeader) request.decodeCommandCustomHeader(QuerySubscriptionByConsumerRequestHeader.class);
 
-        SubscriptionData subscriptionData =
-            this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getGroup(), requestHeader.getTopic());
+        SubscriptionData subscriptionData = this.brokerController.getConsumerManager()
+            .findSubscriptionData(requestHeader.getGroup(), requestHeader.getTopic());
 
         QuerySubscriptionResponseBody responseBody = new QuerySubscriptionResponseBody();
         responseBody.setGroup(requestHeader.getGroup());
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 393631e45..5bb81df5a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -70,6 +70,8 @@ import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
 import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
 import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.store.GetMessageResult;
@@ -262,6 +264,9 @@ public class PopMessageProcessor implements NettyRequestProcessor {
             orderCountInfo = new StringBuilder(64);
         }
 
+        brokerController.getConsumerManager().compensateBasicConsumerInfo(requestHeader.getConsumerGroup(),
+            ConsumeType.CONSUME_POP, MessageModel.CLUSTERING);
+
         response.setOpaque(request.getOpaque());
 
         if (brokerController.getBrokerConfig().isEnablePopLog()) {
@@ -333,6 +338,14 @@ public class PopMessageProcessor implements NettyRequestProcessor {
         if (requestHeader.getExp() != null && requestHeader.getExp().length() > 0) {
             try {
                 SubscriptionData subscriptionData = FilterAPI.build(requestHeader.getTopic(), requestHeader.getExp(), requestHeader.getExpType());
+                brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(),
+                    requestHeader.getTopic(), subscriptionData);
+
+                String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup());
+                SubscriptionData retrySubscriptionData = FilterAPI.build(retryTopic, SubscriptionData.SUB_ALL, requestHeader.getExpType());
+                brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(),
+                    requestHeader.getTopic(), retrySubscriptionData);
+
                 ConsumerFilterData consumerFilterData = null;
                 if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                     consumerFilterData = ConsumerFilterManager.build(
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index e1294c129..562c15275 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -23,14 +23,15 @@ import java.util.Objects;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.broker.client.ConsumerManager;
 import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
 import org.apache.rocketmq.broker.filter.ExpressionForRetryMessageFilter;
 import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
-import org.apache.rocketmq.common.AbortProcessException;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
 import org.apache.rocketmq.broker.plugin.PullMessageResultHandler;
+import org.apache.rocketmq.common.AbortProcessException;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -369,6 +370,19 @@ public class PullMessageProcessor implements NettyRequestProcessor {
             return response;
         }
 
+        ConsumerManager consumerManager = brokerController.getConsumerManager();
+        switch (RequestSource.parseInteger(requestHeader.getRequestSource())) {
+            case PROXY_FOR_BROADCAST:
+                consumerManager.compensateBasicConsumerInfo(requestHeader.getConsumerGroup(), ConsumeType.CONSUME_PASSIVELY, MessageModel.BROADCASTING);
+                break;
+            case PROXY_FOR_STREAM:
+                consumerManager.compensateBasicConsumerInfo(requestHeader.getConsumerGroup(), ConsumeType.CONSUME_ACTIVELY, MessageModel.CLUSTERING);
+                break;
+            default:
+                consumerManager.compensateBasicConsumerInfo(requestHeader.getConsumerGroup(), ConsumeType.CONSUME_PASSIVELY, MessageModel.CLUSTERING);
+                break;
+        }
+
         SubscriptionData subscriptionData = null;
         ConsumerFilterData consumerFilterData = null;
         if (hasSubscriptionFlag) {
@@ -376,6 +390,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                 subscriptionData = FilterAPI.build(
                     requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
                 );
+                consumerManager.compensateSubscribeData(requestHeader.getConsumerGroup(), requestHeader.getTopic(), subscriptionData);
+
                 if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                     consumerFilterData = ConsumerFilterManager.build(
                         requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerScannerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerScannerTest.java
index 40059d579..d190c0dac 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerScannerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerScannerTest.java
@@ -68,7 +68,7 @@ public class ConsumerManagerScannerTest {
             public void shutdown() {
 
             }
-        });
+        }, 1000 * 120);
     }
 
     private static class ConsumerIdsChangeListenerData {
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java
index 620be39ec..8c9098243 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java
@@ -63,7 +63,6 @@ public class ConsumerManagerTest {
 
     private BrokerStatsManager brokerStatsManager;
 
-
     private static final String GROUP = "DEFAULT_GROUP";
 
     private static final String CLIENT_ID = "1";
@@ -77,16 +76,43 @@ public class ConsumerManagerTest {
         clientChannelInfo = new ClientChannelInfo(channel, CLIENT_ID, LanguageCode.JAVA, VERSION);
         defaultConsumerIdsChangeListener = new DefaultConsumerIdsChangeListener(brokerController);
         brokerStatsManager = new BrokerStatsManager(brokerConfig);
-        consumerManager = new ConsumerManager(defaultConsumerIdsChangeListener, brokerStatsManager);
+        consumerManager = new ConsumerManager(defaultConsumerIdsChangeListener, brokerStatsManager, brokerConfig);
         broker2Client = new Broker2Client(brokerController);
         when(brokerController.getConsumerFilterManager()).thenReturn(consumerFilterManager);
         when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
         when(brokerController.getBroker2Client()).thenReturn(broker2Client);
-        register();
+    }
+
+    @Test
+    public void compensateBasicConsumerInfoTest() {
+        ConsumerGroupInfo consumerGroupInfo = consumerManager.getConsumerGroupInfo(GROUP, true);
+        Assertions.assertThat(consumerGroupInfo).isNull();
+
+        consumerManager.compensateBasicConsumerInfo(GROUP, ConsumeType.CONSUME_ACTIVELY, MessageModel.BROADCASTING);
+        consumerGroupInfo = consumerManager.getConsumerGroupInfo(GROUP, true);
+        Assertions.assertThat(consumerGroupInfo).isNotNull();
+        Assertions.assertThat(consumerGroupInfo.getConsumeType()).isEqualTo(ConsumeType.CONSUME_ACTIVELY);
+        Assertions.assertThat(consumerGroupInfo.getMessageModel()).isEqualTo(MessageModel.BROADCASTING);
+    }
+
+    @Test
+    public void compensateSubscribeDataTest() {
+        ConsumerGroupInfo consumerGroupInfo = consumerManager.getConsumerGroupInfo(GROUP, true);
+        Assertions.assertThat(consumerGroupInfo).isNull();
+
+        consumerManager.compensateSubscribeData(GROUP, TOPIC, new SubscriptionData(TOPIC, SubscriptionData.SUB_ALL));
+        consumerGroupInfo = consumerManager.getConsumerGroupInfo(GROUP, true);
+        Assertions.assertThat(consumerGroupInfo).isNotNull();
+        Assertions.assertThat(consumerGroupInfo.getSubscriptionTable().size()).isEqualTo(1);
+        SubscriptionData subscriptionData = consumerGroupInfo.getSubscriptionTable().get(TOPIC);
+        Assertions.assertThat(subscriptionData).isNotNull();
+        Assertions.assertThat(subscriptionData.getTopic()).isEqualTo(TOPIC);
+        Assertions.assertThat(subscriptionData.getSubString()).isEqualTo(SubscriptionData.SUB_ALL);
     }
 
     @Test
     public void registerConsumerTest() {
+        register();
         final Set<SubscriptionData> subList = new HashSet<>();
         SubscriptionData subscriptionData = new SubscriptionData(TOPIC, "*");
         subList.add(subscriptionData);
@@ -107,32 +133,50 @@ public class ConsumerManagerTest {
 
     @Test
     public void findChannelTest() {
-
+        register();
         final ClientChannelInfo consumerManagerChannel = consumerManager.findChannel(GROUP, CLIENT_ID);
         Assertions.assertThat(consumerManagerChannel).isNotNull();
     }
 
     @Test
     public void findSubscriptionDataTest() {
+        register();
         final SubscriptionData subscriptionData = consumerManager.findSubscriptionData(GROUP, TOPIC);
         Assertions.assertThat(subscriptionData).isNotNull();
     }
 
     @Test
     public void findSubscriptionDataCountTest() {
+        register();
         final int count = consumerManager.findSubscriptionDataCount(GROUP);
         assert count > 0;
     }
 
+    @Test
+    public void findSubscriptionTest() {
+        SubscriptionData subscriptionData = consumerManager.findSubscriptionData(GROUP, TOPIC, true);
+        Assertions.assertThat(subscriptionData).isNull();
+
+        consumerManager.compensateSubscribeData(GROUP, TOPIC, new SubscriptionData(TOPIC, SubscriptionData.SUB_ALL));
+        subscriptionData = consumerManager.findSubscriptionData(GROUP, TOPIC, true);
+        Assertions.assertThat(subscriptionData).isNotNull();
+        Assertions.assertThat(subscriptionData.getTopic()).isEqualTo(TOPIC);
+        Assertions.assertThat(subscriptionData.getSubString()).isEqualTo(SubscriptionData.SUB_ALL);
+
+        subscriptionData = consumerManager.findSubscriptionData(GROUP, TOPIC, false);
+        Assertions.assertThat(subscriptionData).isNull();
+    }
+
     @Test
     public void scanNotActiveChannelTest() {
-        clientChannelInfo.setLastUpdateTimestamp(System.currentTimeMillis() - 1000 * 200);
+        clientChannelInfo.setLastUpdateTimestamp(System.currentTimeMillis() - brokerConfig.getChannelExpiredTimeout() * 2);
         consumerManager.scanNotActiveChannel();
-        assert consumerManager.getConsumerTable().size() == 0;
+        Assertions.assertThat(consumerManager.getConsumerTable().size()).isEqualTo(0);
     }
 
     @Test
     public void queryTopicConsumeByWhoTest() {
+        register();
         final HashSet<String> consumeGroup = consumerManager.queryTopicConsumeByWho(TOPIC);
         assert consumeGroup.size() > 0;
     }
@@ -152,4 +196,15 @@ public class ConsumerManagerTest {
             MessageModel.BROADCASTING, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET, subList, true);
     }
 
+    @Test
+    public void removeExpireConsumerGroupInfo() {
+        SubscriptionData subscriptionData = new SubscriptionData(TOPIC, SubscriptionData.SUB_ALL);
+        subscriptionData.setSubVersion(System.currentTimeMillis() - brokerConfig.getSubscriptionExpiredTimeout() * 2);
+        consumerManager.compensateSubscribeData(GROUP, TOPIC, subscriptionData);
+        consumerManager.compensateSubscribeData(GROUP, TOPIC + "_1", new SubscriptionData(TOPIC, SubscriptionData.SUB_ALL));
+        consumerManager.removeExpireConsumerGroupInfo();
+        Assertions.assertThat(consumerManager.getConsumerGroupInfo(GROUP, true)).isNotNull();
+        Assertions.assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC)).isNull();
+        Assertions.assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC + "_1")).isNotNull();
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 81531e3f1..1dee9101b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -387,6 +387,9 @@ public class BrokerConfig extends BrokerIdentity {
 
     private boolean metricsInDelta = false;
 
+    private long channelExpiredTimeout = 1000 * 120;
+    private long subscriptionExpiredTimeout = 1000 * 60 * 10;
+
     /**
      * Estimate accumulation or not when subscription filter type is tag and is not SUB_ALL.
      */
@@ -1592,6 +1595,22 @@ public class BrokerConfig extends BrokerIdentity {
         this.transactionOpBatchInterval = transactionOpBatchInterval;
     }
 
+    public long getChannelExpiredTimeout() {
+        return channelExpiredTimeout;
+    }
+
+    public void setChannelExpiredTimeout(long channelExpiredTimeout) {
+        this.channelExpiredTimeout = channelExpiredTimeout;
+    }
+
+    public long getSubscriptionExpiredTimeout() {
+        return subscriptionExpiredTimeout;
+    }
+
+    public void setSubscriptionExpiredTimeout(long subscriptionExpiredTimeout) {
+        this.subscriptionExpiredTimeout = subscriptionExpiredTimeout;
+    }
+
     public boolean isValidateSystemTopicWhenUpdateTopic() {
         return validateSystemTopicWhenUpdateTopic;
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index d01458de0..6bb488984 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -31,9 +31,9 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.proxy.ProxyMode;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.proxy.ProxyMode;
 
 public class ProxyConfig implements ConfigFile {
     private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -193,6 +193,8 @@ public class ProxyConfig implements ConfigFile {
 
     private boolean metricsInDelta = false;
 
+    private long channelExpiredTimeout = 1000 * 120;
+
     @Override
     public void initData() {
         parseDelayLevel();
@@ -1038,4 +1040,12 @@ public class ProxyConfig implements ConfigFile {
     public void setMetricsInDelta(boolean metricsInDelta) {
         this.metricsInDelta = metricsInDelta;
     }
+
+    public long getChannelExpiredTimeout() {
+        return channelExpiredTimeout;
+    }
+
+    public void setChannelExpiredTimeout(long channelExpiredTimeout) {
+        this.channelExpiredTimeout = channelExpiredTimeout;
+    }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
index c68f77401..ac1ff6a88 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
@@ -64,11 +64,11 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S
     protected MQClientAPIFactory transactionClientAPIFactory;
 
     public ClusterServiceManager(RPCHook rpcHook) {
+        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
         this.scheduledExecutorService = Executors.newScheduledThreadPool(3);
         this.producerManager = new ProducerManager();
-        this.consumerManager = new ConsumerManager(new ConsumerIdsChangeListenerImpl());
+        this.consumerManager = new ConsumerManager(new ConsumerIdsChangeListenerImpl(), proxyConfig.getChannelExpiredTimeout());
 
-        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
         this.messagingClientAPIFactory = new MQClientAPIFactory(
             "ClusterMQClient_",
             proxyConfig.getRocketmqMQClientNum(),
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestSource.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestSource.java
index 26c3ab402..5d8116013 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestSource.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestSource.java
@@ -37,4 +37,11 @@ public enum RequestSource {
     public static boolean isValid(Integer value) {
         return null != value && value >= -1 && value < RequestSource.values().length - 1;
     }
+
+    public static RequestSource parseInteger(Integer value) {
+        if (isValid(value)) {
+            return RequestSource.values()[value + 1];
+        }
+        return SDK;
+    }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/heartbeat/ConsumeType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/heartbeat/ConsumeType.java
index 10f8da527..fbcca5d5e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/heartbeat/ConsumeType.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/heartbeat/ConsumeType.java
@@ -24,7 +24,9 @@ public enum ConsumeType {
 
     CONSUME_ACTIVELY("PULL"),
 
-    CONSUME_PASSIVELY("PUSH");
+    CONSUME_PASSIVELY("PUSH"),
+
+    CONSUME_POP("POP");
 
     private String typeCN;
 
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RequestSourceTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RequestSourceTest.java
new file mode 100644
index 000000000..b2ed1e341
--- /dev/null
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RequestSourceTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol;
+
+import junit.framework.TestCase;
+
+public class RequestSourceTest extends TestCase {
+
+    public void testIsValid() {
+        assertEquals(4, RequestSource.values().length);
+
+        assertTrue(RequestSource.isValid(-1));
+        assertTrue(RequestSource.isValid(0));
+        assertTrue(RequestSource.isValid(1));
+        assertTrue(RequestSource.isValid(2));
+
+        assertFalse(RequestSource.isValid(-2));
+        assertFalse(RequestSource.isValid(3));
+    }
+
+    public void testParseInteger() {
+        assertEquals(RequestSource.SDK, RequestSource.parseInteger(-1));
+        assertEquals(RequestSource.PROXY_FOR_ORDER, RequestSource.parseInteger(0));
+        assertEquals(RequestSource.PROXY_FOR_BROADCAST, RequestSource.parseInteger(1));
+        assertEquals(RequestSource.PROXY_FOR_STREAM, RequestSource.parseInteger(2));
+
+        assertEquals(RequestSource.SDK, RequestSource.parseInteger(-10));
+        assertEquals(RequestSource.SDK, RequestSource.parseInteger(10));
+    }
+}