You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:53:55 UTC
[rocketmq] 14/26: [ISSUE #5485] add test cases for channel management
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 834287af0c0cf7923cf884da8491329ca71f3a78
Author: kaiyi.lk <ka...@alibaba-inc.com>
AuthorDate: Thu Nov 17 14:20:47 2022 +0800
[ISSUE #5485] add test cases for channel management
---
.../grpc/v2/channel/GrpcClientChannelTest.java | 82 ++++++
.../proxy/processor/channel/RemoteChannelTest.java | 50 ++++
.../channel/RemotingChannelManagerTest.java | 162 +++++++++++
.../remoting/channel/RemotingChannelTest.java | 80 ++++++
.../service/admin/DefaultAdminServiceTest.java | 103 +++++++
.../service/sysmessage/HeartbeatSyncerTest.java | 319 +++++++++++++++++++++
6 files changed, 796 insertions(+)
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java
new file mode 100644
index 000000000..70e10bc2b
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.grpc.v2.channel;
+
+import apache.rocketmq.v2.Publishing;
+import apache.rocketmq.v2.Resource;
+import apache.rocketmq.v2.Settings;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
+import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
+import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
+import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
+import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/** Checks how a {@link GrpcClientChannel} exposes its client settings as the channel extend attribute. */
+@RunWith(MockitoJUnitRunner.class)
+public class GrpcClientChannelTest extends InitConfigAndLoggerTest {
+
+    @Mock
+    private ProxyRelayService proxyRelayService;
+    @Mock
+    private GrpcClientSettingsManager grpcClientSettingsManager;
+    @Mock
+    private GrpcChannelManager grpcChannelManager;
+
+    private String clientId;
+    private GrpcClientChannel grpcClientChannel;
+
+    /** Builds a channel for a random client id on top of the mocked collaborators. */
+    @Before
+    public void before() throws Throwable {
+        super.before();
+        clientId = RandomStringUtils.randomAlphabetic(10);
+        grpcClientChannel = new GrpcClientChannel(
+            proxyRelayService, grpcClientSettingsManager, grpcChannelManager,
+            ProxyContext.create().setRemoteAddress("10.152.39.53:9768").setLocalAddress("11.193.0.1:1210"),
+            clientId);
+    }
+
+    /** Stubbed settings must be parsed back from the remote and the gRPC channel; a RemotingChannel mock yields null. */
+    @Test
+    public void testChannelExtendAttributeParse() {
+        Resource topic = Resource.newBuilder().setName("topic").build();
+        Publishing publishing = Publishing.newBuilder().addTopics(topic).build();
+        Settings expected = Settings.newBuilder().setPublishing(publishing).build();
+        when(grpcClientSettingsManager.getRawClientSettings(eq(clientId))).thenReturn(expected);
+
+        RemoteChannel converted = grpcClientChannel.toRemoteChannel();
+        assertEquals(ChannelProtocolType.GRPC_V2, converted.getType());
+        assertEquals(expected, GrpcClientChannel.parseChannelExtendAttribute(converted));
+        assertEquals(expected, GrpcClientChannel.parseChannelExtendAttribute(grpcClientChannel));
+        assertNull(GrpcClientChannel.parseChannelExtendAttribute(mock(RemotingChannel.class)));
+    }
+}
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelTest.java
new file mode 100644
index 000000000..d504fdc5f
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.processor.channel;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+/** Round-trip test for the string form produced by {@link RemoteChannel#encode()}. */
+public class RemoteChannelTest {
+    /** Every field must survive encode followed by decode; an empty string must decode to null. */
+    @Test
+    public void testEncodeAndDecode() {
+        final String proxyIp = "11.193.0.1";
+        final String remoteAddr = "10.152.39.53:9768";
+        final String localAddr = "11.193.0.1:1210";
+        final ChannelProtocolType protocolType = ChannelProtocolType.REMOTING;
+        final String attribute = RandomStringUtils.randomAlphabetic(10);
+        RemoteChannel source = new RemoteChannel(proxyIp, remoteAddr, localAddr, protocolType, attribute);
+        String payload = source.encode();
+        assertNotNull(payload);
+
+        RemoteChannel restored = RemoteChannel.decode(payload);
+        assertEquals(proxyIp, restored.remoteProxyIp);
+        assertEquals(remoteAddr, restored.getRemoteAddress());
+        assertEquals(localAddr, restored.getLocalAddress());
+        assertEquals(protocolType, restored.type);
+        assertEquals(attribute, restored.extendAttribute);
+        // Decoding an empty string is expected to give null rather than a channel.
+        assertNull(RemoteChannel.decode(""));
+    }
+}
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java
new file mode 100644
index 000000000..5a5b441e9
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.channel;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
+import java.util.HashSet;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
+import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RemotingChannelManagerTest {
+ @Mock
+ private RemotingProxyOutClient remotingProxyOutClient;
+ @Mock
+ private ProxyRelayService proxyRelayService;
+
+ private final String remoteAddress = "10.152.39.53:9768";
+ private final String localAddress = "11.193.0.1:1210";
+ private RemotingChannelManager remotingChannelManager;
+
+ @Before
+ public void before() {
+ this.remotingChannelManager = new RemotingChannelManager(this.remotingProxyOutClient, this.proxyRelayService);
+ }
+
+ @Test
+ public void testCreateChannel() {
+ String group = "group";
+ String clientId = RandomStringUtils.randomAlphabetic(10);
+
+ Channel producerChannel = createMockChannel();
+ RemotingChannel producerRemotingChannel = this.remotingChannelManager.createProducerChannel(producerChannel, group, clientId);
+ assertNotNull(producerRemotingChannel);
+ assertSame(producerRemotingChannel, this.remotingChannelManager.createProducerChannel(producerChannel, group, clientId));
+
+ Channel consumerChannel = createMockChannel();
+ RemotingChannel consumerRemotingChannel = this.remotingChannelManager.createConsumerChannel(consumerChannel, group, clientId, new HashSet<>());
+ assertSame(consumerRemotingChannel, this.remotingChannelManager.createConsumerChannel(consumerChannel, group, clientId, new HashSet<>()));
+ assertNotNull(consumerRemotingChannel);
+
+ assertNotSame(producerRemotingChannel, consumerRemotingChannel);
+ }
+
+ @Test
+ public void testRemoveProducerChannel() {
+ String group = "group";
+ String clientId = RandomStringUtils.randomAlphabetic(10);
+
+ {
+ Channel producerChannel = createMockChannel();
+ RemotingChannel producerRemotingChannel = this.remotingChannelManager.createProducerChannel(producerChannel, group, clientId);
+ assertSame(producerRemotingChannel, this.remotingChannelManager.removeProducerChannel(group, producerRemotingChannel));
+ assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty());
+ }
+ {
+ Channel producerChannel = createMockChannel();
+ RemotingChannel producerRemotingChannel = this.remotingChannelManager.createProducerChannel(producerChannel, group, clientId);
+ assertSame(producerRemotingChannel, this.remotingChannelManager.removeProducerChannel(group, producerChannel));
+ assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty());
+ }
+ }
+
+ @Test
+ public void testRemoveConsumerChannel() {
+ String group = "group";
+ String clientId = RandomStringUtils.randomAlphabetic(10);
+
+ {
+ Channel consumerChannel = createMockChannel();
+ RemotingChannel consumerRemotingChannel = this.remotingChannelManager.createConsumerChannel(consumerChannel, group, clientId, new HashSet<>());
+ assertSame(consumerRemotingChannel, this.remotingChannelManager.removeConsumerChannel(group, consumerRemotingChannel));
+ assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty());
+ }
+ {
+ Channel consumerChannel = createMockChannel();
+ RemotingChannel consumerRemotingChannel = this.remotingChannelManager.createConsumerChannel(consumerChannel, group, clientId, new HashSet<>());
+ assertSame(consumerRemotingChannel, this.remotingChannelManager.removeConsumerChannel(group, consumerChannel));
+ assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty());
+ }
+ }
+
+ @Test
+ public void testRemoveChannel() {
+ String consumerGroup = "consumerGroup";
+ String producerGroup = "producerGroup";
+ String clientId = RandomStringUtils.randomAlphabetic(10);
+
+ Channel consumerChannel = createMockChannel();
+ RemotingChannel consumerRemotingChannel = this.remotingChannelManager.createConsumerChannel(consumerChannel, consumerGroup, clientId, new HashSet<>());
+ Channel producerChannel = createMockChannel();
+ RemotingChannel producerRemotingChannel = this.remotingChannelManager.createProducerChannel(producerChannel, producerGroup, clientId);
+
+ assertSame(consumerRemotingChannel, this.remotingChannelManager.removeChannel(consumerChannel).stream().findFirst().get());
+ assertSame(producerRemotingChannel, this.remotingChannelManager.removeChannel(producerChannel).stream().findFirst().get());
+
+ assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty());
+ }
+
+ private Channel createMockChannel() {
+ return new MockChannel(RandomStringUtils.randomAlphabetic(10));
+ }
+
+ private class MockChannel extends SimpleChannel {
+
+ public MockChannel(String channelId) {
+ super(null, new MockChannelId(channelId), RemotingChannelManagerTest.this.remoteAddress, RemotingChannelManagerTest.this.localAddress);
+ }
+ }
+
+ private static class MockChannelId implements ChannelId {
+
+ private final String channelId;
+
+ public MockChannelId(String channelId) {
+ this.channelId = channelId;
+ }
+
+ @Override
+ public String asShortText() {
+ return channelId;
+ }
+
+ @Override
+ public String asLongText() {
+ return channelId;
+ }
+
+ @Override
+ public int compareTo(@NotNull ChannelId o) {
+ return this.channelId.compareTo(o.asLongText());
+ }
+ }
+}
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelTest.java
new file mode 100644
index 000000000..840f3e40f
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.channel;
+
+import io.netty.channel.Channel;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
+import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
+import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
+import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
+import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient;
+import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/** Checks how a {@link RemotingChannel} exposes its subscriptions as the channel extend attribute. */
+@RunWith(MockitoJUnitRunner.class)
+public class RemotingChannelTest extends InitConfigAndLoggerTest {
+    @Mock
+    private RemotingProxyOutClient remotingProxyOutClient;
+    @Mock
+    private ProxyRelayService proxyRelayService;
+    @Mock
+    private Channel parent;
+
+    private String clientId;
+    private Set<SubscriptionData> subscriptionData;
+    private RemotingChannel remotingChannel;
+    private final String remoteAddress = "10.152.39.53:9768";
+    private final String localAddress = "11.193.0.1:1210";
+    /** Wraps the mocked parent channel in a RemotingChannel holding one "topic"/"subTag" subscription. */
+    @Before
+    public void before() throws Throwable {
+        super.before();
+        clientId = RandomStringUtils.randomAlphabetic(10);
+        subscriptionData = new HashSet<>();
+        subscriptionData.add(FilterAPI.buildSubscriptionData("topic", "subTag"));
+        when(parent.remoteAddress()).thenReturn(RemotingUtil.string2SocketAddress(remoteAddress));
+        when(parent.localAddress()).thenReturn(RemotingUtil.string2SocketAddress(localAddress));
+        remotingChannel = new RemotingChannel(
+            remotingProxyOutClient, proxyRelayService, parent, clientId, subscriptionData);
+    }
+    /** Subscriptions must be parsed back from the remote and the remoting channel; a gRPC channel mock yields null. */
+    @Test
+    public void testChannelExtendAttributeParse() {
+        RemoteChannel converted = remotingChannel.toRemoteChannel();
+        assertEquals(ChannelProtocolType.REMOTING, converted.getType());
+        assertEquals(subscriptionData, RemotingChannel.parseChannelExtendAttribute(converted));
+        assertEquals(subscriptionData, RemotingChannel.parseChannelExtendAttribute(remotingChannel));
+        assertNull(RemotingChannel.parseChannelExtendAttribute(mock(GrpcClientChannel.class)));
+    }
+}
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminServiceTest.java
new file mode 100644
index 000000000..039efd8b4
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminServiceTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.admin;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIExt;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultAdminServiceTest {
+ @Mock
+ private MQClientAPIFactory mqClientAPIFactory;
+ @Mock
+ private MQClientAPIExt mqClientAPIExt;
+
+ private DefaultAdminService defaultAdminService;
+
+ @Before
+ public void before() {
+ when(mqClientAPIFactory.getClient()).thenReturn(mqClientAPIExt);
+ defaultAdminService = new DefaultAdminService(mqClientAPIFactory);
+ }
+
+ @Test
+ public void testCreateTopic() throws Exception {
+ when(mqClientAPIExt.getTopicRouteInfoFromNameServer(eq("createTopic"), anyLong()))
+ .thenThrow(new MQClientException(ResponseCode.TOPIC_NOT_EXIST, ""))
+ .thenReturn(createTopicRouteData(1));
+ when(mqClientAPIExt.getTopicRouteInfoFromNameServer(eq("sampleTopic"), anyLong()))
+ .thenReturn(createTopicRouteData(2));
+
+ ArgumentCaptor<String> addrArgumentCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<TopicConfig> topicConfigArgumentCaptor = ArgumentCaptor.forClass(TopicConfig.class);
+ doNothing().when(mqClientAPIExt).createTopic(addrArgumentCaptor.capture(), anyString(), topicConfigArgumentCaptor.capture(), anyLong());
+
+ assertTrue(defaultAdminService.createTopicOnTopicBrokerIfNotExist(
+ "createTopic",
+ "sampleTopic",
+ 7,
+ 8,
+ true,
+ 1
+ ));
+
+ assertEquals(2, addrArgumentCaptor.getAllValues().size());
+ Set<String> createAddr = new HashSet<>(addrArgumentCaptor.getAllValues());
+ assertTrue(createAddr.contains("127.0.0.1:10911"));
+ assertTrue(createAddr.contains("127.0.0.2:10911"));
+ assertEquals("createTopic", topicConfigArgumentCaptor.getValue().getTopicName());
+ assertEquals(7, topicConfigArgumentCaptor.getValue().getWriteQueueNums());
+ assertEquals(8, topicConfigArgumentCaptor.getValue().getReadQueueNums());
+ }
+
+ private TopicRouteData createTopicRouteData(int brokerNum) {
+ TopicRouteData topicRouteData = new TopicRouteData();
+ for (int i = 0; i < brokerNum; i++) {
+ BrokerData brokerData = new BrokerData();
+ HashMap<Long, String> addrMap = new HashMap<>();
+ addrMap.put(0L, "127.0.0." + (i + 1) + ":10911");
+ brokerData.setBrokerAddrs(addrMap);
+ brokerData.setBrokerName("broker-" + i);
+ brokerData.setCluster("cluster");
+ topicRouteData.getBrokerDatas().add(brokerData);
+ }
+ return topicRouteData;
+ }
+}
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
new file mode 100644
index 000000000..8ac74f533
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.sysmessage;
+
+import apache.rocketmq.v2.FilterExpression;
+import apache.rocketmq.v2.FilterType;
+import apache.rocketmq.v2.Resource;
+import apache.rocketmq.v2.Settings;
+import apache.rocketmq.v2.Subscription;
+import apache.rocketmq.v2.SubscriptionEntry;
+import com.google.common.collect.Sets;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerManagerInterface;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
+import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
+import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
+import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
+import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient;
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
+import org.apache.rocketmq.proxy.service.admin.AdminService;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIExt;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
+import org.apache.rocketmq.proxy.service.route.MessageQueueView;
+import org.apache.rocketmq.proxy.service.route.TopicRouteService;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.assertj.core.util.Lists;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class HeartbeatSyncerTest extends InitConfigAndLoggerTest {
+ @Mock
+ private TopicRouteService topicRouteService;
+ @Mock
+ private AdminService adminService;
+ @Mock
+ private ConsumerManagerInterface consumerManager;
+ @Mock
+ private MQClientAPIFactory mqClientAPIFactory;
+ @Mock
+ private MQClientAPIExt mqClientAPIExt;
+ @Mock
+ private ProxyRelayService proxyRelayService;
+ // clientId is re-randomized in before(); the constants below are the fixed addresses and broker identity used by the tests.
+ private String clientId;
+ private final String remoteAddress = "10.152.39.53:9768";
+ private final String localAddress = "11.193.0.1:1210";
+ private final String clusterName = "cluster";
+ private final String brokerName = "broker-01";
+
+ /** Stubs the client factory and a single-broker route (8 read/8 write queues, perm 6) for every topic. */
+ @Before
+ public void before() throws Throwable {
+     super.before();
+     clientId = RandomStringUtils.randomAlphabetic(10);
+     when(mqClientAPIFactory.getClient()).thenReturn(mqClientAPIExt);
+
+     // The only broker is registered under key 0L at 127.0.0.1:10911.
+     BrokerData broker = new BrokerData();
+     broker.setCluster(clusterName);
+     broker.setBrokerName(brokerName);
+     HashMap<Long, String> addrs = new HashMap<>();
+     addrs.put(0L, "127.0.0.1:10911");
+     broker.setBrokerAddrs(addrs);
+     QueueData queues = new QueueData();
+     queues.setBrokerName(brokerName);
+     queues.setReadQueueNums(8);
+     queues.setWriteQueueNums(8);
+     queues.setPerm(6);
+     TopicRouteData route = new TopicRouteData();
+     route.getQueueDatas().add(queues);
+     route.getBrokerDatas().add(broker);
+     MessageQueueView view = new MessageQueueView("foo", route);
+     when(topicRouteService.getAllMessageQueueView(anyString())).thenReturn(view);
+ }
+
+ @Test
+ public void testSyncGrpcV2Channel() throws Exception {
+ String consumerGroup = "consumerGroup";
+ GrpcClientSettingsManager grpcClientSettingsManager = mock(GrpcClientSettingsManager.class);
+ GrpcChannelManager grpcChannelManager = mock(GrpcChannelManager.class);
+ GrpcClientChannel grpcClientChannel = new GrpcClientChannel(
+ proxyRelayService, grpcClientSettingsManager, grpcChannelManager,
+ ProxyContext.create().setRemoteAddress(remoteAddress).setLocalAddress(localAddress),
+ clientId);
+ ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+ grpcClientChannel,
+ clientId,
+ LanguageCode.JAVA,
+ 5
+ );
+
+ ArgumentCaptor<Message> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class);
+ SendResult sendResult = new SendResult();
+ sendResult.setSendStatus(SendStatus.SEND_OK);
+ doReturn(CompletableFuture.completedFuture(sendResult)).when(this.mqClientAPIExt)
+ .sendMessageAsync(anyString(), anyString(), messageArgumentCaptor.capture(), any(), anyLong());
+
+ Settings settings = Settings.newBuilder()
+ .setSubscription(Subscription.newBuilder()
+ .addSubscriptions(SubscriptionEntry.newBuilder()
+ .setTopic(Resource.newBuilder().setName("topic").build())
+ .setExpression(FilterExpression.newBuilder()
+ .setType(FilterType.TAG)
+ .setExpression("tag")
+ .build())
+ .build())
+ .build())
+ .build();
+ when(grpcClientSettingsManager.getRawClientSettings(eq(clientId))).thenReturn(settings);
+
+ HeartbeatSyncer heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, consumerManager, mqClientAPIFactory);
+ heartbeatSyncer.onConsumerRegister(
+ consumerGroup,
+ clientChannelInfo,
+ ConsumeType.CONSUME_PASSIVELY,
+ MessageModel.CLUSTERING,
+ ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
+ Sets.newHashSet(FilterAPI.buildSubscriptionData("topic", "tag"))
+ );
+
+ await().atMost(Duration.ofSeconds(3)).until(() -> !messageArgumentCaptor.getAllValues().isEmpty());
+ heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null);
+ verify(consumerManager, never()).registerConsumer(anyString(), any(), any(), any(), any(), any(), anyBoolean());
+
+ String localServeAddr = ConfigurationManager.getProxyConfig().getLocalServeAddr();
+ // change local serve addr, to simulate other proxy receive messages
+ ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10));
+ ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
+ doReturn(true).when(consumerManager).registerConsumer(anyString(), syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(), anyBoolean());
+
+ heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null);
+ heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null);
+ assertEquals(2, syncChannelInfoArgumentCaptor.getAllValues().size());
+ List<ClientChannelInfo> channelInfoList = syncChannelInfoArgumentCaptor.getAllValues();
+ assertSame(channelInfoList.get(0).getChannel(), channelInfoList.get(1).getChannel());
+ assertEquals(settings, GrpcClientChannel.parseChannelExtendAttribute(channelInfoList.get(0).getChannel()));
+ assertEquals(settings, GrpcClientChannel.parseChannelExtendAttribute(channelInfoList.get(1).getChannel()));
+
+ // start test sync client unregister
+ // reset localServeAddr
+ ConfigurationManager.getProxyConfig().setLocalServeAddr(localServeAddr);
+ heartbeatSyncer.onConsumerUnRegister(consumerGroup, clientChannelInfo);
+ await().atMost(Duration.ofSeconds(3)).until(() -> messageArgumentCaptor.getAllValues().size() == 2);
+
+ ArgumentCaptor<ClientChannelInfo> syncUnRegisterChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
+ doNothing().when(consumerManager).unregisterConsumer(anyString(), syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean());
+
+ // change local serve addr, to simulate other proxy receive messages
+ ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10));
+ heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getAllValues().get(1))), null);
+ assertSame(channelInfoList.get(0).getChannel(), syncUnRegisterChannelInfoArgumentCaptor.getValue().getChannel());
+ }
+
+ @Test // Register/unregister sync round-trip for a consumer connected through a RemotingChannel.
+ public void testSyncRemotingChannel() throws Exception {
+ String consumerGroup = "consumerGroup";
+ Set<SubscriptionData> subscriptionDataSet = new HashSet<>();
+ subscriptionDataSet.add(FilterAPI.buildSubscriptionData("topic", "tagSub"));
+ RemotingProxyOutClient remotingProxyOutClient = mock(RemotingProxyOutClient.class);
+ RemotingChannel remotingChannel = new RemotingChannel(remotingProxyOutClient, proxyRelayService, createMockChannel(), clientId, subscriptionDataSet);
+ ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+ remotingChannel,
+ clientId,
+ LanguageCode.JAVA,
+ 4
+ );
+ // Capture every message handed to sendMessageAsync and answer each call with an already-completed SEND_OK result.
+ ArgumentCaptor<Message> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class);
+ SendResult sendResult = new SendResult();
+ sendResult.setSendStatus(SendStatus.SEND_OK);
+ doReturn(CompletableFuture.completedFuture(sendResult)).when(this.mqClientAPIExt)
+ .sendMessageAsync(anyString(), anyString(), messageArgumentCaptor.capture(), any(), anyLong());
+ // Phase 1: register the consumer on the syncer; the test then expects a message to reach the captor above.
+ HeartbeatSyncer heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, consumerManager, mqClientAPIFactory);
+ heartbeatSyncer.onConsumerRegister(
+ consumerGroup,
+ clientChannelInfo,
+ ConsumeType.CONSUME_PASSIVELY,
+ MessageModel.CLUSTERING,
+ ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
+ subscriptionDataSet
+ );
+ // Wait up to 3s for the first captured message, then consume it with localServeAddr unchanged: registerConsumer must not be called.
+ await().atMost(Duration.ofSeconds(3)).until(() -> !messageArgumentCaptor.getAllValues().isEmpty());
+ heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null);
+ verify(consumerManager, never()).registerConsumer(anyString(), any(), any(), any(), any(), any(), anyBoolean());
+ // Phase 2: remember the real address so it can be restored before the unregister phase.
+ String localServeAddr = ConfigurationManager.getProxyConfig().getLocalServeAddr();
+ // Switch the local serve address to a random value to simulate another proxy receiving the message.
+ ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10));
+ ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
+ doReturn(true).when(consumerManager).registerConsumer(anyString(), syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(), anyBoolean());
+ // Consume the same message twice: both deliveries must register the same channel instance, and that channel must carry the original subscriptions.
+ heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null);
+ heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null);
+ assertEquals(2, syncChannelInfoArgumentCaptor.getAllValues().size());
+ List<ClientChannelInfo> channelInfoList = syncChannelInfoArgumentCaptor.getAllValues();
+ assertSame(channelInfoList.get(0).getChannel(), channelInfoList.get(1).getChannel());
+ assertEquals(subscriptionDataSet, RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(0).getChannel()));
+ assertEquals(subscriptionDataSet, RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(1).getChannel()));
+
+ // Phase 3: client unregister sync.
+ // Restore the original localServeAddr before unregistering, then wait up to 3s for a second captured message.
+ ConfigurationManager.getProxyConfig().setLocalServeAddr(localServeAddr);
+ heartbeatSyncer.onConsumerUnRegister(consumerGroup, clientChannelInfo);
+ await().atMost(Duration.ofSeconds(3)).until(() -> messageArgumentCaptor.getAllValues().size() == 2);
+ // Capture the ClientChannelInfo passed to unregisterConsumer.
+ ArgumentCaptor<ClientChannelInfo> syncUnRegisterChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
+ doNothing().when(consumerManager).unregisterConsumer(anyString(), syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean());
+ // NOTE(review): localServeAddr is not restored after the change below; confirm the fixture re-initializes the proxy config between tests.
+ // Switch to a random local serve address again to simulate another proxy receiving the second message.
+ ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10));
+ heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getAllValues().get(1))), null);
+ assertSame(channelInfoList.get(0).getChannel(), syncUnRegisterChannelInfoArgumentCaptor.getValue().getChannel()); // unregister must receive the same channel instance that was registered
+ }
+
+ private MessageExt convertFromMessage(Message message) {
+ MessageExt messageExt = new MessageExt();
+ messageExt.setTopic(message.getTopic());
+ messageExt.setBody(message.getBody());
+ return messageExt;
+ }
+
+ private Channel createMockChannel() {
+ return new MockChannel(RandomStringUtils.randomAlphabetic(10));
+ }
+
+ private class MockChannel extends SimpleChannel { // non-static on purpose: the constructor reads remoteAddress/localAddress from the enclosing test instance
+ // Passes null as the first super argument (presumably the parent channel; confirm against SimpleChannel), wraps channelId in a MockChannelId, and forwards the enclosing test's addresses.
+ public MockChannel(String channelId) {
+ super(null, new MockChannelId(channelId), HeartbeatSyncerTest.this.remoteAddress, HeartbeatSyncerTest.this.localAddress);
+ }
+ }
+
+ /**
+ * Minimal {@link ChannelId} backed by a single string, which is returned as both the short and
+ * the long text form.
+ */
+ private static class MockChannelId implements ChannelId {
+
+ private final String channelId;
+
+ /**
+ * @param channelId the string used as this id's short and long text
+ */
+ public MockChannelId(String channelId) {
+ this.channelId = channelId;
+ }
+
+ /**
+ * Orders ids by comparing this id's string with the other id's long text.
+ */
+ @Override
+ public int compareTo(@NotNull ChannelId o) {
+ final String otherLongText = o.asLongText();
+ return channelId.compareTo(otherLongText);
+ }
+
+ /**
+ * @return the backing string, the same value as {@link #asShortText()}
+ */
+ @Override
+ public String asLongText() {
+ return channelId;
+ }
+
+ /**
+ * @return the backing string, the same value as {@link #asLongText()}
+ */
+ @Override
+ public String asShortText() {
+ return channelId;
+ }
+ }
+}
\ No newline at end of file