You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:53:45 UTC

[rocketmq] 04/26: [ISSUE #5485] client connection management

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 56551b596b3f2f7b3e16e5f271200bceaf28734a
Author: kaiyi.lk <ka...@alibaba-inc.com>
AuthorDate: Tue Nov 8 17:18:24 2022 +0800

    [ISSUE #5485] client connection management
---
 .../rocketmq/broker/client/ConsumerManager.java    |  13 +-
 .../broker/client/ConsumerManagerInterface.java    |  60 ++++++
 .../rocketmq/broker/client/ProducerManager.java    |  10 +
 .../broker/client/ProducerManagerInterface.java    |  44 ++++
 .../proxy/common/channel/ChannelHelper.java        |  49 +++++
 .../apache/rocketmq/proxy/config/ProxyConfig.java  |  49 +++++
 .../proxy/grpc/v2/DefaultGrpcMessingActivity.java  |   2 +-
 .../proxy/grpc/v2/channel/GrpcChannelManager.java  |   7 +-
 .../proxy/grpc/v2/channel/GrpcClientChannel.java   |  71 ++++++-
 .../proxy/grpc/v2/client/ClientActivity.java       |  76 ++++++-
 .../grpc/v2/common/GrpcClientSettingsManager.java  |   9 +
 .../rocketmq/proxy/processor/ClientProcessor.java  |   5 +
 .../proxy/processor/DefaultMessagingProcessor.java |   5 +
 .../proxy/processor/MessagingProcessor.java        |   2 +
 .../proxy/processor/ReceiptHandleProcessor.java    |   5 +
 .../channel/ChannelExtendAttributeGetter.java      |  23 +++
 .../processor/channel/ChannelProtocolType.java     |  35 ++++
 .../proxy/processor/channel/RemoteChannel.java     |  98 +++++++++
 .../processor/channel/RemoteChannelConverter.java  |  23 +++
 .../processor/channel/RemoteChannelSerializer.java |  65 ++++++
 .../proxy/remoting/ClientHousekeepingService.java  |  53 +++++
 .../proxy/remoting/RemotingProtocolServer.java     |  72 +++++++
 .../proxy/remoting/RemotingProxyOutClient.java     |  27 +++
 .../remoting/activity/ClientManagerActivity.java   | 178 ++++++++++++++++
 .../proxy/remoting/channel/RemotingChannel.java    | 224 +++++++++++++++++++++
 .../remoting/channel/RemotingChannelManager.java   | 142 +++++++++++++
 .../proxy/remoting/common/RemotingConverter.java   |  74 +++++++
 .../proxy/service/ClusterServiceManager.java       |  17 +-
 .../proxy/service/LocalServiceManager.java         |  11 +-
 .../rocketmq/proxy/service/ServiceManager.java     |  11 +-
 .../rocketmq/proxy/service/admin/AdminService.java |  32 +++
 .../proxy/service/admin/DefaultAdminService.java   | 142 +++++++++++++
 .../proxy/service/channel/SimpleChannel.java       |   8 +
 .../service/client/ClusterConsumerManager.java     |  70 +++++++
 .../sysmessage/AbstractSystemMessageSyncer.java    | 190 +++++++++++++++++
 .../proxy/service/sysmessage/HeartbeatSyncer.java  | 224 +++++++++++++++++++++
 .../service/sysmessage/HeartbeatSyncerData.java    | 164 +++++++++++++++
 .../proxy/service/sysmessage/HeartbeatType.java    |  23 +++
 .../mqclient/ProxyClientRemotingProcessorTest.java |   5 +-
 39 files changed, 2285 insertions(+), 33 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index 5f95ac1af..0582ce75e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -64,6 +64,7 @@ public class ConsumerManager {
         this.subscriptionExpiredTimeout = brokerConfig.getSubscriptionExpiredTimeout();
     }
 
+    @Override
     public ClientChannelInfo findChannel(final String group, final String clientId) {
         ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
         if (consumerGroupInfo != null) {
@@ -72,6 +73,7 @@ public class ConsumerManager {
         return null;
     }
 
+    @Override
     public ClientChannelInfo findChannel(final String group, final Channel channel) {
         ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
         if (consumerGroupInfo != null) {
@@ -80,6 +82,7 @@ public class ConsumerManager {
         return null;
     }
 
+    @Override
     public SubscriptionData findSubscriptionData(final String group, final String topic) {
         return findSubscriptionData(group, topic, true);
     }
@@ -107,6 +110,7 @@ public class ConsumerManager {
         return this.consumerTable;
     }
 
+    @Override
     public ConsumerGroupInfo getConsumerGroupInfo(final String group) {
         return getConsumerGroupInfo(group, false);
     }
@@ -119,6 +123,7 @@ public class ConsumerManager {
         return consumerGroupInfo;
     }
 
+    @Override
     public int findSubscriptionDataCount(final String group) {
         ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
         if (consumerGroupInfo != null) {
@@ -128,6 +133,7 @@ public class ConsumerManager {
         return 0;
     }
 
+    @Override
     public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
         boolean removed = false;
         Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
@@ -172,6 +178,7 @@ public class ConsumerManager {
             isNotifyConsumerIdsChangedEnable, true);
     }
 
+    @Override
     public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
         ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
         final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable, boolean updateSubscription) {
@@ -202,11 +209,12 @@ public class ConsumerManager {
             this.brokerStatsManager.incConsumerRegisterTime((int) (System.currentTimeMillis() - start));
         }
 
-        callConsumerIdsChangeListener(ConsumerGroupEvent.REGISTER, group, subList);
+        callConsumerIdsChangeListener(ConsumerGroupEvent.REGISTER, group, subList, clientChannelInfo);
 
         return r1 || r2;
     }
 
+    @Override
     public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
         boolean isNotifyConsumerIdsChangedEnable) {
         ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
@@ -252,6 +260,7 @@ public class ConsumerManager {
         }
     }
 
+    @Override
     public void scanNotActiveChannel() {
         Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
         while (it.hasNext()) {
@@ -286,6 +295,7 @@ public class ConsumerManager {
         removeExpireConsumerGroupInfo();
     }
 
+    @Override
     public HashSet<String> queryTopicConsumeByWho(final String topic) {
         HashSet<String> groups = new HashSet<>();
         Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
@@ -300,6 +310,7 @@ public class ConsumerManager {
         return groups;
     }
 
+    @Override
     public void appendConsumerIdsChangeListener(ConsumerIdsChangeListener listener) {
         consumerIdsChangeListenerList.add(listener);
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java
new file mode 100644
index 000000000..895a2e491
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.client;
+
+import io.netty.channel.Channel;
+import java.util.Set;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+
+public interface ConsumerManagerInterface { // mirrors the ConsumerManager methods that gain @Override in this same commit
+
+    ClientChannelInfo findChannel(String group, String clientId);
+
+    ClientChannelInfo findChannel(String group, Channel channel);
+
+    SubscriptionData findSubscriptionData(String group, String topic);
+
+    ConsumerGroupInfo getConsumerGroupInfo(String group);
+
+    int findSubscriptionDataCount(String group);
+
+    boolean doChannelCloseEvent(String remoteAddr, Channel channel);
+
+    default boolean registerConsumer(String group, ClientChannelInfo clientChannelInfo, // convenience overload: delegates with updateSubscription = true
+        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
+        Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
+        return registerConsumer(group, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList,
+            isNotifyConsumerIdsChangedEnable, true);
+    }
+
+    boolean registerConsumer(String group, ClientChannelInfo clientChannelInfo, // full form; the 7-argument default overload above forwards here
+        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
+        Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable, boolean updateSubscription);
+
+    void unregisterConsumer(String group, ClientChannelInfo clientChannelInfo,
+        boolean isNotifyConsumerIdsChangedEnable);
+
+    void scanNotActiveChannel();
+
+    Set<String> queryTopicConsumeByWho(String topic); // declared as Set; ConsumerManager's override narrows the return type to HashSet
+
+    void appendConsumerIdsChangeListener(ConsumerIdsChangeListener listener);
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index 52d67bf28..047aa8be9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -54,10 +54,12 @@ public class ProducerManager {
         this.brokerStatsManager = brokerStatsManager;
     }
 
+    @Override
     public int groupSize() {
         return this.groupChannelTable.size();
     }
 
+    @Override
     public boolean groupOnline(String group) {
         Map<Channel, ClientChannelInfo> channels = this.groupChannelTable.get(group);
         return channels != null && !channels.isEmpty();
@@ -67,6 +69,7 @@ public class ProducerManager {
         return groupChannelTable;
     }
 
+    @Override
     public ProducerTableInfo getProducerTable() {
         Map<String, List<ProducerInfo>> map = new HashMap<>();
         for (String group : this.groupChannelTable.keySet()) {
@@ -94,6 +97,7 @@ public class ProducerManager {
         return new ProducerTableInfo(map);
     }
 
+    @Override
     public void scanNotActiveChannel() {
         Iterator<Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>>> iterator = this.groupChannelTable.entrySet().iterator();
 
@@ -129,6 +133,7 @@ public class ProducerManager {
         }
     }
 
+    @Override
     public synchronized boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
         boolean removed = false;
         if (channel != null) {
@@ -160,6 +165,7 @@ public class ProducerManager {
         return removed;
     }
 
+    @Override
     public synchronized void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
         ClientChannelInfo clientChannelInfoFound = null;
 
@@ -183,6 +189,7 @@ public class ProducerManager {
         }
     }
 
+    @Override
     public synchronized void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
         ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
         if (null != channelTable && !channelTable.isEmpty()) {
@@ -202,6 +209,7 @@ public class ProducerManager {
         }
     }
 
+    @Override
     public Channel getAvailableChannel(String groupId) {
         if (groupId == null) {
             return null;
@@ -242,6 +250,7 @@ public class ProducerManager {
         return lastActiveChannel;
     }
 
+    @Override
     public Channel findChannel(String clientId) {
         return clientChannelTable.get(clientId);
     }
@@ -257,6 +266,7 @@ public class ProducerManager {
         }
     }
 
+    @Override
     public void appendProducerChangeListener(ProducerChangeListener producerChangeListener) {
         producerChangeListenerList.add(producerChangeListener);
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java
new file mode 100644
index 000000000..3f2ece7cd
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.client;
+
+import io.netty.channel.Channel;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
+
+public interface ProducerManagerInterface { // mirrors the ProducerManager methods that gain @Override in this same commit
+
+    int groupSize(); // ProducerManager returns the size of its group-to-channels table
+
+    boolean groupOnline(String group); // ProducerManager: true when the group has a non-null, non-empty channel map
+
+    ProducerTableInfo getProducerTable();
+
+    void scanNotActiveChannel();
+
+    boolean doChannelCloseEvent(String remoteAddr, Channel channel);
+
+    void registerProducer(String group, ClientChannelInfo clientChannelInfo);
+
+    void unregisterProducer(String group, ClientChannelInfo clientChannelInfo);
+
+    Channel getAvailableChannel(String groupId); // ProducerManager returns null for a null groupId
+
+    Channel findChannel(String clientId); // ProducerManager looks the channel up by clientId in its client-channel table
+
+    void appendProducerChangeListener(ProducerChangeListener producerChangeListener);
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/channel/ChannelHelper.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/channel/ChannelHelper.java
new file mode 100644
index 000000000..dd15c85fb
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/channel/ChannelHelper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.common.channel;
+
+import io.netty.channel.Channel;
+import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
+import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
+import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
+
+public class ChannelHelper { // static helpers that classify a Netty Channel by its concrete proxy channel class
+
+    /**
+     * Tells whether the channel was synced from another proxy, i.e. whether it is a {@link RemoteChannel}.
+     *
+     * @param channel the channel to inspect; null yields false because instanceof is null-safe
+     * @return true if the channel is a RemoteChannel (synced from another proxy)
+     */
+    public static boolean isRemote(Channel channel) {
+        return channel instanceof RemoteChannel;
+    }
+
+    public static ChannelProtocolType getChannelProtocolType(Channel channel) { // maps the channel's concrete class to its protocol type
+        if (channel instanceof GrpcClientChannel) {
+            return ChannelProtocolType.GRPC_V2;
+        } else if (channel instanceof RemotingChannel) {
+            return ChannelProtocolType.REMOTING;
+        } else if (channel instanceof RemoteChannel) {
+            RemoteChannel remoteChannel = (RemoteChannel) channel;
+            return remoteChannel.getType(); // a RemoteChannel reports the protocol type it carries
+        }
+        return ChannelProtocolType.UNKNOWN; // none of the known channel classes matched (also the result for null)
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index cbedc3c50..0efca05b4 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -34,6 +34,8 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.proxy.ProxyMode;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
 
 public class ProxyConfig implements ConfigFile {
     private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -55,6 +57,12 @@ public class ProxyConfig implements ConfigFile {
     private String proxyClusterName = DEFAULT_CLUSTER_NAME;
     private String proxyName = StringUtils.isEmpty(localHostName) ? "DEFAULT_PROXY" : localHostName;
 
+    private String localServeAddr = "";
+
+    private String systemTopicClusterName = "";
+    private int heartbeatSyncerThreadPoolNums = 4;
+    private int heartbeatSyncerThreadPoolQueueCapacity = 100;
+
     /**
      * configuration for ThreadPoolMonitor
      */
@@ -204,6 +212,15 @@ public class ProxyConfig implements ConfigFile {
     @Override
     public void initData() { // derives defaults for fields that are still blank after loading
         parseDelayLevel();
+        if (StringUtils.isBlank(localServeAddr)) { // blank (not only empty) counts as unset: fall back to the detected local address
+            this.localServeAddr = RemotingUtil.getLocalAddress();
+        }
+        if (StringUtils.isBlank(localServeAddr)) { // detection produced nothing usable, so fail fast
+            throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "get local serve ip failed");
+        }
+        if (StringUtils.isBlank(systemTopicClusterName)) { // default to the RocketMQ cluster name
+            this.systemTopicClusterName = this.rocketMQClusterName;
+        }
     }
 
     public int computeDelayLevel(long timeMillis) {
@@ -267,6 +284,38 @@ public class ProxyConfig implements ConfigFile {
         this.proxyName = proxyName;
     }
 
+    public String getLocalServeAddr() {
+        return localServeAddr;
+    }
+
+    public void setLocalServeAddr(String localServeAddr) {
+        this.localServeAddr = localServeAddr;
+    }
+
+    public String getSystemTopicClusterName() {
+        return systemTopicClusterName;
+    }
+
+    public void setSystemTopicClusterName(String systemTopicClusterName) {
+        this.systemTopicClusterName = systemTopicClusterName;
+    }
+
+    public int getHeartbeatSyncerThreadPoolNums() {
+        return heartbeatSyncerThreadPoolNums;
+    }
+
+    public void setHeartbeatSyncerThreadPoolNums(int heartbeatSyncerThreadPoolNums) {
+        this.heartbeatSyncerThreadPoolNums = heartbeatSyncerThreadPoolNums;
+    }
+
+    public int getHeartbeatSyncerThreadPoolQueueCapacity() {
+        return heartbeatSyncerThreadPoolQueueCapacity;
+    }
+
+    public void setHeartbeatSyncerThreadPoolQueueCapacity(int heartbeatSyncerThreadPoolQueueCapacity) {
+        this.heartbeatSyncerThreadPoolQueueCapacity = heartbeatSyncerThreadPoolQueueCapacity;
+    }
+
     public boolean isEnablePrintJstack() {
         return enablePrintJstack;
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
index 81a819007..f30519d74 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
@@ -78,7 +78,7 @@ public class DefaultGrpcMessingActivity extends AbstractStartAndShutdown impleme
 
     protected void init(MessagingProcessor messagingProcessor) {
         this.grpcClientSettingsManager = new GrpcClientSettingsManager(messagingProcessor);
-        this.grpcChannelManager = new GrpcChannelManager(messagingProcessor.getProxyRelayService());
+        this.grpcChannelManager = new GrpcChannelManager(messagingProcessor.getProxyRelayService(), this.grpcClientSettingsManager);
         this.receiptHandleProcessor = new ReceiptHandleProcessor(messagingProcessor);
 
         this.receiveMessageActivity = new ReceiveMessageActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
index 97a0ae6da..fb6df2562 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
@@ -30,12 +30,14 @@ import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.common.StartAndShutdown;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
 import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult;
 import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 
 public class GrpcChannelManager implements StartAndShutdown {
     private final ProxyRelayService proxyRelayService;
+    private final GrpcClientSettingsManager grpcClientSettingsManager;
     protected final ConcurrentMap<String, GrpcClientChannel> clientIdChannelMap = new ConcurrentHashMap<>();
 
     protected final AtomicLong nonceIdGenerator = new AtomicLong(0);
@@ -45,8 +47,9 @@ public class GrpcChannelManager implements StartAndShutdown {
         new ThreadFactoryImpl("GrpcChannelManager_")
     );
 
-    public GrpcChannelManager(ProxyRelayService proxyRelayService) {
+    public GrpcChannelManager(ProxyRelayService proxyRelayService, GrpcClientSettingsManager grpcClientSettingsManager) {
         this.proxyRelayService = proxyRelayService;
+        this.grpcClientSettingsManager = grpcClientSettingsManager;
     }
 
     protected void init() {
@@ -58,7 +61,7 @@ public class GrpcChannelManager implements StartAndShutdown {
 
     public GrpcClientChannel createChannel(ProxyContext ctx, String clientId) {
         return this.clientIdChannelMap.computeIfAbsent(clientId,
-            k -> new GrpcClientChannel(proxyRelayService, this, ctx, clientId));
+            k -> new GrpcClientChannel(proxyRelayService, grpcClientSettingsManager, this, ctx, clientId));
     }
 
     public GrpcClientChannel getChannel(String clientId) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
index ec14473fd..714d0bf01 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
@@ -18,13 +18,17 @@ package org.apache.rocketmq.proxy.grpc.v2.channel;
 
 import apache.rocketmq.v2.PrintThreadStackTraceCommand;
 import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
+import apache.rocketmq.v2.Settings;
 import apache.rocketmq.v2.TelemetryCommand;
 import apache.rocketmq.v2.VerifyMessageCommand;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ComparisonChain;
+import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.TextFormat;
+import com.google.protobuf.util.JsonFormat;
 import io.grpc.StatusRuntimeException;
 import io.grpc.stub.StreamObserver;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelId;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
@@ -33,7 +37,14 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
 import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter;
+import org.apache.rocketmq.proxy.processor.channel.ChannelExtendAttributeGetter;
+import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
+import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
+import org.apache.rocketmq.proxy.processor.channel.RemoteChannelConverter;
 import org.apache.rocketmq.proxy.service.relay.ProxyChannel;
 import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult;
 import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
@@ -45,24 +56,70 @@ import org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequest
 import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader;
 
-public class GrpcClientChannel extends ProxyChannel {
+public class GrpcClientChannel extends ProxyChannel implements ChannelExtendAttributeGetter, RemoteChannelConverter {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
 
     private final GrpcChannelManager grpcChannelManager;
+    private final GrpcClientSettingsManager grpcClientSettingsManager;
 
     private final AtomicReference<StreamObserver<TelemetryCommand>> telemetryCommandRef = new AtomicReference<>();
     private final Object telemetryWriteLock = new Object();
     private final String clientId;
 
-    public GrpcClientChannel(ProxyRelayService proxyRelayService, GrpcChannelManager grpcChannelManager,
-        ProxyContext ctx, String clientId) {
+    public GrpcClientChannel(ProxyRelayService proxyRelayService, GrpcClientSettingsManager grpcClientSettingsManager,
+        GrpcChannelManager grpcChannelManager, ProxyContext ctx, String clientId) {
         super(proxyRelayService, null, new GrpcChannelId(clientId),
             ctx.getRemoteAddress(),
             ctx.getLocalAddress());
         this.grpcChannelManager = grpcChannelManager;
+        this.grpcClientSettingsManager = grpcClientSettingsManager;
         this.clientId = clientId;
     }
 
+    @Override
+    public String getChannelExtendAttribute() { // returns this client's Settings as protobuf JSON, or null if none are found or printing fails
+        Settings settings = this.grpcClientSettingsManager.getRawClientSettings(this.clientId);
+        if (settings == null) {
+            return null;
+        }
+        try {
+            return JsonFormat.printer().print(settings);
+        } catch (InvalidProtocolBufferException e) {
+            log.error("convert settings to json data failed. settings:{}", settings, e);
+        }
+        return null; // reached only after a logged conversion failure
+    }
+
+    public static Settings parseChannelExtendAttribute(Channel channel) { // parses the channel's extend attribute as protobuf JSON Settings; null when not applicable
+        if (ChannelHelper.getChannelProtocolType(channel).equals(ChannelProtocolType.GRPC_V2) && // only channels whose protocol type is GRPC_V2 are parsed
+            channel instanceof ChannelExtendAttributeGetter) {
+            String attr = ((ChannelExtendAttributeGetter) channel).getChannelExtendAttribute();
+            if (attr == null) {
+                return null;
+            }
+
+            Settings.Builder builder = Settings.newBuilder();
+            try {
+                JsonFormat.parser().merge(attr, builder); // fill the builder from the JSON text
+                return builder.build();
+            } catch (InvalidProtocolBufferException e) {
+                log.error("convert settings json data to settings failed. data:{}", attr, e);
+                return null; // malformed JSON is logged and reported as no settings
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public RemoteChannel toRemoteChannel() { // builds a RemoteChannel from this channel's addresses, the GRPC_V2 type and its extend attribute
+        return new RemoteChannel(
+            ConfigurationManager.getProxyConfig().getLocalServeAddr(), // localServeAddr from the proxy configuration
+            this.getRemoteAddress(),
+            this.getLocalAddress(),
+            ChannelProtocolType.GRPC_V2,
+            this.getChannelExtendAttribute()); // JSON settings text; may be null
+    }
+
     protected static class GrpcChannelId implements ChannelId {
 
         private final String clientId;
@@ -181,14 +238,6 @@ public class GrpcClientChannel extends ProxyChannel {
         return clientId;
     }
 
-    public String getRemoteAddress() {
-        return remoteAddress;
-    }
-
-    public String getLocalAddress() {
-        return localAddress;
-    }
-
     public void writeTelemetryCommand(TelemetryCommand command) {
         StreamObserver<TelemetryCommand> observer = this.telemetryCommandRef.get();
         if (observer == null) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
index 20035f7a1..00cc862a4 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
@@ -32,6 +32,7 @@ import apache.rocketmq.v2.ThreadStackTrace;
 import apache.rocketmq.v2.VerifyMessageResult;
 import io.grpc.StatusRuntimeException;
 import io.grpc.stub.StreamObserver;
+import io.netty.channel.Channel;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -39,6 +40,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
 import org.apache.rocketmq.broker.client.ProducerChangeListener;
 import org.apache.rocketmq.broker.client.ProducerGroupEvent;
@@ -49,6 +51,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
 import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
 import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
 import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
@@ -209,7 +212,8 @@ public class ClientActivity extends AbstractMessingActivity {
         };
     }
 
-    protected void processTelemetryException(TelemetryCommand request, Throwable t, StreamObserver<TelemetryCommand> responseObserver) {
+    protected void processTelemetryException(TelemetryCommand request, Throwable t,
+        StreamObserver<TelemetryCommand> responseObserver) {
         StatusRuntimeException exception = io.grpc.Status.INTERNAL
             .withDescription("process client telemetryCommand failed. " + t.getMessage())
             .withCause(t)
@@ -291,7 +295,8 @@ public class ClientActivity extends AbstractMessingActivity {
         return channel;
     }
 
-    protected GrpcClientChannel registerConsumer(ProxyContext ctx, String consumerGroup, ClientType clientType, List<SubscriptionEntry> subscriptionEntryList, boolean updateSubscription) {
+    protected GrpcClientChannel registerConsumer(ProxyContext ctx, String consumerGroup, ClientType clientType,
+        List<SubscriptionEntry> subscriptionEntryList, boolean updateSubscription) {
         String clientId = ctx.getClientID();
         LanguageCode languageCode = LanguageCode.valueOf(ctx.getLanguage());
 
@@ -413,14 +418,67 @@ public class ClientActivity extends AbstractMessingActivity {
 
         @Override
         public void handle(ConsumerGroupEvent event, String group, Object... args) {
-            if (event == ConsumerGroupEvent.CLIENT_UNREGISTER) {
-                if (args == null || args.length < 1) {
-                    return;
+            switch (event) { // dispatch on the consumer group event type
+                case CLIENT_UNREGISTER: // a client left the group
+                    processClientUnregister(group, args);
+                    break;
+                case REGISTER: // a client joined the group
+                    processClientRegister(group, args);
+                    break;
+                default: // every other event type is ignored by this listener
+                    break;
+            }
+        }
+
+        protected void processClientUnregister(String group, Object... args) { // cleans up channel and settings state for CLIENT_UNREGISTER; expects args[0] to be a ClientChannelInfo
+            if (args == null || args.length < 1) {
+                return;
+            }
+            if (args[0] instanceof ClientChannelInfo) {
+                ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
+                String clientId = clientChannelInfo.getClientId();
+                if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
+                    grpcClientSettingsManager.computeIfPresent(clientId, orgSettings -> {
+                        if (grpcChannelManager.getChannel(clientId) == null) {
+                            // no channel of this clientId is registered in grpcChannelManager; returning null drops the settings
+                            return null;
+                        }
+                        return orgSettings;
+                    });
+                } else {
+                    grpcChannelManager.removeChannel(clientId);
+                    grpcClientSettingsManager.computeIfPresent(clientId, orgSettings -> {
+                        ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(group);
+                        if (consumerGroupInfo == null) {
+                            return null;
+                        }
+                        List<Channel> allChannels = consumerGroupInfo.getAllChannel();
+                        if (allChannels == null || allChannels.isEmpty() || allChannels.size() == 1) {
+                            // the group info reports no channel, or only a single one, so remove the
+                            // settings of this client. NOTE(review): the list looks group-wide rather
+                            // than limited to this clientId - confirm that this is the intended check
+                            return null;
+                        }
+                        return orgSettings;
+                    });
                 }
-                if (args[0] instanceof ClientChannelInfo) {
-                    ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
-                    grpcChannelManager.removeChannel(clientChannelInfo.getClientId());
-                    grpcClientSettingsManager.removeClientSettings(clientChannelInfo.getClientId());
+            }
+        }
+
+        protected void processClientRegister(String group, Object... args) { // handles REGISTER; expects args[1] to be a ClientChannelInfo (args[0] is not read here)
+            if (args == null || args.length < 2) {
+                return;
+            }
+            if (args[1] instanceof ClientChannelInfo) {
+                ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[1];
+                Channel channel = clientChannelInfo.getChannel();
+                if (ChannelHelper.isRemote(channel)) {
+                    // store the settings carried by a channel that was synced from another proxy
+                    Settings settings = GrpcClientChannel.parseChannelExtendAttribute(channel);
+                    if (settings == null) { // nothing could be parsed from the channel, keep existing settings untouched
+                        return;
+                    }
+                    grpcClientSettingsManager.updateClientSettings(clientChannelInfo.getClientId(), settings);
                 }
             }
         }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
index 8c3c3a8a4..21c3395ae 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
@@ -52,6 +53,10 @@ public class GrpcClientSettingsManager {
         this.messagingProcessor = messagingProcessor;
     }
 
+    public Settings getRawClientSettings(String clientId) { // plain lookup of the settings stored under clientId
+        return CLIENT_SETTINGS_MAP.get(clientId); // returns whatever the map holds for the key; no ProxyContext is involved
+    }
+
     public Settings getClientSettings(ProxyContext ctx) {
         String clientId = ctx.getClientID();
         Settings settings = CLIENT_SETTINGS_MAP.get(clientId);
@@ -184,6 +189,10 @@ public class GrpcClientSettingsManager {
         CLIENT_SETTINGS_MAP.remove(clientId);
     }
 
+    public void computeIfPresent(String clientId, Function<Settings, Settings> function) { // applies function to the settings currently stored under clientId, if any
+        CLIENT_SETTINGS_MAP.computeIfPresent(clientId, (clientIdKey, value) -> function.apply(value)); // per Map.computeIfPresent, a null result from function removes the entry
+    }
+
     public Settings removeAndGetClientSettings(ProxyContext ctx) {
         String clientId = ctx.getClientID();
         Settings settings = CLIENT_SETTINGS_MAP.remove(clientId);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java
index 5408fa066..8fb6eaf7d 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java
@@ -101,6 +101,11 @@ public class ClientProcessor extends AbstractProcessor {
         this.serviceManager.getConsumerManager().unregisterConsumer(consumerGroup, clientChannelInfo, false);
     }
 
+    public void doChannelCloseEvent(String remoteAddr, Channel channel) { // forwards a channel close notification to both client managers
+        this.serviceManager.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); // consumer manager first
+        this.serviceManager.getProducerManager().doChannelCloseEvent(remoteAddr, channel); // then the producer manager
+    }
+
     public void registerConsumerIdsChangeListener(ConsumerIdsChangeListener listener) {
         this.serviceManager.getConsumerManager().appendConsumerIdsChangeListener(listener);
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index 1b7baba0a..66239f0e8 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -274,6 +274,11 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen
         this.clientProcessor.registerConsumerIdsChangeListener(consumerIdsChangeListener);
     }
 
+    @Override
+    public void doChannelCloseEvent(String remoteAddr, Channel channel) { // delegates unchanged to clientProcessor
+        this.clientProcessor.doChannelCloseEvent(remoteAddr, channel);
+    }
+
     @Override
     public ConsumerGroupInfo getConsumerGroupInfo(String consumerGroup) {
         return this.clientProcessor.getConsumerGroupInfo(consumerGroup);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
index 3e8b8084e..3c4e6303f 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
@@ -285,6 +285,8 @@ public interface MessagingProcessor extends StartAndShutdown {
         ConsumerIdsChangeListener consumerIdsChangeListener
     );
 
+    void doChannelCloseEvent(String remoteAddr, Channel channel); // NOTE(review): presumably reports that the channel at remoteAddr was closed - confirm against implementations
+
     ConsumerGroupInfo getConsumerGroupInfo(String consumerGroup);
 
     void addTransactionSubscription(
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index d435b0c2e..9d000bfe9 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -47,6 +47,7 @@ import org.apache.rocketmq.proxy.common.ProxyException;
 import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
 import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
 import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
 import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
@@ -105,6 +106,10 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
                     }
                     if (args[0] instanceof ClientChannelInfo) {
                         ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
+                        if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
+                            // the expired channel was synced from another proxy, so do not clear data that belongs to connections on the current proxy
+                            return;
+                        }
                         clearGroup(new ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group));
                     }
                 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/ChannelExtendAttributeGetter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/ChannelExtendAttributeGetter.java
new file mode 100644
index 000000000..3538a9496
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/ChannelExtendAttributeGetter.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.processor.channel;
+
+public interface ChannelExtendAttributeGetter { // exposes a channel's extend attribute as a string
+    // Returns the extend attribute; the string format is not defined by this interface.
+    String getChannelExtendAttribute();
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/ChannelProtocolType.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/ChannelProtocolType.java
new file mode 100644
index 000000000..d2eeb8353
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/ChannelProtocolType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.processor.channel;
+
+public enum ChannelProtocolType { // protocol kinds a channel can be tagged with
+    UNKNOWN("unknown"),
+    GRPC_V2("grpc_v2"),
+    GRPC_V1("grpc_v1"),
+    REMOTING("remoting");
+    // lowercase label passed to the constructor for each constant
+    private final String name;
+
+    ChannelProtocolType(String name) {
+        this.name = name;
+    }
+    // Returns the lowercase label, e.g. "grpc_v2" for GRPC_V2 (distinct from Enum.name()).
+    public String getName() {
+        return name;
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java
new file mode 100644
index 000000000..5d9c9afcc
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.processor.channel;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
+
+public class RemoteChannel extends SimpleChannel implements ChannelExtendAttributeGetter { // NOTE(review): presumably stands in for a client channel held by another proxy - confirm
+    protected final ChannelProtocolType type;
+    protected final String remoteProxyIp;
+    protected volatile String extendAttribute; // mutable via setExtendAttribute
+    // Passes null as the first SimpleChannel argument and a RemoteChannelId built from the addresses and type.
+    public RemoteChannel(String remoteProxyIp, String remoteAddress, String localAddress, ChannelProtocolType type, String extendAttribute) {
+        super(null,
+            new RemoteChannelId(remoteProxyIp, remoteAddress, localAddress, type),
+            remoteAddress, localAddress);
+        this.type = type;
+        this.remoteProxyIp = remoteProxyIp;
+        this.extendAttribute = extendAttribute;
+    }
+    // ChannelId backed by one string; NOTE(review): equals/hashCode are not overridden, so compareTo == 0 does not imply equals.
+    public static class RemoteChannelId implements ChannelId {
+
+        private final String id;
+        // id = remoteProxyIp@remoteAddress@localAddress@type (type rendered through string concatenation)
+        public RemoteChannelId(String remoteProxyIp, String remoteAddress, String localAddress, ChannelProtocolType type) {
+            this.id = remoteProxyIp + "@" + remoteAddress + "@" + localAddress + "@" + type;
+        }
+
+        @Override
+        public String asShortText() { // short and long text are the same string
+            return this.id;
+        }
+
+        @Override
+        public String asLongText() {
+            return this.id;
+        }
+
+        @Override
+        public int compareTo(ChannelId o) { // string comparison against the other id's long text
+            return this.id.compareTo(o.asLongText());
+        }
+    }
+
+    @Override
+    public boolean isWritable() { // always reports not writable
+        return false;
+    }
+
+    public ChannelProtocolType getType() {
+        return type;
+    }
+    // Serializes this channel through RemoteChannelSerializer.toJson.
+    public String encode() {
+        return RemoteChannelSerializer.toJson(this);
+    }
+    // Inverse of encode(); delegates to RemoteChannelSerializer.decodeFromJson, which may return null.
+    public static RemoteChannel decode(String data) {
+        return RemoteChannelSerializer.decodeFromJson(data);
+    }
+    // Converts a channel that implements RemoteChannelConverter; any other channel yields null.
+    public static RemoteChannel create(Channel channel) {
+        if (channel instanceof RemoteChannelConverter) {
+            return ((RemoteChannelConverter) channel).toRemoteChannel();
+        }
+        return null;
+    }
+
+    public String getRemoteProxyIp() {
+        return remoteProxyIp;
+    }
+
+    public void setExtendAttribute(String extendAttribute) {
+        this.extendAttribute = extendAttribute;
+    }
+
+    @Override
+    public String getChannelExtendAttribute() { // returns the current value of the volatile extendAttribute field
+        return this.extendAttribute;
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelConverter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelConverter.java
new file mode 100644
index 000000000..9f886e85d
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelConverter.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.processor.channel;
+
+public interface RemoteChannelConverter { // implemented by channels that can produce a RemoteChannel form of themselves
+    // Returns the RemoteChannel form of the implementing channel; RemoteChannel.create calls this.
+    RemoteChannel toRemoteChannel();
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelSerializer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelSerializer.java
new file mode 100644
index 000000000..8fd216219
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelSerializer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.processor.channel;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+public class RemoteChannelSerializer { // JSON encoding and decoding of RemoteChannel via fastjson
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+    private static final String REMOTE_PROXY_IP_KEY = "remoteProxyIp"; // JSON field names shared by toJson and decodeFromJson
+    private static final String REMOTE_ADDRESS_KEY = "remoteAddress";
+    private static final String LOCAL_ADDRESS_KEY = "localAddress";
+    private static final String TYPE_KEY = "type";
+    private static final String EXTEND_ATTRIBUTE_KEY = "extendAttribute";
+    // Builds a flat map of the five channel fields and serializes it to a JSON string.
+    public static String toJson(RemoteChannel remoteChannel) {
+        Map<String, Object> data = new HashMap<>();
+        data.put(REMOTE_PROXY_IP_KEY, remoteChannel.getRemoteProxyIp());
+        data.put(REMOTE_ADDRESS_KEY, remoteChannel.getRemoteAddress());
+        data.put(LOCAL_ADDRESS_KEY, remoteChannel.getLocalAddress());
+        data.put(TYPE_KEY, remoteChannel.getType()); // stored as the enum object; its JSON form is left to fastjson
+        data.put(EXTEND_ATTRIBUTE_KEY, remoteChannel.getChannelExtendAttribute());
+        return JSON.toJSONString(data);
+    }
+    // Rebuilds a RemoteChannel from JSON; returns null for blank input or when decoding throws.
+    public static RemoteChannel decodeFromJson(String jsonData) {
+        if (StringUtils.isBlank(jsonData)) {
+            return null;
+        }
+        try {
+            JSONObject jsonObject = JSON.parseObject(jsonData);
+            return new RemoteChannel(
+                jsonObject.getString(REMOTE_PROXY_IP_KEY),
+                jsonObject.getString(REMOTE_ADDRESS_KEY),
+                jsonObject.getString(LOCAL_ADDRESS_KEY),
+                jsonObject.getObject(TYPE_KEY, ChannelProtocolType.class),
+                jsonObject.getObject(EXTEND_ATTRIBUTE_KEY, String.class)
+            );
+        } catch (Throwable t) { // any failure is logged and reported to the caller as null
+            log.error("decode remote channel data failed. data:{}", jsonData, t);
+        }
+        return null;
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java
new file mode 100644
index 000000000..e213ae855
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting;
+
+import io.netty.channel.Channel;
+import org.apache.rocketmq.proxy.remoting.activity.ClientManagerActivity;
+import org.apache.rocketmq.remoting.ChannelEventListener;
+
+public class ClientHousekeepingService implements ChannelEventListener { // routes channel lifecycle callbacks to ClientManagerActivity
+
+    private final ClientManagerActivity clientManagerActivity;
+
+    public ClientHousekeepingService(ClientManagerActivity clientManagerActivity) {
+        this.clientManagerActivity = clientManagerActivity;
+    }
+
+    @Override
+    public void onChannelConnect(String remoteAddr, Channel channel) {
+        // no action is taken when a channel connects
+    }
+
+    @Override
+    public void onChannelClose(String remoteAddr, Channel channel) { // close, exception and idle are all handled as a close event
+        this.clientManagerActivity.doChannelCloseEvent(remoteAddr, channel);
+    }
+
+    @Override
+    public void onChannelException(String remoteAddr, Channel channel) { // same handling as onChannelClose
+        this.clientManagerActivity.doChannelCloseEvent(remoteAddr, channel);
+    }
+
+    @Override
+    public void onChannelIdle(String remoteAddr, Channel channel) { // same handling as onChannelClose
+        this.clientManagerActivity.doChannelCloseEvent(remoteAddr, channel);
+    }
+
+}
+
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
new file mode 100644
index 000000000..58b257641
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting;
+
+import io.netty.channel.Channel;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOutClient { // skeleton: lifecycle hooks are empty, only invokeToClient has logic
+
+    private final MessagingProcessor messagingProcessor;
+    private RemotingServer defaultRemotingServer; // NOTE(review): never assigned in this class, so invokeToClient currently fails with a NullPointerException captured in the future
+
+    public RemotingProtocolServer(MessagingProcessor messagingProcessor) {
+        this.messagingProcessor = messagingProcessor;
+    }
+
+    protected void init() {
+        // empty for now
+    }
+
+    protected void registerRemotingServer(RemotingServer remotingServer) {
+        // empty for now; the remotingServer argument is not used
+    }
+
+    @Override
+    public void shutdown() throws Exception {
+        // empty for now
+    }
+
+    @Override
+    public void start() throws Exception {
+        // empty for now
+    }
+
+    @Override
+    public CompletableFuture<RemotingCommand> invokeToClient(Channel channel, RemotingCommand request,
+        long timeoutMillis) { // sends request over channel asynchronously; failures are reported through the returned future, never thrown
+        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+        try {
+            this.defaultRemotingServer.invokeAsync(channel, request, timeoutMillis, responseFuture -> {
+                if (responseFuture.getResponseCommand() == null) { // no response arrived: fail the future and keep the underlying cause
+                    future.completeExceptionally(new MQClientException("response is null after send request to client", responseFuture.getCause()));
+                    return;
+                }
+                future.complete(responseFuture.getResponseCommand());
+            });
+        } catch (Throwable t) { // anything thrown while issuing the call also fails the future
+            future.completeExceptionally(t);
+        }
+        return future;
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProxyOutClient.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProxyOutClient.java
new file mode 100644
index 000000000..5a96c41c9
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProxyOutClient.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting;
+
+import io.netty.channel.Channel;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public interface RemotingProxyOutClient { // outbound side: lets the proxy send a remoting request to a client channel
+    // Sends request over channel and returns a future for the response; timeoutMillis is presumably the wait limit in milliseconds - confirm with implementations.
+    CompletableFuture<RemotingCommand> invokeToClient(Channel channel, RemotingCommand request, long timeoutMillis);
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
new file mode 100644
index 000000000..62400e033
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.Set;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
+import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
+import org.apache.rocketmq.broker.client.ProducerChangeListener;
+import org.apache.rocketmq.broker.client.ProducerGroupEvent;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UnregisterClientResponseHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ * Remoting activity that serves the client-management request codes
+ * {@code HEART_BEAT}, {@code UNREGISTER_CLIENT} and {@code CHECK_CLIENT_CONFIG}.
+ * <p>
+ * Every raw netty channel is wrapped into a {@link RemotingChannel} by the
+ * {@link RemotingChannelManager} before it is handed to the {@link MessagingProcessor}.
+ * Two listeners registered at construction time remove those wrappers again when the
+ * processor reports a {@code CLIENT_UNREGISTER} event.
+ */
+public class ClientManagerActivity extends AbstractRemotingActivity {
+
+    private final RemotingChannelManager remotingChannelManager;
+
+    /**
+     * @param requestPipeline    passed through to the base activity
+     * @param messagingProcessor processor that producers/consumers are registered with
+     * @param manager            registry of {@link RemotingChannel} wrappers keyed by raw channel
+     */
+    public ClientManagerActivity(RequestPipeline requestPipeline, MessagingProcessor messagingProcessor,
+        RemotingChannelManager manager) {
+        super(requestPipeline, messagingProcessor);
+        this.remotingChannelManager = manager;
+        // NOTE: init() is overridable and runs inside the constructor, so an override executes
+        // before the subclass's own fields have been initialized.
+        this.init();
+    }
+
+    /**
+     * Registers the consumer and producer change listeners with the messaging processor.
+     */
+    protected void init() {
+        this.messagingProcessor.registerConsumerListener(new ConsumerIdsChangeListenerImpl());
+        this.messagingProcessor.registerProducerListener(new ProducerChangeListenerImpl());
+    }
+
+    /**
+     * Dispatches on the request code.
+     *
+     * @return the response command, or {@code null} when the request code is not handled here
+     */
+    @Override
+    protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) throws Exception {
+        switch (request.getCode()) {
+            case RequestCode.HEART_BEAT:
+                return this.heartBeat(ctx, request, context);
+            case RequestCode.UNREGISTER_CLIENT:
+                return this.unregisterClient(ctx, request, context);
+            case RequestCode.CHECK_CLIENT_CONFIG:
+                return this.checkClientConfig(ctx, request, context);
+            default:
+                break;
+        }
+        return null;
+    }
+
+    /**
+     * Decodes the heartbeat body and registers every producer and consumer group it lists,
+     * each with a {@link RemotingChannel} wrapper created (or reused) for this connection.
+     *
+     * @return a {@code SUCCESS} response with an empty remark
+     */
+    protected RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) {
+        HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
+        String clientId = heartbeatData.getClientID();
+
+        for (ProducerData data : heartbeatData.getProducerDataSet()) {
+            ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+                this.remotingChannelManager.createProducerChannel(ctx.channel(), data.getGroupName(), clientId),
+                clientId, request.getLanguage(),
+                request.getVersion());
+            messagingProcessor.registerProducer(context, data.getGroupName(), clientChannelInfo);
+        }
+
+        for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
+            ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+                this.remotingChannelManager.createConsumerChannel(ctx.channel(), data.getGroupName(), clientId, data.getSubscriptionDataSet()),
+                clientId, request.getLanguage(),
+                request.getVersion());
+            messagingProcessor.registerConsumer(context, data.getGroupName(), clientChannelInfo, data.getConsumeType(),
+                data.getMessageModel(), data.getConsumeFromWhere(), data.getSubscriptionDataSet(), true);
+        }
+
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark("");
+        return response;
+    }
+
+    /**
+     * Removes this connection's wrapper for the producer and/or consumer group named in the
+     * request header and unregisters it from the messaging processor.
+     * <p>
+     * When no wrapper exists for the group (the connection never sent a heartbeat for it, or it
+     * was already removed) the unregister call is skipped, so the processor is never handed a
+     * {@link ClientChannelInfo} whose channel is {@code null}.
+     *
+     * @return a {@code SUCCESS} response with an empty remark
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    protected RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);
+        final UnregisterClientRequestHeader requestHeader =
+            (UnregisterClientRequestHeader) request.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
+        final String producerGroup = requestHeader.getProducerGroup();
+        if (producerGroup != null) {
+            RemotingChannel channel = this.remotingChannelManager.removeProducerChannel(producerGroup, ctx.channel());
+            // removeProducerChannel yields null when nothing was registered for this connection.
+            if (channel != null) {
+                ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+                    channel,
+                    requestHeader.getClientID(),
+                    request.getLanguage(),
+                    request.getVersion());
+                this.messagingProcessor.unRegisterProducer(context, producerGroup, clientChannelInfo);
+            }
+        }
+        final String consumerGroup = requestHeader.getConsumerGroup();
+        if (consumerGroup != null) {
+            RemotingChannel channel = this.remotingChannelManager.removeConsumerChannel(consumerGroup, ctx.channel());
+            // Same guard as for producers: nothing to unregister without a wrapper.
+            if (channel != null) {
+                ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+                    channel,
+                    requestHeader.getClientID(),
+                    request.getLanguage(),
+                    request.getVersion());
+                this.messagingProcessor.unRegisterConsumer(context, consumerGroup, clientChannelInfo);
+            }
+        }
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark("");
+        return response;
+    }
+
+    /**
+     * Answers {@code SUCCESS} unconditionally; the request content is not inspected.
+     */
+    protected RemotingCommand checkClientConfig(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark("");
+        return response;
+    }
+
+    /**
+     * Drops every wrapper created for the closed raw channel and forwards one close event per
+     * removed wrapper to the messaging processor.
+     *
+     * @param remoteAddr remote address passed through to the processor
+     * @param channel    the raw netty channel that was closed
+     */
+    public void doChannelCloseEvent(String remoteAddr, Channel channel) {
+        Set<RemotingChannel> remotingChannelSet = this.remotingChannelManager.removeChannel(channel);
+        for (RemotingChannel remotingChannel : remotingChannelSet) {
+            this.messagingProcessor.doChannelCloseEvent(remoteAddr, remotingChannel);
+        }
+    }
+
+    /**
+     * Removes the consumer wrapper from the channel manager when a consumer client unregisters.
+     * Events other than {@code CLIENT_UNREGISTER} are ignored.
+     */
+    protected class ConsumerIdsChangeListenerImpl implements ConsumerIdsChangeListener {
+
+        @Override
+        public void handle(ConsumerGroupEvent event, String group, Object... args) {
+            if (event == ConsumerGroupEvent.CLIENT_UNREGISTER) {
+                if (args == null || args.length < 1) {
+                    return;
+                }
+                if (args[0] instanceof ClientChannelInfo) {
+                    ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
+                    // NOTE(review): heartBeat registers the RemotingChannel wrapper in ClientChannelInfo,
+                    // while the manager is keyed by the raw channel — confirm the manager resolves
+                    // a wrapper to its raw-channel key.
+                    remotingChannelManager.removeConsumerChannel(group, clientChannelInfo.getChannel());
+                }
+            }
+        }
+
+        @Override
+        public void shutdown() {
+
+        }
+    }
+
+    /**
+     * Removes the producer wrapper from the channel manager when a producer client unregisters.
+     * Events other than {@code CLIENT_UNREGISTER} are ignored.
+     */
+    protected class ProducerChangeListenerImpl implements ProducerChangeListener {
+
+        @Override
+        public void handle(ProducerGroupEvent event, String group, ClientChannelInfo clientChannelInfo) {
+            if (event == ProducerGroupEvent.CLIENT_UNREGISTER) {
+                // NOTE(review): same wrapper-vs-raw-channel key question as the consumer listener.
+                remotingChannelManager.removeProducerChannel(group, clientChannelInfo.getChannel());
+            }
+        }
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
new file mode 100644
index 000000000..2b2cfca79
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.channel;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.TypeReference;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import java.time.Duration;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.processor.channel.ChannelExtendAttributeGetter;
+import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
+import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
+import org.apache.rocketmq.proxy.processor.channel.RemoteChannelConverter;
+import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient;
+import org.apache.rocketmq.proxy.remoting.common.RemotingConverter;
+import org.apache.rocketmq.proxy.service.relay.ProxyChannel;
+import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult;
+import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
+import org.apache.rocketmq.proxy.service.transaction.TransactionData;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ * {@link ProxyChannel} backed by the netty channel of a remoting-protocol client.
+ * <p>
+ * State queries ({@link #isOpen()}, {@link #isActive()}, {@link #isWritable()}) and writes are
+ * delegated to the parent (raw) channel. Relay requests that need an answer from the client
+ * go through {@link RemotingProxyOutClient#invokeToClient}. The consumer's subscription set is
+ * carried as the channel's extend attribute, serialized as JSON.
+ */
+public class RemotingChannel extends ProxyChannel implements RemoteChannelConverter, ChannelExtendAttributeGetter {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+    /** Timeout, in milliseconds, used for every request relayed to the client (3 seconds). */
+    private static final long DEFAULT_MQ_CLIENT_TIMEOUT = Duration.ofSeconds(3).toMillis();
+    private final String clientId;
+    private final String remoteAddress;
+    private final String localAddress;
+    private final RemotingProxyOutClient remotingProxyOutClient;
+    private final Set<SubscriptionData> subscriptionData;
+
+    /**
+     * @param remotingProxyOutClient used to send requests to the client and await the reply
+     * @param proxyRelayService      passed through to {@link ProxyChannel}
+     * @param parent                 the raw netty channel; its id and addresses are reused for this wrapper
+     * @param clientId               id reported by the client
+     * @param subscriptionData       subscriptions exposed through {@link #getChannelExtendAttribute()}; may be {@code null}
+     */
+    public RemotingChannel(RemotingProxyOutClient remotingProxyOutClient, ProxyRelayService proxyRelayService,
+        Channel parent,
+        String clientId, Set<SubscriptionData> subscriptionData) {
+        super(proxyRelayService, parent, parent.id(),
+            RemotingUtil.socketAddress2String(parent.remoteAddress()),
+            RemotingUtil.socketAddress2String(parent.localAddress()));
+        this.remotingProxyOutClient = remotingProxyOutClient;
+        this.clientId = clientId;
+        this.remoteAddress = RemotingUtil.socketAddress2String(parent.remoteAddress());
+        this.localAddress = RemotingUtil.socketAddress2String(parent.localAddress());
+        this.subscriptionData = subscriptionData;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return this.parent().isOpen();
+    }
+
+    @Override
+    public boolean isActive() {
+        return this.parent().isActive();
+    }
+
+    @Override
+    public boolean isWritable() {
+        return this.parent().isWritable();
+    }
+
+    /**
+     * Writes {@code msg} to the raw channel without waiting for the write to finish.
+     *
+     * @return an already-completed future; the outcome of the write is not reflected in it
+     */
+    @Override
+    protected CompletableFuture<Void> processOtherMessage(Object msg) {
+        this.parent().writeAndFlush(msg);
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /**
+     * Sends a {@code CHECK_TRANSACTION_STATE} request to the client. Both {@code responseFuture}
+     * and the returned future are completed according to whether the write succeeded; no reply
+     * from the client is awaited.
+     *
+     * @return a future completed once the request has been flushed, or failed if it could not be
+     */
+    @Override
+    protected CompletableFuture<Void> processCheckTransaction(CheckTransactionStateRequestHeader header,
+        MessageExt messageExt, TransactionData transactionData,
+        CompletableFuture<ProxyRelayResult<Void>> responseFuture) {
+        CompletableFuture<Void> writeFuture = new CompletableFuture<>();
+        try {
+            CheckTransactionStateRequestHeader requestHeader = new CheckTransactionStateRequestHeader();
+            requestHeader.setCommitLogOffset(transactionData.getCommitLogOffset());
+            requestHeader.setTranStateTableOffset(transactionData.getTranStateTableOffset());
+            requestHeader.setTransactionId(transactionData.getTransactionId());
+            requestHeader.setMsgId(header.getMsgId());
+
+            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
+            request.setBody(RemotingConverter.getInstance().convertMsgToBytes(messageExt));
+
+            this.parent().writeAndFlush(request).addListener((ChannelFutureListener) future -> {
+                if (future.isSuccess()) {
+                    responseFuture.complete(null);
+                    writeFuture.complete(null);
+                } else {
+                    Exception e = new RemotingException("write and flush data failed");
+                    responseFuture.completeExceptionally(e);
+                    writeFuture.completeExceptionally(e);
+                }
+            });
+        } catch (Throwable t) {
+            responseFuture.completeExceptionally(t);
+            writeFuture.completeExceptionally(t);
+        }
+        return writeFuture;
+    }
+
+    /**
+     * Asks the client for its consumer running info and relays the decoded answer through
+     * {@code responseFuture}.
+     * <p>
+     * {@code responseFuture} is always completed: with the decoded info on a {@code SUCCESS}
+     * reply, otherwise exceptionally — including when the request times out, the send fails,
+     * or the reply body cannot be decoded.
+     *
+     * @return a future that completes when the exchange is over and fails when it failed
+     */
+    @Override
+    protected CompletableFuture<Void> processGetConsumerRunningInfo(RemotingCommand command,
+        GetConsumerRunningInfoRequestHeader header,
+        CompletableFuture<ProxyRelayResult<ConsumerRunningInfo>> responseFuture) {
+        try {
+            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, header);
+            return this.remotingProxyOutClient.invokeToClient(this.parent(), request, DEFAULT_MQ_CLIENT_TIMEOUT)
+                .<Void>thenApply(response -> {
+                    if (response.getCode() != ResponseCode.SUCCESS) {
+                        String errMsg = String.format("get consumer running info failed, code:%s remark:%s", response.getCode(), response.getRemark());
+                        throw new RuntimeException(errMsg);
+                    }
+                    ConsumerRunningInfo consumerRunningInfo = ConsumerRunningInfo.decode(response.getBody(), ConsumerRunningInfo.class);
+                    responseFuture.complete(new ProxyRelayResult<>(ResponseCode.SUCCESS, "", consumerRunningInfo));
+                    return null;
+                })
+                // Covers every failure path (invoke failure, non-success code, decode error) so
+                // the relay caller is never left waiting on responseFuture.
+                .whenComplete((ignored, t) -> {
+                    if (t != null) {
+                        responseFuture.completeExceptionally(unwrapCompletionException(t));
+                    }
+                });
+        } catch (Throwable t) {
+            responseFuture.completeExceptionally(t);
+            return failedFuture(t);
+        }
+    }
+
+    /**
+     * Asks the client to consume {@code messageExt} directly and relays the decoded result
+     * through {@code responseFuture}.
+     * <p>
+     * {@code responseFuture} is always completed: with the decoded result on a {@code SUCCESS}
+     * reply, otherwise exceptionally — including when the request times out, the send fails,
+     * or the reply body cannot be decoded.
+     *
+     * @return a future that completes when the exchange is over and fails when it failed
+     */
+    @Override
+    protected CompletableFuture<Void> processConsumeMessageDirectly(RemotingCommand command,
+        ConsumeMessageDirectlyResultRequestHeader header, MessageExt messageExt,
+        CompletableFuture<ProxyRelayResult<ConsumeMessageDirectlyResult>> responseFuture) {
+        try {
+            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, header);
+            request.setBody(RemotingConverter.getInstance().convertMsgToBytes(messageExt));
+
+            return this.remotingProxyOutClient.invokeToClient(this.parent(), request, DEFAULT_MQ_CLIENT_TIMEOUT)
+                .<Void>thenApply(response -> {
+                    if (response.getCode() != ResponseCode.SUCCESS) {
+                        String errMsg = String.format("consume message directly failed, code:%s remark:%s", response.getCode(), response.getRemark());
+                        throw new RuntimeException(errMsg);
+                    }
+                    ConsumeMessageDirectlyResult result = ConsumeMessageDirectlyResult.decode(response.getBody(), ConsumeMessageDirectlyResult.class);
+                    responseFuture.complete(new ProxyRelayResult<>(ResponseCode.SUCCESS, "", result));
+                    return null;
+                })
+                // Same guarantee as for running info: responseFuture fails whenever the exchange fails.
+                .whenComplete((ignored, t) -> {
+                    if (t != null) {
+                        responseFuture.completeExceptionally(unwrapCompletionException(t));
+                    }
+                });
+        } catch (Throwable t) {
+            responseFuture.completeExceptionally(t);
+            return failedFuture(t);
+        }
+    }
+
+    /**
+     * Strips the {@code CompletionException} wrapper that dependent stages add, so callers see
+     * the original failure.
+     */
+    private static Throwable unwrapCompletionException(Throwable t) {
+        if (t instanceof java.util.concurrent.CompletionException && t.getCause() != null) {
+            return t.getCause();
+        }
+        return t;
+    }
+
+    /** Builds a future that is already completed exceptionally with {@code t}. */
+    private static CompletableFuture<Void> failedFuture(Throwable t) {
+        CompletableFuture<Void> failed = new CompletableFuture<>();
+        failed.completeExceptionally(t);
+        return failed;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    /**
+     * @return the subscription set serialized as JSON, or {@code null} when there is none
+     */
+    @Override
+    public String getChannelExtendAttribute() {
+        if (this.subscriptionData == null) {
+            return null;
+        }
+        return JSON.toJSONString(this.subscriptionData);
+    }
+
+    /**
+     * Reads the subscription set back from a channel's extend attribute.
+     *
+     * @param channel channel to inspect
+     * @return the parsed subscriptions, or {@code null} when the channel is not a remoting-protocol
+     *         channel, exposes no extend attribute, or the attribute cannot be parsed
+     */
+    public static Set<SubscriptionData> parseChannelExtendAttribute(Channel channel) {
+        // Constant-first comparison: a channel without a protocol type is treated as "not remoting"
+        // instead of raising a NullPointerException.
+        if (ChannelProtocolType.REMOTING.equals(ChannelHelper.getChannelProtocolType(channel)) &&
+            channel instanceof ChannelExtendAttributeGetter) {
+            String attr = ((ChannelExtendAttributeGetter) channel).getChannelExtendAttribute();
+            if (attr == null) {
+                return null;
+            }
+
+            try {
+                return JSON.parseObject(attr, new TypeReference<Set<SubscriptionData>>() {
+                });
+            } catch (Exception e) {
+                log.error("convert remoting extend attribute to subscriptionDataSet failed. data:{}", attr, e);
+                return null;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Builds the {@link RemoteChannel} description of this channel from the proxy's local serve
+     * address, this channel's remote/local addresses, the remoting protocol type and the
+     * serialized subscriptions.
+     */
+    @Override
+    public RemoteChannel toRemoteChannel() {
+        return new RemoteChannel(
+            ConfigurationManager.getProxyConfig().getLocalServeAddr(),
+            this.getRemoteAddress(),
+            this.getLocalAddress(),
+            ChannelProtocolType.REMOTING,
+            this.getChannelExtendAttribute());
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java
new file mode 100644
index 000000000..bdc6457e7
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.channel;
+
+import io.netty.channel.Channel;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient;
+import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Registry of {@link RemotingChannel} wrappers.
+ * <p>
+ * Wrappers are stored per group key and, inside a group, per raw netty channel. Producer and
+ * consumer groups share one map and are kept apart by a one-character key prefix
+ * ({@code "p"} / {@code "c"}). All mutations of a group's inner map happen inside the outer
+ * map's atomic {@code compute}/{@code computeIfPresent}, and a group entry is dropped as soon
+ * as its inner map becomes empty.
+ */
+public class RemotingChannelManager implements StartAndShutdown {
+    protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+    private final ProxyRelayService proxyRelayService;
+    protected final ConcurrentMap<String /* group */, Map<Channel /* raw channel */, RemotingChannel>> groupChannelMap = new ConcurrentHashMap<>();
+
+    private final RemotingProxyOutClient remotingProxyOutClient;
+
+    public RemotingChannelManager(RemotingProxyOutClient remotingProxyOutClient, ProxyRelayService proxyRelayService) {
+        this.remotingProxyOutClient = remotingProxyOutClient;
+        this.proxyRelayService = proxyRelayService;
+    }
+
+    protected String buildProducerKey(String group) {
+        return buildKey("p", group);
+    }
+
+    protected String buildConsumerKey(String group) {
+        return buildKey("c", group);
+    }
+
+    protected String buildKey(String prefix, String group) {
+        return prefix + group;
+    }
+
+    /**
+     * Inverse of {@link #buildKey}; relies on the prefix being exactly one character long.
+     */
+    protected String getGroupFromKey(String key) {
+        return key.substring(1);
+    }
+
+    /**
+     * Returns the wrapper for {@code channel} in producer group {@code group}, creating it
+     * (with an empty subscription set) if none exists yet.
+     */
+    public RemotingChannel createProducerChannel(Channel channel, String group, String clientId) {
+        return createChannel(channel, buildProducerKey(group), clientId, Collections.emptySet());
+    }
+
+    /**
+     * Returns the wrapper for {@code channel} in consumer group {@code group}, creating it if
+     * none exists yet. An existing wrapper is returned unchanged, i.e. it keeps the subscription
+     * set it was created with.
+     */
+    public RemotingChannel createConsumerChannel(Channel channel, String group, String clientId, Set<SubscriptionData> subscriptionData) {
+        return createChannel(channel, buildConsumerKey(group), clientId, subscriptionData);
+    }
+
+    /**
+     * Get-or-create for the given group key and raw channel.
+     *
+     * @param group the prefixed group key (see {@link #buildKey})
+     * @return the existing or newly created wrapper, never {@code null}
+     */
+    protected RemotingChannel createChannel(Channel channel, String group, String clientId, Set<SubscriptionData> subscriptionData) {
+        // Capture the wrapper inside the atomic compute: re-reading the map afterwards could
+        // observe a concurrent removal and hand back null.
+        AtomicReference<RemotingChannel> channelRef = new AtomicReference<>();
+        this.groupChannelMap.compute(group, (groupKey, clientIdMap) -> {
+            if (clientIdMap == null) {
+                clientIdMap = new ConcurrentHashMap<>();
+            }
+            channelRef.set(clientIdMap.computeIfAbsent(channel,
+                clientIdKey -> new RemotingChannel(remotingProxyOutClient, proxyRelayService, channel, clientId, subscriptionData)));
+            return clientIdMap;
+        });
+        return channelRef.get();
+    }
+
+    public RemotingChannel getConsumerChannel(String group, Channel channel) {
+        return getChannel(buildConsumerKey(group), channel);
+    }
+
+    public RemotingChannel getProducerChannel(String group, Channel channel) {
+        return getChannel(buildProducerKey(group), channel);
+    }
+
+    /**
+     * @param group the prefixed group key
+     * @return the wrapper registered for {@code channel}, or {@code null} if there is none
+     */
+    protected RemotingChannel getChannel(String group, Channel channel) {
+        Map<Channel, RemotingChannel> clientIdChannelMap = this.groupChannelMap.get(group);
+        if (clientIdChannelMap == null) {
+            return null;
+        }
+        return clientIdChannelMap.get(channel);
+    }
+
+    /**
+     * Removes the wrappers registered for {@code channel} in every group.
+     * <p>
+     * Each group is handled through {@link #removeChannel(String, Channel)}, so the removal is
+     * atomic with respect to concurrent registrations and groups left empty are dropped.
+     *
+     * @return the removed wrappers; empty when the channel was not registered anywhere
+     */
+    public Set<RemotingChannel> removeChannel(Channel channel) {
+        Set<RemotingChannel> removedChannelSet = new HashSet<>();
+        for (String groupKey : groupChannelMap.keySet()) {
+            RemotingChannel remotingChannel = removeChannel(groupKey, channel);
+            if (remotingChannel != null) {
+                removedChannelSet.add(remotingChannel);
+            }
+        }
+        return removedChannelSet;
+    }
+
+    public RemotingChannel removeProducerChannel(String group, Channel channel) {
+        return removeChannel(buildProducerKey(group), channel);
+    }
+
+    public RemotingChannel removeConsumerChannel(String group, Channel channel) {
+        return removeChannel(buildConsumerKey(group), channel);
+    }
+
+    /**
+     * Removes the wrapper registered for {@code channel} under the given group key and drops
+     * the group entry when it becomes empty.
+     *
+     * @param group   the prefixed group key
+     * @param channel the raw channel, or a {@link RemotingChannel} wrapper (resolved to its parent)
+     * @return the removed wrapper, or {@code null} if nothing was registered
+     */
+    protected RemotingChannel removeChannel(String group, Channel channel) {
+        // Entries are keyed by the raw channel. Callers that only hold the wrapper (e.g. taken
+        // from a ClientChannelInfo) are resolved to the raw channel the wrapper was built on.
+        final Channel rawChannel = channel instanceof RemotingChannel ? channel.parent() : channel;
+        if (rawChannel == null) {
+            return null;
+        }
+        AtomicReference<RemotingChannel> channelRef = new AtomicReference<>();
+
+        this.groupChannelMap.computeIfPresent(group, (groupKey, channelMap) -> {
+            channelRef.set(channelMap.remove(rawChannel));
+            if (channelMap.isEmpty()) {
+                return null;
+            }
+            return channelMap;
+        });
+        return channelRef.get();
+    }
+
+    @Override
+    public void shutdown() throws Exception {
+
+    }
+
+    @Override
+    public void start() throws Exception {
+
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java
new file mode 100644
index 000000000..55af1ff19
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.common;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+/**
+ * Encodes {@link MessageExt} instances into the byte layout produced by
+ * {@link MessageDecoder#encode(MessageExt, boolean)}. Accessed through a lazily created
+ * shared instance.
+ */
+public class RemotingConverter {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+
+    protected static final Object INSTANCE_CREATE_LOCK = new Object();
+    protected static volatile RemotingConverter instance;
+
+    /**
+     * @return the shared converter, created on first use under {@link #INSTANCE_CREATE_LOCK}
+     */
+    public static RemotingConverter getInstance() {
+        if (instance == null) {
+            synchronized (INSTANCE_CREATE_LOCK) {
+                if (instance == null) {
+                    instance = new RemotingConverter();
+                }
+            }
+        }
+        return instance;
+    }
+
+    /**
+     * Encodes every message and concatenates the results in list order.
+     * <p>
+     * A message that fails to encode is logged and left out of the result; the remaining
+     * messages are still returned.
+     *
+     * @param msgList messages to encode
+     * @return the concatenated encodings of all messages that could be encoded
+     */
+    public byte[] convertMsgToBytes(List<MessageExt> msgList) {
+        // set response body
+        byte[][] msgBufferList = new byte[msgList.size()][];
+        int bodyTotalSize = 0;
+        for (int i = 0; i < msgList.size(); i++) {
+            try {
+                msgBufferList[i] = convertMsgToBytes(msgList.get(i));
+                bodyTotalSize += msgBufferList[i].length;
+            } catch (Exception e) {
+                log.error("messageToByteBuffer UnsupportedEncodingException", e);
+            }
+        }
+
+        ByteBuffer body = ByteBuffer.allocate(bodyTotalSize);
+        for (byte[] bb : msgBufferList) {
+            // Slots of messages that failed to encode stay null and were not counted in
+            // bodyTotalSize; putting them would throw a NullPointerException.
+            if (bb != null) {
+                body.put(bb);
+            }
+        }
+
+        return body.array();
+    }
+
+    /**
+     * Encodes a single message. The message's store size is reset to 0 first, so this method
+     * mutates its argument.
+     *
+     * @param msg message to encode
+     * @return the encoded bytes
+     * @throws Exception if encoding fails
+     */
+    public byte[] convertMsgToBytes(final MessageExt msg) throws Exception {
+        // change to 0 for recalculate storeSize
+        msg.setStoreSize(0);
+        // Only a warning: encoding is still attempted for topics longer than Byte.MAX_VALUE.
+        if (msg.getTopic().length() > Byte.MAX_VALUE) {
+            log.warn("Topic length is too long, topic: {}", msg.getTopic());
+        }
+        return MessageDecoder.encode(msg, false);
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
index ac1ff6a88..24b27aaa2 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
@@ -32,6 +32,9 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.proxy.common.AbstractStartAndShutdown;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.service.admin.AdminService;
+import org.apache.rocketmq.proxy.service.admin.DefaultAdminService;
+import org.apache.rocketmq.proxy.service.client.ClusterConsumerManager;
 import org.apache.rocketmq.proxy.service.message.ClusterMessageService;
 import org.apache.rocketmq.proxy.service.message.MessageService;
 import org.apache.rocketmq.proxy.service.metadata.ClusterMetadataService;
@@ -52,11 +55,12 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S
 
     protected ClusterTransactionService clusterTransactionService;
     protected ProducerManager producerManager;
-    protected ConsumerManager consumerManager;
+    protected ClusterConsumerManager consumerManager;
     protected TopicRouteService topicRouteService;
     protected MessageService messageService;
     protected ProxyRelayService proxyRelayService;
     protected ClusterMetadataService metadataService;
+    protected AdminService adminService;
 
     protected ScheduledExecutorService scheduledExecutorService;
     protected MQClientAPIFactory messagingClientAPIFactory;
@@ -67,7 +71,7 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S
         ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
         this.scheduledExecutorService = Executors.newScheduledThreadPool(3);
         this.producerManager = new ProducerManager();
-        this.consumerManager = new ConsumerManager(new ConsumerIdsChangeListenerImpl(), proxyConfig.getChannelExpiredTimeout());
+        this.consumerManager = new ClusterConsumerManager(this.topicRouteService, this.adminService, this.operationClientAPIFactory, new ConsumerIdsChangeListenerImpl(), proxyConfig.getChannelExpiredTimeout());
 
         this.messagingClientAPIFactory = new MQClientAPIFactory(
             "ClusterMQClient_",
@@ -76,7 +80,7 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S
             rpcHook,
             scheduledExecutorService);
         this.operationClientAPIFactory = new MQClientAPIFactory(
-            "TopicRouteServiceClient_",
+            "OperationClient_",
             1,
             new DoNothingClientRemotingProcessor(null),
             rpcHook,
@@ -95,6 +99,7 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S
             this.transactionClientAPIFactory);
         this.proxyRelayService = new ClusterProxyRelayService(this.clusterTransactionService);
         this.metadataService = new ClusterMetadataService(topicRouteService, operationClientAPIFactory);
+        this.adminService = new DefaultAdminService(this.operationClientAPIFactory);
 
         this.init();
     }
@@ -118,6 +123,7 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S
         this.appendStartAndShutdown(this.topicRouteService);
         this.appendStartAndShutdown(this.clusterTransactionService);
         this.appendStartAndShutdown(this.metadataService);
+        this.appendStartAndShutdown(this.consumerManager);
     }
 
     @Override
@@ -155,6 +161,11 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S
         return this.metadataService;
     }
 
+    /**
+     * Returns the {@link AdminService} held by this service manager.
+     */
+    @Override
+    public AdminService getAdminService() {
+        return this.adminService;
+    }
+
     protected static class ConsumerIdsChangeListenerImpl implements ConsumerIdsChangeListener {
 
         @Override
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java
index 6afc86c57..4f829caa6 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java
@@ -25,6 +25,8 @@ import org.apache.rocketmq.broker.client.ProducerManager;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.proxy.common.AbstractStartAndShutdown;
 import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.service.admin.AdminService;
+import org.apache.rocketmq.proxy.service.admin.DefaultAdminService;
 import org.apache.rocketmq.proxy.service.channel.ChannelManager;
 import org.apache.rocketmq.proxy.service.message.LocalMessageService;
 import org.apache.rocketmq.proxy.service.message.MessageService;
@@ -48,6 +50,7 @@ public class LocalServiceManager extends AbstractStartAndShutdown implements Ser
     private final TransactionService transactionService;
     private final ProxyRelayService proxyRelayService;
     private final MetadataService metadataService;
+    private final AdminService adminService;
 
     private final MQClientAPIFactory mqClientAPIFactory;
     private final ChannelManager channelManager;
@@ -60,7 +63,7 @@ public class LocalServiceManager extends AbstractStartAndShutdown implements Ser
         this.channelManager = new ChannelManager();
         this.messageService = new LocalMessageService(brokerController, channelManager, rpcHook);
         this.mqClientAPIFactory = new MQClientAPIFactory(
-            "TopicRouteServiceClient_",
+            "LocalMQClient_",
             1,
             new DoNothingClientRemotingProcessor(null),
             rpcHook,
@@ -70,6 +73,7 @@ public class LocalServiceManager extends AbstractStartAndShutdown implements Ser
         this.transactionService = new LocalTransactionService(brokerController.getBrokerConfig());
         this.proxyRelayService = new LocalProxyRelayService(brokerController, this.transactionService);
         this.metadataService = new LocalMetadataService(brokerController);
+        this.adminService = new DefaultAdminService(this.mqClientAPIFactory);
         this.init();
     }
 
@@ -114,6 +118,11 @@ public class LocalServiceManager extends AbstractStartAndShutdown implements Ser
         return this.metadataService;
     }
 
+    /**
+     * Returns the {@link AdminService} held by this service manager.
+     */
+    @Override
+    public AdminService getAdminService() {
+        return this.adminService;
+    }
+
     private class LocalServiceManagerStartAndShutdown implements StartAndShutdown {
         @Override
         public void start() throws Exception {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java
index 563b56715..ce84832ca 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java
@@ -16,9 +16,10 @@
  */
 package org.apache.rocketmq.proxy.service;
 
-import org.apache.rocketmq.broker.client.ConsumerManager;
-import org.apache.rocketmq.broker.client.ProducerManager;
+import org.apache.rocketmq.broker.client.ConsumerManagerInterface;
+import org.apache.rocketmq.broker.client.ProducerManagerInterface;
 import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.service.admin.AdminService;
 import org.apache.rocketmq.proxy.service.message.MessageService;
 import org.apache.rocketmq.proxy.service.metadata.MetadataService;
 import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
@@ -30,13 +31,15 @@ public interface ServiceManager extends StartAndShutdown {
 
     TopicRouteService getTopicRouteService();
 
-    ProducerManager getProducerManager();
+    ProducerManagerInterface getProducerManager();
 
-    ConsumerManager getConsumerManager();
+    ConsumerManagerInterface getConsumerManager();
 
     TransactionService getTransactionService();
 
     ProxyRelayService getProxyRelayService();
 
     MetadataService getMetadataService();
+
+    AdminService getAdminService();
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/AdminService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/AdminService.java
new file mode 100644
index 000000000..d98d17ff7
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/AdminService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.admin;
+
+import java.util.List;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+
+/**
+ * Topic administration operations: existence check and topic creation on brokers.
+ */
+public interface AdminService {
+
+    /**
+     * Tells whether the given topic exists.
+     *
+     * @param topic name of the topic to look up
+     * @return {@code true} if the topic exists, {@code false} otherwise
+     */
+    boolean topicExist(String topic);
+
+    /**
+     * Creates {@code createTopic}, choosing the target brokers from the route of {@code sampleTopic}.
+     * Failures are reported through the return value rather than an exception (no checked
+     * exception is declared).
+     *
+     * @param createTopic     name of the topic to create
+     * @param sampleTopic     name of an existing topic whose brokers are the creation candidates
+     * @param wQueueNum       number of write queues of the new topic
+     * @param rQueueNum       number of read queues of the new topic
+     * @param examineTopic    whether to verify that the topic exists after the creation requests
+     * @param retryCheckCount how many times the existence check is attempted when
+     *                        {@code examineTopic} is {@code true}
+     * @return {@code true} on success, {@code false} otherwise
+     */
+    boolean createTopicOnTopicBrokerIfNotExist(String createTopic, String sampleTopic, int wQueueNum,
+        int rQueueNum, boolean examineTopic, int retryCheckCount);
+
+    /**
+     * Creates {@code topic} on the brokers of {@code sampleBrokerDataList}; {@code curBrokerDataList}
+     * names the brokers that already serve the topic.
+     *
+     * @param topic                name of the topic to create
+     * @param wQueueNum            number of write queues of the new topic
+     * @param rQueueNum            number of read queues of the new topic
+     * @param curBrokerDataList    brokers that already serve the topic
+     * @param sampleBrokerDataList candidate brokers to create the topic on
+     * @param examineTopic         whether to verify that the topic exists after the creation requests
+     * @param retryCheckCount      how many times the existence check is attempted when
+     *                             {@code examineTopic} is {@code true}
+     * @return {@code true} on success, {@code false} otherwise
+     * @throws Exception if a creation request fails
+     */
+    boolean createTopicOnBroker(String topic, int wQueueNum, int rQueueNum, List<BrokerData> curBrokerDataList,
+        List<BrokerData> sampleBrokerDataList, boolean examineTopic, int retryCheckCount) throws Exception;
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java
new file mode 100644
index 000000000..4f3b407d6
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.admin;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIExt;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.apache.rocketmq.proxy.service.route.TopicRouteHelper;
+
+/**
+ * {@link AdminService} implementation that queries the name server and sends topic creation
+ * requests to brokers through a client obtained from an {@link MQClientAPIFactory}.
+ */
+public class DefaultAdminService implements AdminService {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+    /** Timeout, in milliseconds, applied to every request issued by this service. */
+    private static final long REQUEST_TIMEOUT_MILLIS = Duration.ofSeconds(3).toMillis();
+
+    private final MQClientAPIFactory mqClientAPIFactory;
+
+    public DefaultAdminService(MQClientAPIFactory mqClientAPIFactory) {
+        this.mqClientAPIFactory = mqClientAPIFactory;
+    }
+
+    /**
+     * Checks whether the name server returns route data for the topic.
+     *
+     * @param topic name of the topic to look up
+     * @return {@code true} if route data was returned; {@code false} if nothing was returned or
+     * the lookup failed for any reason
+     */
+    @Override
+    public boolean topicExist(String topic) {
+        try {
+            return this.getTopicRouteDataDirectlyFromNameServer(topic) != null;
+        } catch (Throwable e) {
+            // "Topic does not exist" is the expected negative answer. Any other failure is still
+            // reported as false, but it is logged so that a lookup failure can be told apart from
+            // a missing topic.
+            boolean topicNotExist = e instanceof Exception && TopicRouteHelper.isTopicNotExistError((Exception) e);
+            if (!topicNotExist) {
+                log.warn("check topic {} exist failed, treat it as not exist.", topic, e);
+            }
+            return false;
+        }
+    }
+
+    /**
+     * Creates {@code createTopic} on every broker that serves {@code sampleTopic} and does not
+     * serve {@code createTopic} yet. Both routes are read directly from the name server.
+     *
+     * @param createTopic     name of the topic to create
+     * @param sampleTopic     name of the topic whose brokers are the creation candidates
+     * @param wQueueNum       number of write queues of the new topic
+     * @param rQueueNum       number of read queues of the new topic
+     * @param examineTopic    whether to verify that the topic exists after the creation requests
+     * @param retryCheckCount how many existence checks are attempted when {@code examineTopic} is set
+     * @return {@code true} if the creation (and the optional verification) succeeded;
+     * {@code false} if a route lookup failed, the sample topic has no brokers, or the creation failed
+     */
+    @Override
+    public boolean createTopicOnTopicBrokerIfNotExist(String createTopic, String sampleTopic, int wQueueNum,
+        int rQueueNum, boolean examineTopic, int retryCheckCount) {
+        TopicRouteData curTopicRouteData = null;
+        try {
+            curTopicRouteData = this.getTopicRouteDataDirectlyFromNameServer(createTopic);
+        } catch (Exception e) {
+            if (!TopicRouteHelper.isTopicNotExistError(e)) {
+                log.error("get cur topic route {} failed.", createTopic, e);
+                return false;
+            }
+        }
+        // No route (topic missing, or an empty answer) means no broker has to be skipped.
+        List<BrokerData> curBrokerDataList = curTopicRouteData == null ? null : curTopicRouteData.getBrokerDatas();
+
+        TopicRouteData sampleTopicRouteData;
+        try {
+            sampleTopicRouteData = this.getTopicRouteDataDirectlyFromNameServer(sampleTopic);
+        } catch (Exception e) {
+            log.error("create topic {} failed.", createTopic, e);
+            return false;
+        }
+
+        List<BrokerData> sampleBrokerDataList = sampleTopicRouteData == null ? null : sampleTopicRouteData.getBrokerDatas();
+        if (sampleBrokerDataList == null || sampleBrokerDataList.isEmpty()) {
+            return false;
+        }
+
+        try {
+            return this.createTopicOnBroker(createTopic, wQueueNum, rQueueNum, curBrokerDataList,
+                sampleBrokerDataList, examineTopic, retryCheckCount);
+        } catch (Exception e) {
+            log.error("create topic {} failed.", createTopic, e);
+            return false;
+        }
+    }
+
+    /**
+     * Sends a create-topic request (read and write permission) to the master address of every
+     * broker in {@code sampleBrokerDataList} whose master address is not among the master
+     * addresses of {@code curBrokerDataList}. Brokers without a master address are skipped.
+     *
+     * @param topic                name of the topic to create
+     * @param wQueueNum            number of write queues of the new topic
+     * @param rQueueNum            number of read queues of the new topic
+     * @param curBrokerDataList    brokers that already serve the topic; may be {@code null}
+     * @param sampleBrokerDataList candidate brokers; {@code null} is treated as empty
+     * @param examineTopic         whether to verify through {@link #topicExist(String)} afterwards
+     * @param retryCheckCount      maximum number of existence checks; the checks run back to back
+     *                             without a pause, and none runs if the value is not positive
+     * @return {@code true} if {@code examineTopic} is {@code false}, or if one of the existence
+     * checks succeeded; {@code false} otherwise
+     * @throws Exception if a create-topic request fails
+     */
+    @Override
+    public boolean createTopicOnBroker(String topic, int wQueueNum, int rQueueNum, List<BrokerData> curBrokerDataList,
+        List<BrokerData> sampleBrokerDataList, boolean examineTopic, int retryCheckCount) throws Exception {
+        Set<String> curBrokerAddr = new HashSet<>();
+        if (curBrokerDataList != null) {
+            for (BrokerData brokerData : curBrokerDataList) {
+                String addr = masterAddrOf(brokerData);
+                if (addr != null) {
+                    curBrokerAddr.add(addr);
+                }
+            }
+        }
+
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName(topic);
+        topicConfig.setWriteQueueNums(wQueueNum);
+        topicConfig.setReadQueueNums(rQueueNum);
+        topicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
+
+        if (sampleBrokerDataList != null) {
+            for (BrokerData brokerData : sampleBrokerDataList) {
+                String addr = masterAddrOf(brokerData);
+                if (addr == null || curBrokerAddr.contains(addr)) {
+                    continue;
+                }
+                this.getClient().createTopic(addr, TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC, topicConfig, REQUEST_TIMEOUT_MILLIS);
+            }
+        }
+
+        if (!examineTopic) {
+            return true;
+        }
+        // examine topic exist.
+        for (int remaining = retryCheckCount; remaining > 0; remaining--) {
+            if (this.topicExist(topic)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Returns the master address of the broker, or {@code null} if it has none. */
+    private static String masterAddrOf(BrokerData brokerData) {
+        if (brokerData == null || brokerData.getBrokerAddrs() == null) {
+            return null;
+        }
+        return brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
+    }
+
+    /**
+     * Fetches the route of the topic from the name server through the client of this service.
+     *
+     * @param topic name of the topic
+     * @return the route data returned by the client
+     * @throws Exception if the lookup fails
+     */
+    protected TopicRouteData getTopicRouteDataDirectlyFromNameServer(String topic) throws Exception {
+        return this.getClient().getTopicRouteInfoFromNameServer(topic, REQUEST_TIMEOUT_MILLIS);
+    }
+
+    /** Returns the client handed out by the factory this service was built with. */
+    protected MQClientAPIExt getClient() {
+        return this.mqClientAPIFactory.getClient();
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java
index 04ad5e269..65c1fd406 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java
@@ -196,6 +196,14 @@ public class SimpleChannel extends AbstractChannel {
 
     }
 
+    /**
+     * Returns the remote address string held by this channel.
+     */
+    public String getRemoteAddress() {
+        return remoteAddress;
+    }
+
+    /**
+     * Returns the local address string held by this channel.
+     */
+    public String getLocalAddress() {
+        return localAddress;
+    }
+
     public ChannelHandlerContext getChannelHandlerContext() {
         return channelHandlerContext;
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
new file mode 100644
index 000000000..3bb65b03e
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.client;
+
+import java.util.Set;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
+import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.broker.client.ConsumerManagerInterface;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.service.admin.AdminService;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.apache.rocketmq.proxy.service.route.TopicRouteService;
+import org.apache.rocketmq.proxy.service.sysmessage.HeartbeatSyncer;
+
+/**
+ * {@link ConsumerManager} that reports every consumer registration and unregistration to a
+ * {@link HeartbeatSyncer} before applying it through the parent class. Starting and shutting
+ * down this manager starts and shuts down the syncer.
+ */
+public class ClusterConsumerManager extends ConsumerManager implements ConsumerManagerInterface, StartAndShutdown {
+
+    protected HeartbeatSyncer heartbeatSyncer;
+
+    public ClusterConsumerManager(TopicRouteService topicRouteService, AdminService adminService,
+        MQClientAPIFactory mqClientAPIFactory, ConsumerIdsChangeListener consumerIdsChangeListener, long channelExpiredTimeout) {
+        super(consumerIdsChangeListener, channelExpiredTimeout);
+        heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, this, mqClientAPIFactory);
+    }
+
+    /** Starts the heartbeat syncer. */
+    @Override
+    public void start() throws Exception {
+        heartbeatSyncer.start();
+    }
+
+    /** Shuts the heartbeat syncer down. */
+    @Override
+    public void shutdown() throws Exception {
+        heartbeatSyncer.shutdown();
+    }
+
+    /**
+     * Notifies the heartbeat syncer of the registration, then registers the consumer through the
+     * parent class and returns the parent's result.
+     */
+    @Override
+    public boolean registerConsumer(
+        String group,
+        ClientChannelInfo clientChannelInfo,
+        ConsumeType consumeType,
+        MessageModel messageModel,
+        ConsumeFromWhere consumeFromWhere,
+        Set<SubscriptionData> subList,
+        boolean isNotifyConsumerIdsChangedEnable,
+        boolean updateSubscription) {
+        heartbeatSyncer.onConsumerRegister(group, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList);
+        boolean registerResult = super.registerConsumer(group, clientChannelInfo, consumeType, messageModel,
+            consumeFromWhere, subList, isNotifyConsumerIdsChangedEnable, updateSubscription);
+        return registerResult;
+    }
+
+    /**
+     * Notifies the heartbeat syncer of the unregistration, then unregisters the consumer through
+     * the parent class.
+     */
+    @Override
+    public void unregisterConsumer(
+        String group,
+        ClientChannelInfo clientChannelInfo,
+        boolean isNotifyConsumerIdsChangedEnable) {
+        heartbeatSyncer.onConsumerUnRegister(group, clientChannelInfo);
+        super.unregisterConsumer(group, clientChannelInfo, isNotifyConsumerIdsChangedEnable);
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
new file mode 100644
index 000000000..72593525e
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.sysmessage;
+
+import com.alibaba.fastjson.JSON;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.service.admin.AdminService;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
+import org.apache.rocketmq.proxy.service.route.TopicRouteService;
+import org.apache.rocketmq.remoting.RPCHook;
+
+/**
+ * Base class for components that publish and consume system messages over a dedicated
+ * broadcast topic.
+ * <p>
+ * {@link #start()} makes sure the topic exists and starts a push consumer in broadcasting mode
+ * that delivers the messages of that topic to this object; {@link #sendSystemMessage(Object)}
+ * publishes a JSON-serialized payload to the same topic.
+ */
+public abstract class AbstractSystemMessageSyncer implements StartAndShutdown, MessageListenerConcurrently {
+    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+    protected final TopicRouteService topicRouteService;
+    protected final AdminService adminService;
+    protected final String systemResourceName;
+    protected final MQClientAPIFactory mqClientAPIFactory;
+    /** Created by {@link #start()}; stays {@code null} until then. */
+    protected DefaultMQPushConsumer defaultMQPushConsumer;
+
+    public AbstractSystemMessageSyncer(TopicRouteService topicRouteService, AdminService adminService, MQClientAPIFactory mqClientAPIFactory) {
+        this.topicRouteService = topicRouteService;
+        this.adminService = adminService;
+        this.mqClientAPIFactory = mqClientAPIFactory;
+
+        // NOTE: getSystemResourceName() is overridable and runs before any subclass field is
+        // initialized, so an override must not depend on subclass state.
+        this.systemResourceName = this.getSystemResourceName();
+    }
+
+    /**
+     * Builds the resource name from the system topic prefix, the literal {@code "proxy_"}, the
+     * simple name of the concrete class and the configured proxy cluster name.
+     */
+    protected String getSystemResourceName() {
+        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+        return TopicValidator.SYSTEM_TOPIC_PREFIX + "proxy_" + this.getClass().getSimpleName() + "_" + proxyConfig.getProxyClusterName();
+    }
+
+    /** Returns the producer group used when sending system messages. */
+    protected String getSystemMessageProducerId() {
+        return "PID_" + this.systemResourceName;
+    }
+
+    /** Returns the consumer group used by the push consumer created in {@link #start()}. */
+    protected String getSystemMessageConsumerId() {
+        return "CID_" + this.systemResourceName;
+    }
+
+    /** Returns the name of the broadcast topic, which is the system resource name. */
+    protected String getBroadcastTopicName() {
+        return this.systemResourceName;
+    }
+
+    /** Returns the subscription expression; {@code "*"} by default. */
+    protected String getSubTag() {
+        return "*";
+    }
+
+    /** Returns the configured system topic cluster name. */
+    protected String getBroadcastTopicClusterName() {
+        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+        return proxyConfig.getSystemTopicClusterName();
+    }
+
+    /** Returns the read and write queue count used when the broadcast topic is created. */
+    protected int getBroadcastTopicQueueNum() {
+        return 1;
+    }
+
+    /** Returns the RPC hook passed to the push consumer; {@code null} by default. */
+    protected RPCHook getRpcHook() {
+        return null;
+    }
+
+    /**
+     * Serializes {@code data} to JSON (UTF-8) and sends it asynchronously to one write queue of
+     * the broadcast topic. The method never throws: every failure, synchronous or reported by
+     * the send future, is only logged.
+     *
+     * @param data payload to serialize and send
+     */
+    protected void sendSystemMessage(Object data) {
+        String targetTopic = this.getBroadcastTopicName();
+        try {
+            Message message = new Message(
+                targetTopic,
+                JSON.toJSONString(data).getBytes(StandardCharsets.UTF_8)
+            );
+
+            AddressableMessageQueue messageQueue = this.topicRouteService.getAllMessageQueueView(targetTopic)
+                .getWriteSelector().selectOne(true);
+            this.mqClientAPIFactory.getClient().sendMessageAsync(
+                messageQueue.getBrokerAddr(),
+                messageQueue.getBrokerName(),
+                message,
+                buildSendMessageRequestHeader(message, this.getSystemMessageProducerId(), messageQueue.getQueueId()),
+                Duration.ofSeconds(3).toMillis()
+            ).whenCompleteAsync((result, throwable) -> {
+                // Log the topic the message was actually sent to instead of resolving it again.
+                if (throwable != null) {
+                    log.error("send system message failed. data: {}, topic: {}", data, targetTopic, throwable);
+                    return;
+                }
+                if (SendStatus.SEND_OK != result.getSendStatus()) {
+                    log.error("send system message failed. data: {}, topic: {}, sendResult:{}", data, targetTopic, result);
+                }
+            });
+        } catch (Throwable t) {
+            log.error("send system message failed. data: {}, topic: {}", data, targetTopic, t);
+        }
+    }
+
+    /**
+     * Builds the request header for a single, non-batch message.
+     *
+     * @param message       message to send; supplies topic, flag and properties
+     * @param producerGroup producer group written to the header
+     * @param queueId       id of the target queue
+     * @return the populated header
+     */
+    protected SendMessageRequestHeader buildSendMessageRequestHeader(Message message,
+        String producerGroup, int queueId) {
+        SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
+
+        requestHeader.setProducerGroup(producerGroup);
+        requestHeader.setTopic(message.getTopic());
+        requestHeader.setDefaultTopic(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC);
+        requestHeader.setDefaultTopicQueueNums(0);
+        requestHeader.setQueueId(queueId);
+        requestHeader.setSysFlag(0);
+        requestHeader.setBornTimestamp(System.currentTimeMillis());
+        requestHeader.setFlag(message.getFlag());
+        requestHeader.setProperties(MessageDecoder.messageProperties2String(message.getProperties()));
+        requestHeader.setReconsumeTimes(0);
+        requestHeader.setBatch(false);
+        return requestHeader;
+    }
+
+    /**
+     * Ensures the broadcast topic exists, then creates and starts a push consumer that consumes
+     * it from the last offset in broadcasting mode with this object as the message listener.
+     *
+     * @throws ProxyException if the topic cannot be created or the subscription fails
+     * @throws Exception      if the consumer fails to start
+     */
+    @Override
+    public void start() throws Exception {
+        this.createSysTopic();
+        RPCHook rpcHook = this.getRpcHook();
+        this.defaultMQPushConsumer = new DefaultMQPushConsumer(null, this.getSystemMessageConsumerId(), rpcHook);
+
+        this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+        this.defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
+        try {
+            this.defaultMQPushConsumer.subscribe(this.getBroadcastTopicName(), this.getSubTag());
+        } catch (MQClientException e) {
+            // The exception thrown below carries only the message text, so log the original
+            // exception here to keep its stack trace.
+            log.error("subscribe to broadcast topic {} failed.", this.getBroadcastTopicName(), e);
+            throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "subscribe to broadcast topic " + this.getBroadcastTopicName() + " failed. " + e.getMessage());
+        }
+        this.defaultMQPushConsumer.registerMessageListener(this);
+        this.defaultMQPushConsumer.start();
+    }
+
+    /**
+     * Creates the broadcast topic unless the admin service reports that it already exists.
+     *
+     * @throws ProxyException if no system topic cluster name is configured or the creation fails
+     */
+    protected void createSysTopic() {
+        if (this.adminService.topicExist(this.getBroadcastTopicName())) {
+            return;
+        }
+
+        String clusterName = this.getBroadcastTopicClusterName();
+        if (StringUtils.isEmpty(clusterName)) {
+            throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "system topic cluster cannot be empty");
+        }
+
+        // The cluster name is passed as the sample topic, so the topic is created on the brokers
+        // found in the route registered under that name.
+        boolean createSuccess = this.adminService.createTopicOnTopicBrokerIfNotExist(
+            this.getBroadcastTopicName(),
+            clusterName,
+            this.getBroadcastTopicQueueNum(),
+            this.getBroadcastTopicQueueNum(),
+            true,
+            3
+        );
+        if (!createSuccess) {
+            throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "create system broadcast topic " + this.getBroadcastTopicName() + " failed on cluster " + clusterName);
+        }
+    }
+
+    /**
+     * Shuts the push consumer down. Does nothing if {@link #start()} never created one, for
+     * example because topic creation failed.
+     */
+    @Override
+    public void shutdown() throws Exception {
+        if (this.defaultMQPushConsumer != null) {
+            this.defaultMQPushConsumer.shutdown();
+        }
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
new file mode 100644
index 000000000..ce3403766
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.sysmessage;
+
+import com.alibaba.fastjson.JSON;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
+import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
+import org.apache.rocketmq.broker.client.ConsumerManagerInterface;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
+import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
+import org.apache.rocketmq.proxy.service.admin.AdminService;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.apache.rocketmq.proxy.service.route.TopicRouteService;
+
+/**
+ * Synchronizes consumer registrations through system messages: register/unregister events of non-remote
+ * channels are sent out asynchronously, and received messages whose origin address differs from the local
+ * serve address are applied to the consumer manager using {@link RemoteChannel} instances.
+ */
+public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
+
+    protected ThreadPoolExecutor threadPoolExecutor;
+    protected ConsumerManagerInterface consumerManager;
+    protected final Map<String /* channelId as longText */, RemoteChannel> remoteChannelMap = new ConcurrentHashMap<>();
+
+    public HeartbeatSyncer(TopicRouteService topicRouteService, AdminService adminService,
+        ConsumerManagerInterface consumerManager, MQClientAPIFactory mqClientAPIFactory) {
+        super(topicRouteService, adminService, mqClientAPIFactory);
+        this.consumerManager = consumerManager;
+        this.init();
+    }
+
+    /** Creates the sender thread pool and evicts cached remote channels when their client is unregistered. */
+    protected void init() {
+        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+        this.threadPoolExecutor = ThreadPoolMonitor.createAndMonitor(
+            proxyConfig.getHeartbeatSyncerThreadPoolNums(),
+            proxyConfig.getHeartbeatSyncerThreadPoolNums(),
+            1,
+            TimeUnit.MINUTES,
+            "HeartbeatSyncer",
+            proxyConfig.getHeartbeatSyncerThreadPoolQueueCapacity()
+        );
+        this.consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() {
+            @Override
+            public void handle(ConsumerGroupEvent event, String s, Object... args) {
+                if (event != ConsumerGroupEvent.CLIENT_UNREGISTER || args == null || args.length < 1) {
+                    return;
+                }
+                if (args[0] instanceof ClientChannelInfo) {
+                    ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
+                    remoteChannelMap.remove(clientChannelInfo.getChannel().id().asLongText());
+                }
+            }
+
+            @Override
+            public void shutdown() {
+                // nothing to release
+            }
+        });
+    }
+
+    /** Stops accepting new sync tasks, then runs the parent shutdown. */
+    @Override
+    public void shutdown() throws Exception {
+        this.threadPoolExecutor.shutdown();
+        super.shutdown();
+    }
+
+    /**
+     * Asynchronously sends a REGISTER system message for a consumer connected through a non-remote channel.
+     * Failures while submitting or sending are logged rather than thrown.
+     */
+    public void onConsumerRegister(String consumerGroup, ClientChannelInfo clientChannelInfo,
+        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
+        Set<SubscriptionData> subList) {
+        if (clientChannelInfo == null || ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
+            return;
+        }
+        try {
+            this.threadPoolExecutor.submit(() -> {
+                try {
+                    HeartbeatSyncerData data = this.buildSyncerData(HeartbeatType.REGISTER, consumerGroup,
+                        clientChannelInfo, consumeType, messageModel, consumeFromWhere);
+                    if (data == null) {
+                        return;
+                    }
+                    data.setSubscriptionDataSet(subList);
+                    this.sendSystemMessage(data);
+                } catch (Throwable t) {
+                    log.error("heartbeat register broadcast failed. group:{}, clientChannelInfo:{}, consumeType:{}, messageModel:{}, consumeFromWhere:{}, subList:{}",
+                        consumerGroup, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList, t);
+                }
+            });
+        } catch (Throwable t) {
+            log.error("heartbeat submit register broadcast failed. group:{}, clientChannelInfo:{}, consumeType:{}, messageModel:{}, consumeFromWhere:{}, subList:{}",
+                consumerGroup, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList, t);
+        }
+    }
+
+    /**
+     * Asynchronously sends an UNREGISTER system message for a consumer connected through a non-remote channel.
+     * Failures while submitting or sending are logged rather than thrown.
+     */
+    public void onConsumerUnRegister(String consumerGroup, ClientChannelInfo clientChannelInfo) {
+        if (clientChannelInfo == null || ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
+            return;
+        }
+        try {
+            this.threadPoolExecutor.submit(() -> {
+                try {
+                    HeartbeatSyncerData data = this.buildSyncerData(HeartbeatType.UNREGISTER, consumerGroup,
+                        clientChannelInfo, null, null, null);
+                    if (data == null) {
+                        return;
+                    }
+                    this.sendSystemMessage(data);
+                } catch (Throwable t) {
+                    log.error("heartbeat unregister broadcast failed. group:{}, clientChannelInfo:{}",
+                        consumerGroup, clientChannelInfo, t);
+                }
+            });
+        } catch (Throwable t) {
+            log.error("heartbeat submit unregister broadcast failed. group:{}, clientChannelInfo:{}",
+                consumerGroup, clientChannelInfo, t);
+        }
+    }
+
+    /**
+     * Builds the payload describing {@code clientChannelInfo}; the consume settings may be {@code null}.
+     *
+     * @return the payload, or {@code null} when {@code RemoteChannel.create} returns {@code null}
+     */
+    private HeartbeatSyncerData buildSyncerData(HeartbeatType heartbeatType, String consumerGroup,
+        ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel,
+        ConsumeFromWhere consumeFromWhere) {
+        RemoteChannel remoteChannel = RemoteChannel.create(clientChannelInfo.getChannel());
+        if (remoteChannel == null) {
+            return null;
+        }
+        return new HeartbeatSyncerData(heartbeatType, clientChannelInfo.getClientId(),
+            clientChannelInfo.getLanguage(), clientChannelInfo.getVersion(), consumerGroup,
+            consumeType, messageModel, consumeFromWhere, ConfigurationManager.getProxyConfig().getLocalServeAddr(),
+            remoteChannel.encode(), remoteChannel.getChannelExtendAttribute());
+    }
+
+    /**
+     * Applies heartbeat messages that originate from another serve address to the local consumer manager.
+     * A message that fails is logged and skipped; the batch is always acknowledged as consumed.
+     */
+    @Override
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+        if (msgs == null || msgs.isEmpty()) {
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        }
+
+        for (MessageExt msg : msgs) {
+            String body = null;
+            try {
+                // decode once and null-safely so the error log in the catch block can never throw itself
+                body = msg.getBody() == null ? null : new String(msg.getBody(), StandardCharsets.UTF_8);
+                HeartbeatSyncerData data = JSON.parseObject(body, HeartbeatSyncerData.class);
+                // skip messages stamped with this proxy's own serve address: they were published locally
+                if (data.getConnectProxyIp().equals(ConfigurationManager.getProxyConfig().getLocalServeAddr())) {
+                    continue;
+                }
+
+                boolean register = data.getHeartbeatType().equals(HeartbeatType.REGISTER);
+                RemoteChannel decoded = RemoteChannel.decode(data.getChannelData());
+                String channelKey = decoded.id().asLongText();
+                // only a register caches the channel; an unregister reuses a cached one but never adds an entry
+                RemoteChannel channel = register
+                    ? remoteChannelMap.computeIfAbsent(channelKey, key -> decoded)
+                    : remoteChannelMap.getOrDefault(channelKey, decoded);
+                channel.setExtendAttribute(data.getChannelExtendAttribute());
+                ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+                    channel, data.getClientId(), data.getLanguage(), data.getVersion());
+                if (register) {
+                    this.consumerManager.registerConsumer(data.getGroup(), clientChannelInfo, data.getConsumeType(),
+                        data.getMessageModel(), data.getConsumeFromWhere(), data.getSubscriptionDataSet(), false);
+                } else {
+                    this.consumerManager.unregisterConsumer(data.getGroup(), clientChannelInfo, false);
+                }
+            } catch (Throwable t) {
+                log.error("heartbeat consume message failed. msg:{}, data:{}", msg, body, t);
+            }
+        }
+
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
new file mode 100644
index 000000000..20fee7aac
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.sysmessage;
+
+import java.util.Set;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+
+/**
+ * Data carried in the system message that {@code HeartbeatSyncer} sends for consumer register/unregister events.
+ */
+public class HeartbeatSyncerData {
+    private HeartbeatType heartbeatType;
+    private String clientId;
+    private LanguageCode language;
+    private int version;
+    private long lastUpdateTimestamp = System.currentTimeMillis(); // defaults to the construction time
+    private Set<SubscriptionData> subscriptionDataSet;
+    private String group;
+    private ConsumeType consumeType;
+    private MessageModel messageModel;
+    private ConsumeFromWhere consumeFromWhere;
+    private String connectProxyIp; // HeartbeatSyncer fills this with the sender's local serve address
+    private String channelData; // HeartbeatSyncer fills this with RemoteChannel#encode output
+    private String channelExtendAttribute;
+
+    public HeartbeatSyncerData(HeartbeatType heartbeatType, String clientId, LanguageCode language, int version,
+        String group, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
+        String connectProxyIp, String channelData, String channelExtendAttribute) {
+        this.heartbeatType = heartbeatType;
+        this.clientId = clientId;
+        this.language = language;
+        this.version = version;
+        this.group = group;
+        this.consumeType = consumeType;
+        this.messageModel = messageModel;
+        this.consumeFromWhere = consumeFromWhere;
+        this.connectProxyIp = connectProxyIp;
+        this.channelData = channelData;
+        this.channelExtendAttribute = channelExtendAttribute;
+    }
+
+    public HeartbeatType getHeartbeatType() {
+        return this.heartbeatType;
+    }
+
+    public void setHeartbeatType(HeartbeatType heartbeatType) {
+        this.heartbeatType = heartbeatType;
+    }
+
+    public String getClientId() {
+        return this.clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public LanguageCode getLanguage() {
+        return this.language;
+    }
+
+    public void setLanguage(LanguageCode language) {
+        this.language = language;
+    }
+
+    public int getVersion() {
+        return this.version;
+    }
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    public long getLastUpdateTimestamp() {
+        return this.lastUpdateTimestamp;
+    }
+
+    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
+
+    public Set<SubscriptionData> getSubscriptionDataSet() {
+        return this.subscriptionDataSet;
+    }
+
+    public void setSubscriptionDataSet(Set<SubscriptionData> subscriptionDataSet) {
+        this.subscriptionDataSet = subscriptionDataSet;
+    }
+
+    public String getGroup() {
+        return this.group;
+    }
+
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+    public ConsumeType getConsumeType() {
+        return this.consumeType;
+    }
+
+    public void setConsumeType(ConsumeType consumeType) {
+        this.consumeType = consumeType;
+    }
+
+    public MessageModel getMessageModel() {
+        return this.messageModel;
+    }
+
+    public void setMessageModel(MessageModel messageModel) {
+        this.messageModel = messageModel;
+    }
+
+    public ConsumeFromWhere getConsumeFromWhere() {
+        return this.consumeFromWhere;
+    }
+
+    public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+        this.consumeFromWhere = consumeFromWhere;
+    }
+
+    public String getConnectProxyIp() {
+        return this.connectProxyIp;
+    }
+
+    public void setConnectProxyIp(String connectProxyIp) {
+        this.connectProxyIp = connectProxyIp;
+    }
+
+    public String getChannelData() {
+        return this.channelData;
+    }
+
+    public void setChannelData(String channelData) {
+        this.channelData = channelData;
+    }
+
+    public String getChannelExtendAttribute() {
+        return this.channelExtendAttribute;
+    }
+
+    public void setChannelExtendAttribute(String channelExtendAttribute) {
+        this.channelExtendAttribute = channelExtendAttribute;
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatType.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatType.java
new file mode 100644
index 000000000..8f0801f54
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatType.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.sysmessage;
+
+/** Kind of consumer heartbeat event carried by {@code HeartbeatSyncerData}. */
+public enum HeartbeatType {
+    REGISTER, UNREGISTER
+}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
index 3a50d842f..eb90b9205 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
@@ -36,6 +36,7 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.utils.NetworkUtil;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
+import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
 import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
 import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult;
 import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
@@ -64,6 +65,8 @@ public class ProxyClientRemotingProcessorTest {
     @Mock
     private ProducerManager producerManager;
     @Mock
+    private GrpcClientSettingsManager grpcClientSettingsManager;
+    @Mock
     private ProxyRelayService proxyRelayService;
 
     @Test
@@ -74,7 +77,7 @@ public class ProxyClientRemotingProcessorTest {
                 new TransactionData("brokerName", 0, 0, "id", System.currentTimeMillis(), 3000),
                 proxyRelayResultFuture));
 
-        GrpcClientChannel grpcClientChannel = new GrpcClientChannel(proxyRelayService, null,
+        GrpcClientChannel grpcClientChannel = new GrpcClientChannel(proxyRelayService, grpcClientSettingsManager, null,
             ProxyContext.create().setRemoteAddress("127.0.0.1:8888").setLocalAddress("127.0.0.1:10911"), "clientId");
         when(producerManager.getAvailableChannel(anyString()))
             .thenReturn(grpcClientChannel);