You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:53:55 UTC

[rocketmq] 14/26: [ISSUE #5485] add test cases for channel management

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 834287af0c0cf7923cf884da8491329ca71f3a78
Author: kaiyi.lk <ka...@alibaba-inc.com>
AuthorDate: Thu Nov 17 14:20:47 2022 +0800

    [ISSUE #5485] add test cases for channel management
---
 .../grpc/v2/channel/GrpcClientChannelTest.java     |  82 ++++++
 .../proxy/processor/channel/RemoteChannelTest.java |  50 ++++
 .../channel/RemotingChannelManagerTest.java        | 162 +++++++++++
 .../remoting/channel/RemotingChannelTest.java      |  80 ++++++
 .../service/admin/DefaultAdminServiceTest.java     | 103 +++++++
 .../service/sysmessage/HeartbeatSyncerTest.java    | 319 +++++++++++++++++++++
 6 files changed, 796 insertions(+)

diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java
new file mode 100644
index 000000000..70e10bc2b
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.grpc.v2.channel;
+
+import apache.rocketmq.v2.Publishing;
+import apache.rocketmq.v2.Resource;
+import apache.rocketmq.v2.Settings;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
+import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
+import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
+import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
+import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class GrpcClientChannelTest extends InitConfigAndLoggerTest {
+    // Unit test for GrpcClientChannel: conversion to RemoteChannel and parsing of the channel extend attribute.
+    @Mock
+    private ProxyRelayService proxyRelayService;
+    @Mock
+    private GrpcClientSettingsManager grpcClientSettingsManager;
+    @Mock
+    private GrpcChannelManager grpcChannelManager;
+
+    private String clientId;
+    private GrpcClientChannel grpcClientChannel;
+    // Builds the channel under test over the mocked collaborators, with a random 10-letter client id and fixed addresses.
+    @Before
+    public void before() throws Throwable {
+        super.before(); // setup inherited from InitConfigAndLoggerTest (not shown in this patch)
+        this.clientId = RandomStringUtils.randomAlphabetic(10);
+        this.grpcClientChannel = new GrpcClientChannel(proxyRelayService, grpcClientSettingsManager, grpcChannelManager,
+            ProxyContext.create().setRemoteAddress("10.152.39.53:9768").setLocalAddress("11.193.0.1:1210"),
+            this.clientId);
+    }
+    // Settings stubbed for clientId must be parsed back from both the RemoteChannel form and the channel itself.
+    @Test
+    public void testChannelExtendAttributeParse() {
+        Settings clientSettings = Settings.newBuilder()
+            .setPublishing(Publishing.newBuilder()
+                .addTopics(Resource.newBuilder()
+                    .setName("topic")
+                    .build())
+                .build())
+            .build();
+        when(grpcClientSettingsManager.getRawClientSettings(eq(clientId))).thenReturn(clientSettings);
+        // The converted channel must report GRPC_V2; a mocked RemotingChannel is expected to parse to null.
+        RemoteChannel remoteChannel = this.grpcClientChannel.toRemoteChannel();
+        assertEquals(ChannelProtocolType.GRPC_V2, remoteChannel.getType());
+        assertEquals(clientSettings, GrpcClientChannel.parseChannelExtendAttribute(remoteChannel));
+        assertEquals(clientSettings, GrpcClientChannel.parseChannelExtendAttribute(this.grpcClientChannel));
+        assertNull(GrpcClientChannel.parseChannelExtendAttribute(mock(RemotingChannel.class)));
+    }
+}
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelTest.java
new file mode 100644
index 000000000..d504fdc5f
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.processor.channel;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class RemoteChannelTest {
+    // Round-trips a RemoteChannel through encode()/decode() and checks that every field survives.
+    @Test
+    public void testEncodeAndDecode() {
+        String remoteProxyIp = "11.193.0.1";
+        String remoteAddress = "10.152.39.53:9768";
+        String localAddress = "11.193.0.1:1210";
+        ChannelProtocolType type = ChannelProtocolType.REMOTING;
+        String extendAttribute = RandomStringUtils.randomAlphabetic(10);
+        RemoteChannel remoteChannel = new RemoteChannel(remoteProxyIp, remoteAddress, localAddress, type, extendAttribute);
+        // Encoding must produce a non-null string.
+        String encodedData = remoteChannel.encode();
+        assertNotNull(encodedData);
+        // Decoding that string must restore the proxy ip, both addresses, the protocol type and the extend attribute.
+        RemoteChannel decodedChannel = RemoteChannel.decode(encodedData);
+        assertEquals(remoteProxyIp, decodedChannel.remoteProxyIp);
+        assertEquals(remoteAddress, decodedChannel.getRemoteAddress());
+        assertEquals(localAddress, decodedChannel.getLocalAddress());
+        assertEquals(type, decodedChannel.type);
+        assertEquals(extendAttribute, decodedChannel.extendAttribute);
+        // An empty input is expected to decode to null rather than throw.
+        assertNull(RemoteChannel.decode(""));
+    }
+}
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java
new file mode 100644
index 000000000..5a5b441e9
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.channel;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
+import java.util.HashSet;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
+import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RemotingChannelManagerTest { // exercises create/remove of producer and consumer channels in RemotingChannelManager
+    @Mock
+    private RemotingProxyOutClient remotingProxyOutClient;
+    @Mock
+    private ProxyRelayService proxyRelayService;
+
+    private final String remoteAddress = "10.152.39.53:9768";
+    private final String localAddress = "11.193.0.1:1210";
+    private RemotingChannelManager remotingChannelManager;
+    // A fresh manager over the mocked collaborators is created before every test.
+    @Before
+    public void before() {
+        this.remotingChannelManager = new RemotingChannelManager(this.remotingProxyOutClient, this.proxyRelayService);
+    }
+    // Creating twice with the same channel/group/clientId must return the same instance; producer and consumer differ.
+    @Test
+    public void testCreateChannel() {
+        String group = "group";
+        String clientId = RandomStringUtils.randomAlphabetic(10);
+
+        Channel producerChannel = createMockChannel();
+        RemotingChannel producerRemotingChannel = this.remotingChannelManager.createProducerChannel(producerChannel, group, clientId);
+        assertNotNull(producerRemotingChannel);
+        assertSame(producerRemotingChannel, this.remotingChannelManager.createProducerChannel(producerChannel, group, clientId));
+
+        Channel consumerChannel = createMockChannel();
+        RemotingChannel consumerRemotingChannel = this.remotingChannelManager.createConsumerChannel(consumerChannel, group, clientId, new HashSet<>());
+        assertSame(consumerRemotingChannel, this.remotingChannelManager.createConsumerChannel(consumerChannel, group, clientId, new HashSet<>()));
+        assertNotNull(consumerRemotingChannel);
+
+        assertNotSame(producerRemotingChannel, consumerRemotingChannel);
+    }
+    // Removing a producer channel must return the created instance and leave groupChannelMap empty.
+    @Test
+    public void testRemoveProducerChannel() {
+        String group = "group";
+        String clientId = RandomStringUtils.randomAlphabetic(10);
+
+        { // remove by passing the RemotingChannel itself
+            Channel producerChannel = createMockChannel();
+            RemotingChannel producerRemotingChannel = this.remotingChannelManager.createProducerChannel(producerChannel, group, clientId);
+            assertSame(producerRemotingChannel, this.remotingChannelManager.removeProducerChannel(group, producerRemotingChannel));
+            assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty());
+        }
+        { // remove by passing the underlying netty Channel
+            Channel producerChannel = createMockChannel();
+            RemotingChannel producerRemotingChannel = this.remotingChannelManager.createProducerChannel(producerChannel, group, clientId);
+            assertSame(producerRemotingChannel, this.remotingChannelManager.removeProducerChannel(group, producerChannel));
+            assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty());
+        }
+    }
+    // Removing a consumer channel must return the created instance and leave groupChannelMap empty.
+    @Test
+    public void testRemoveConsumerChannel() {
+        String group = "group";
+        String clientId = RandomStringUtils.randomAlphabetic(10);
+
+        { // remove by passing the RemotingChannel itself
+            Channel consumerChannel = createMockChannel();
+            RemotingChannel consumerRemotingChannel = this.remotingChannelManager.createConsumerChannel(consumerChannel, group, clientId, new HashSet<>());
+            assertSame(consumerRemotingChannel, this.remotingChannelManager.removeConsumerChannel(group, consumerRemotingChannel));
+            assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty());
+        }
+        { // remove by passing the underlying netty Channel
+            Channel consumerChannel = createMockChannel();
+            RemotingChannel consumerRemotingChannel = this.remotingChannelManager.createConsumerChannel(consumerChannel, group, clientId, new HashSet<>());
+            assertSame(consumerRemotingChannel, this.remotingChannelManager.removeConsumerChannel(group, consumerChannel));
+            assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty());
+        }
+    }
+    // removeChannel(channel), called without a group, must hand back the registered channel for each netty Channel.
+    @Test
+    public void testRemoveChannel() {
+        String consumerGroup = "consumerGroup";
+        String producerGroup = "producerGroup";
+        String clientId = RandomStringUtils.randomAlphabetic(10);
+
+        Channel consumerChannel = createMockChannel();
+        RemotingChannel consumerRemotingChannel = this.remotingChannelManager.createConsumerChannel(consumerChannel, consumerGroup, clientId, new HashSet<>());
+        Channel producerChannel = createMockChannel();
+        RemotingChannel producerRemotingChannel = this.remotingChannelManager.createProducerChannel(producerChannel, producerGroup, clientId);
+        // Only the first element of each returned collection is inspected.
+        assertSame(consumerRemotingChannel, this.remotingChannelManager.removeChannel(consumerChannel).stream().findFirst().get());
+        assertSame(producerRemotingChannel, this.remotingChannelManager.removeChannel(producerChannel).stream().findFirst().get());
+        // After both removals no group may remain registered.
+        assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty());
+    }
+    // Returns a new MockChannel identified by a random 10-letter id.
+    private Channel createMockChannel() {
+        return new MockChannel(RandomStringUtils.randomAlphabetic(10));
+    }
+    // SimpleChannel stand-in; non-static because it reads the enclosing test's remoteAddress/localAddress fields.
+    private class MockChannel extends SimpleChannel {
+
+        public MockChannel(String channelId) {
+            super(null, new MockChannelId(channelId), RemotingChannelManagerTest.this.remoteAddress, RemotingChannelManagerTest.this.localAddress);
+        }
+    }
+    // ChannelId backed by a plain string: short and long text are the same value, ordering compares against asLongText().
+    private static class MockChannelId implements ChannelId {
+
+        private final String channelId;
+
+        public MockChannelId(String channelId) {
+            this.channelId = channelId;
+        }
+
+        @Override
+        public String asShortText() {
+            return channelId;
+        }
+
+        @Override
+        public String asLongText() {
+            return channelId;
+        }
+
+        @Override
+        public int compareTo(@NotNull ChannelId o) {
+            return this.channelId.compareTo(o.asLongText());
+        }
+    }
+}
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelTest.java
new file mode 100644
index 000000000..840f3e40f
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.channel;
+
+import io.netty.channel.Channel;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
+import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
+import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
+import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
+import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient;
+import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RemotingChannelTest extends InitConfigAndLoggerTest { // RemotingChannel -> RemoteChannel conversion and extend-attribute parsing
+    @Mock
+    private RemotingProxyOutClient remotingProxyOutClient;
+    @Mock
+    private ProxyRelayService proxyRelayService;
+    @Mock
+    private Channel parent;
+
+    private String clientId;
+    private Set<SubscriptionData> subscriptionData;
+    private RemotingChannel remotingChannel;
+
+    private final String remoteAddress = "10.152.39.53:9768";
+    private final String localAddress = "11.193.0.1:1210";
+    // Stubs the parent channel's addresses and builds a RemotingChannel holding one subscription ("topic" / "subTag").
+    @Before
+    public void before() throws Throwable {
+        super.before(); // setup inherited from InitConfigAndLoggerTest (not shown in this patch)
+        this.clientId = RandomStringUtils.randomAlphabetic(10);
+        when(parent.remoteAddress()).thenReturn(RemotingUtil.string2SocketAddress(remoteAddress));
+        when(parent.localAddress()).thenReturn(RemotingUtil.string2SocketAddress(localAddress));
+        this.subscriptionData = new HashSet<>();
+        this.subscriptionData.add(FilterAPI.buildSubscriptionData("topic", "subTag"));
+        this.remotingChannel = new RemotingChannel(remotingProxyOutClient, proxyRelayService,
+            parent, clientId, subscriptionData);
+    }
+    // The subscription set must be parsed back from both channel forms; a mocked GrpcClientChannel must parse to null.
+    @Test
+    public void testChannelExtendAttributeParse() {
+        RemoteChannel remoteChannel = this.remotingChannel.toRemoteChannel();
+        assertEquals(ChannelProtocolType.REMOTING, remoteChannel.getType());
+        assertEquals(subscriptionData, RemotingChannel.parseChannelExtendAttribute(remoteChannel));
+        assertEquals(subscriptionData, RemotingChannel.parseChannelExtendAttribute(this.remotingChannel));
+        assertNull(RemotingChannel.parseChannelExtendAttribute(mock(GrpcClientChannel.class)));
+    }
+}
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminServiceTest.java
new file mode 100644
index 000000000..039efd8b4
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminServiceTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.admin;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIExt;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultAdminServiceTest { // covers DefaultAdminService.createTopicOnTopicBrokerIfNotExist with a mocked client
+    @Mock
+    private MQClientAPIFactory mqClientAPIFactory;
+    @Mock
+    private MQClientAPIExt mqClientAPIExt;
+
+    private DefaultAdminService defaultAdminService;
+    // The factory always hands out the mocked client, so every call made by the service can be stubbed and captured.
+    @Before
+    public void before() {
+        when(mqClientAPIFactory.getClient()).thenReturn(mqClientAPIExt);
+        defaultAdminService = new DefaultAdminService(mqClientAPIFactory);
+    }
+    // "createTopic" is reported missing on the first route lookup, then present; "sampleTopic" has a two-broker route.
+    @Test
+    public void testCreateTopic() throws Exception {
+        when(mqClientAPIExt.getTopicRouteInfoFromNameServer(eq("createTopic"), anyLong()))
+            .thenThrow(new MQClientException(ResponseCode.TOPIC_NOT_EXIST, ""))
+            .thenReturn(createTopicRouteData(1));
+        when(mqClientAPIExt.getTopicRouteInfoFromNameServer(eq("sampleTopic"), anyLong()))
+            .thenReturn(createTopicRouteData(2));
+        // Capture the broker address and TopicConfig of every createTopic call issued by the service.
+        ArgumentCaptor<String> addrArgumentCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<TopicConfig> topicConfigArgumentCaptor = ArgumentCaptor.forClass(TopicConfig.class);
+        doNothing().when(mqClientAPIExt).createTopic(addrArgumentCaptor.capture(), anyString(), topicConfigArgumentCaptor.capture(), anyLong());
+        // 7 and 8 are asserted below as the write and read queue numbers of the captured TopicConfig.
+        assertTrue(defaultAdminService.createTopicOnTopicBrokerIfNotExist(
+            "createTopic",
+            "sampleTopic",
+            7,
+            8,
+            true,
+            1
+        ));
+        // Expect exactly two createTopic calls, addressed to the two brokers generated by createTopicRouteData(2).
+        assertEquals(2, addrArgumentCaptor.getAllValues().size());
+        Set<String> createAddr = new HashSet<>(addrArgumentCaptor.getAllValues());
+        assertTrue(createAddr.contains("127.0.0.1:10911"));
+        assertTrue(createAddr.contains("127.0.0.2:10911"));
+        assertEquals("createTopic", topicConfigArgumentCaptor.getValue().getTopicName());
+        assertEquals(7, topicConfigArgumentCaptor.getValue().getWriteQueueNums());
+        assertEquals(8, topicConfigArgumentCaptor.getValue().getReadQueueNums());
+    }
+    // Builds route data with brokerNum brokers: "broker-i" in cluster "cluster", id 0 at 127.0.0.(i+1):10911.
+    private TopicRouteData createTopicRouteData(int brokerNum) {
+        TopicRouteData topicRouteData = new TopicRouteData();
+        for (int i = 0; i < brokerNum; i++) {
+            BrokerData brokerData = new BrokerData();
+            HashMap<Long, String> addrMap = new HashMap<>();
+            addrMap.put(0L, "127.0.0." + (i + 1) + ":10911");
+            brokerData.setBrokerAddrs(addrMap);
+            brokerData.setBrokerName("broker-" + i);
+            brokerData.setCluster("cluster");
+            topicRouteData.getBrokerDatas().add(brokerData);
+        }
+        return topicRouteData;
+    }
+}
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
new file mode 100644
index 000000000..8ac74f533
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.sysmessage;
+
+import apache.rocketmq.v2.FilterExpression;
+import apache.rocketmq.v2.FilterType;
+import apache.rocketmq.v2.Resource;
+import apache.rocketmq.v2.Settings;
+import apache.rocketmq.v2.Subscription;
+import apache.rocketmq.v2.SubscriptionEntry;
+import com.google.common.collect.Sets;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerManagerInterface;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
+import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
+import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
+import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
+import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient;
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
+import org.apache.rocketmq.proxy.service.admin.AdminService;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIExt;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
+import org.apache.rocketmq.proxy.service.route.MessageQueueView;
+import org.apache.rocketmq.proxy.service.route.TopicRouteService;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.assertj.core.util.Lists;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class HeartbeatSyncerTest extends InitConfigAndLoggerTest {
+    @Mock
+    private TopicRouteService topicRouteService;
+    @Mock
+    private AdminService adminService;
+    @Mock
+    private ConsumerManagerInterface consumerManager;
+    @Mock
+    private MQClientAPIFactory mqClientAPIFactory;
+    @Mock
+    private MQClientAPIExt mqClientAPIExt;
+    @Mock
+    private ProxyRelayService proxyRelayService;
+
+    private String clientId;
+    private final String remoteAddress = "10.152.39.53:9768";
+    private final String localAddress = "11.193.0.1:1210";
+    private final String clusterName = "cluster";
+    private final String brokerName = "broker-01";
+
+    @Before
+    public void before() throws Throwable { // shared fixture: random client id, mocked client factory, stubbed topic route
+        super.before(); // setup inherited from InitConfigAndLoggerTest (not shown in this patch)
+        this.clientId = RandomStringUtils.randomAlphabetic(10);
+        when(mqClientAPIFactory.getClient()).thenReturn(mqClientAPIExt);
+        // Every topic name resolves to the same single-broker route: 8 read / 8 write queues on brokerName.
+        {
+            TopicRouteData topicRouteData = new TopicRouteData();
+            QueueData queueData = new QueueData();
+            queueData.setReadQueueNums(8);
+            queueData.setWriteQueueNums(8);
+            queueData.setPerm(6); // NOTE(review): presumably read|write permission - confirm against PermName
+            queueData.setBrokerName(brokerName);
+            topicRouteData.getQueueDatas().add(queueData);
+            BrokerData brokerData = new BrokerData();
+            brokerData.setCluster(clusterName);
+            brokerData.setBrokerName(brokerName);
+            HashMap<Long, String> brokerAddr = new HashMap<>();
+            brokerAddr.put(0L, "127.0.0.1:10911");
+            brokerData.setBrokerAddrs(brokerAddr);
+            topicRouteData.getBrokerDatas().add(brokerData);
+            MessageQueueView messageQueueView = new MessageQueueView("foo", topicRouteData); // "foo" is presumably the topic name - confirm
+            when(this.topicRouteService.getAllMessageQueueView(anyString())).thenReturn(messageQueueView);
+        }
+    }
+
+    /**
+     * Registers a gRPC v2 client channel through {@link HeartbeatSyncer}, replays the captured sync
+     * messages first with the local serve address unchanged and then with a different one, and checks
+     * the resulting calls on the mocked consumer manager for both register and unregister.
+     * <p>
+     * The test overwrites the local serve address held by the shared proxy config; the original value
+     * is restored in a {@code finally} block so the change cannot leak out of this test, even when an
+     * assertion fails.
+     */
+    @Test
+    public void testSyncGrpcV2Channel() throws Exception {
+        String consumerGroup = "consumerGroup";
+        GrpcClientSettingsManager grpcClientSettingsManager = mock(GrpcClientSettingsManager.class);
+        GrpcChannelManager grpcChannelManager = mock(GrpcChannelManager.class);
+        GrpcClientChannel grpcClientChannel = new GrpcClientChannel(
+            proxyRelayService, grpcClientSettingsManager, grpcChannelManager,
+            ProxyContext.create().setRemoteAddress(remoteAddress).setLocalAddress(localAddress),
+            clientId);
+        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+            grpcClientChannel,
+            clientId,
+            LanguageCode.JAVA,
+            5
+        );
+
+        // capture every message the syncer sends so it can be replayed through consumeMessage below
+        ArgumentCaptor<Message> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class);
+        SendResult sendResult = new SendResult();
+        sendResult.setSendStatus(SendStatus.SEND_OK);
+        doReturn(CompletableFuture.completedFuture(sendResult)).when(this.mqClientAPIExt)
+            .sendMessageAsync(anyString(), anyString(), messageArgumentCaptor.capture(), any(), anyLong());
+
+        Settings settings = Settings.newBuilder()
+            .setSubscription(Subscription.newBuilder()
+                .addSubscriptions(SubscriptionEntry.newBuilder()
+                    .setTopic(Resource.newBuilder().setName("topic").build())
+                    .setExpression(FilterExpression.newBuilder()
+                        .setType(FilterType.TAG)
+                        .setExpression("tag")
+                        .build())
+                    .build())
+                .build())
+            .build();
+        when(grpcClientSettingsManager.getRawClientSettings(eq(clientId))).thenReturn(settings);
+
+        HeartbeatSyncer heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, consumerManager, mqClientAPIFactory);
+        heartbeatSyncer.onConsumerRegister(
+            consumerGroup,
+            clientChannelInfo,
+            ConsumeType.CONSUME_PASSIVELY,
+            MessageModel.CLUSTERING,
+            ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
+            Sets.newHashSet(FilterAPI.buildSubscriptionData("topic", "tag"))
+        );
+
+        // with the local serve address unchanged, consuming the sync message must not register anything
+        await().atMost(Duration.ofSeconds(3)).until(() -> !messageArgumentCaptor.getAllValues().isEmpty());
+        heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null);
+        verify(consumerManager, never()).registerConsumer(anyString(), any(), any(), any(), any(), any(), anyBoolean());
+
+        // remember the configured address: it is replaced below and must be put back afterwards
+        String localServeAddr = ConfigurationManager.getProxyConfig().getLocalServeAddr();
+        try {
+            // change the local serve addr to simulate the messages being received by another proxy
+            ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10));
+            ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
+            doReturn(true).when(consumerManager).registerConsumer(anyString(), syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(), anyBoolean());
+
+            // consuming the same message twice must hand the same channel instance to the consumer manager
+            heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null);
+            heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null);
+            assertEquals(2, syncChannelInfoArgumentCaptor.getAllValues().size());
+            List<ClientChannelInfo> channelInfoList = syncChannelInfoArgumentCaptor.getAllValues();
+            assertSame(channelInfoList.get(0).getChannel(), channelInfoList.get(1).getChannel());
+            assertEquals(settings, GrpcClientChannel.parseChannelExtendAttribute(channelInfoList.get(0).getChannel()));
+            assertEquals(settings, GrpcClientChannel.parseChannelExtendAttribute(channelInfoList.get(1).getChannel()));
+
+            // start testing the sync of a client unregister:
+            // switch back to the original localServeAddr before triggering the unregister
+            ConfigurationManager.getProxyConfig().setLocalServeAddr(localServeAddr);
+            heartbeatSyncer.onConsumerUnRegister(consumerGroup, clientChannelInfo);
+            await().atMost(Duration.ofSeconds(3)).until(() -> messageArgumentCaptor.getAllValues().size() == 2);
+
+            ArgumentCaptor<ClientChannelInfo> syncUnRegisterChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
+            doNothing().when(consumerManager).unregisterConsumer(anyString(), syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean());
+
+            // change the local serve addr again to simulate another proxy receiving the unregister message
+            ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10));
+            heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getAllValues().get(1))), null);
+            assertSame(channelInfoList.get(0).getChannel(), syncUnRegisterChannelInfoArgumentCaptor.getValue().getChannel());
+        } finally {
+            // always undo the override of the shared proxy config
+            ConfigurationManager.getProxyConfig().setLocalServeAddr(localServeAddr);
+        }
+    }
+
+    /**
+     * Registers a remoting client channel through {@link HeartbeatSyncer}, replays the captured sync
+     * messages first with the local serve address unchanged and then with a different one, and checks
+     * the resulting calls on the mocked consumer manager for both register and unregister.
+     * <p>
+     * The test overwrites the local serve address held by the shared proxy config; the original value
+     * is restored in a {@code finally} block so the change cannot leak out of this test, even when an
+     * assertion fails.
+     */
+    @Test
+    public void testSyncRemotingChannel() throws Exception {
+        String consumerGroup = "consumerGroup";
+        Set<SubscriptionData> subscriptionDataSet = new HashSet<>();
+        subscriptionDataSet.add(FilterAPI.buildSubscriptionData("topic", "tagSub"));
+        RemotingProxyOutClient remotingProxyOutClient = mock(RemotingProxyOutClient.class);
+        RemotingChannel remotingChannel = new RemotingChannel(remotingProxyOutClient, proxyRelayService, createMockChannel(), clientId, subscriptionDataSet);
+        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+            remotingChannel,
+            clientId,
+            LanguageCode.JAVA,
+            4
+        );
+
+        // capture every message the syncer sends so it can be replayed through consumeMessage below
+        ArgumentCaptor<Message> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class);
+        SendResult sendResult = new SendResult();
+        sendResult.setSendStatus(SendStatus.SEND_OK);
+        doReturn(CompletableFuture.completedFuture(sendResult)).when(this.mqClientAPIExt)
+            .sendMessageAsync(anyString(), anyString(), messageArgumentCaptor.capture(), any(), anyLong());
+
+        HeartbeatSyncer heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, consumerManager, mqClientAPIFactory);
+        heartbeatSyncer.onConsumerRegister(
+            consumerGroup,
+            clientChannelInfo,
+            ConsumeType.CONSUME_PASSIVELY,
+            MessageModel.CLUSTERING,
+            ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
+            subscriptionDataSet
+        );
+
+        // with the local serve address unchanged, consuming the sync message must not register anything
+        await().atMost(Duration.ofSeconds(3)).until(() -> !messageArgumentCaptor.getAllValues().isEmpty());
+        heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null);
+        verify(consumerManager, never()).registerConsumer(anyString(), any(), any(), any(), any(), any(), anyBoolean());
+
+        // remember the configured address: it is replaced below and must be put back afterwards
+        String localServeAddr = ConfigurationManager.getProxyConfig().getLocalServeAddr();
+        try {
+            // change the local serve addr to simulate the messages being received by another proxy
+            ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10));
+            ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
+            doReturn(true).when(consumerManager).registerConsumer(anyString(), syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(), anyBoolean());
+
+            // consuming the same message twice must hand the same channel instance to the consumer manager
+            heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null);
+            heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null);
+            assertEquals(2, syncChannelInfoArgumentCaptor.getAllValues().size());
+            List<ClientChannelInfo> channelInfoList = syncChannelInfoArgumentCaptor.getAllValues();
+            assertSame(channelInfoList.get(0).getChannel(), channelInfoList.get(1).getChannel());
+            assertEquals(subscriptionDataSet, RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(0).getChannel()));
+            assertEquals(subscriptionDataSet, RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(1).getChannel()));
+
+            // start testing the sync of a client unregister:
+            // switch back to the original localServeAddr before triggering the unregister
+            ConfigurationManager.getProxyConfig().setLocalServeAddr(localServeAddr);
+            heartbeatSyncer.onConsumerUnRegister(consumerGroup, clientChannelInfo);
+            await().atMost(Duration.ofSeconds(3)).until(() -> messageArgumentCaptor.getAllValues().size() == 2);
+
+            ArgumentCaptor<ClientChannelInfo> syncUnRegisterChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
+            doNothing().when(consumerManager).unregisterConsumer(anyString(), syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean());
+
+            // change the local serve addr again to simulate another proxy receiving the unregister message
+            ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10));
+            heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getAllValues().get(1))), null);
+            assertSame(channelInfoList.get(0).getChannel(), syncUnRegisterChannelInfoArgumentCaptor.getValue().getChannel());
+        } finally {
+            // always undo the override of the shared proxy config
+            ConfigurationManager.getProxyConfig().setLocalServeAddr(localServeAddr);
+        }
+    }
+
+    private MessageExt convertFromMessage(Message message) {
+        MessageExt messageExt = new MessageExt();
+        messageExt.setTopic(message.getTopic());
+        messageExt.setBody(message.getBody());
+        return messageExt;
+    }
+
+    private Channel createMockChannel() {
+        return new MockChannel(RandomStringUtils.randomAlphabetic(10));
+    }
+
+    /**
+     * {@link SimpleChannel} subclass used by the tests in this class. It is an inner (non-static)
+     * class because its constructor reads the enclosing test instance's {@code remoteAddress} and
+     * {@code localAddress} fields.
+     */
+    private class MockChannel extends SimpleChannel {
+
+        /**
+         * @param channelId text wrapped in a {@link MockChannelId} and handed to the superclass constructor
+         */
+        public MockChannel(String channelId) {
+            // NOTE(review): the first super argument is null, presumably meaning "no parent channel"; confirm against SimpleChannel
+            super(null, new MockChannelId(channelId), HeartbeatSyncerTest.this.remoteAddress, HeartbeatSyncerTest.this.localAddress);
+        }
+    }
+
+    private static class MockChannelId implements ChannelId {
+
+        private final String channelId;
+
+        public MockChannelId(String channelId) {
+            this.channelId = channelId;
+        }
+
+        @Override
+        public String asShortText() {
+            return channelId;
+        }
+
+        @Override
+        public String asLongText() {
+            return channelId;
+        }
+
+        @Override
+        public int compareTo(@NotNull ChannelId o) {
+            return this.channelId.compareTo(o.asLongText());
+        }
+    }
+}
\ No newline at end of file