You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:53:44 UTC
[rocketmq] 03/26: [ISSUE #5406] Add unit test
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit b887313bfbcb2d90c6db9757299fd76deb9f103d
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Tue Nov 1 15:14:23 2022 +0800
[ISSUE #5406] Add unit test
---
.../activity/AbstractRemotingActivity.java | 1 +
.../remoting/activity/GetTopicRouteActivity.java | 1 +
.../remoting/activity/PullMessageActivity.java | 15 +-
.../activity/AbstractRemotingActivityTest.java | 130 +++++++++++++++-
.../remoting/activity/PullMessageActivityTest.java | 165 +++++++++++++++++++++
.../remoting/activity/SendMessageActivityTest.java | 102 +++++++++++++
6 files changed, 394 insertions(+), 20 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
index 7f0d891ec..54ef7bfa7 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
@@ -67,6 +67,7 @@ public abstract class AbstractRemotingActivity implements NettyRequestProcessor
}
String brokerName = request.getExtFields().get(BROKER_NAME_FIELD);
if (request.isOnewayRPC()) {
+ messagingProcessor.requestOneway(context, brokerName, request, timeoutMillis);
return null;
}
messagingProcessor.request(context, brokerName, request, timeoutMillis)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java
index d3b7de98d..26d28bafe 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java
@@ -49,6 +49,7 @@ public class GetTopicRouteActivity extends AbstractRemotingActivity {
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
List<Address> addressList = new ArrayList<>();
+ // AddressScheme is just a placeholder and will not affect topic route result in this case.
addressList.add(new Address(Address.AddressScheme.IPv4, HostAndPort.fromString(proxyConfig.getRemotingAccessPoint())));
ProxyTopicRouteData proxyTopicRouteData = messagingProcessor.getTopicRouteDataForProxy(context, addressList, requestHeader.getTopic());
TopicRouteData topicRouteData = proxyTopicRouteData.buildTopicRouteData();
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
index 819bf139d..873b52460 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.proxy.remoting.activity;
import io.netty.channel.ChannelHandlerContext;
import java.time.Duration;
-import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
@@ -39,10 +38,6 @@ public class PullMessageActivity extends AbstractRemotingActivity {
@Override
protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
ProxyContext context) throws Exception {
- if (request.getExtFields().get(BROKER_NAME_FIELD) == null) {
- return RemotingCommand.buildErrorResponse(ResponseCode.VERSION_NOT_SUPPORTED,
- "Request doesn't have field bname");
- }
PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
if (!PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag())) {
ConsumerGroupInfo consumerInfo = messagingProcessor.getConsumerGroupInfo(requestHeader.getConsumerGroup());
@@ -57,16 +52,10 @@ public class PullMessageActivity extends AbstractRemotingActivity {
}
requestHeader.setSubscription(subscriptionData.getSubString());
requestHeader.setExpressionType(subscriptionData.getExpressionType());
+ request.writeCustomHeader(requestHeader);
request.makeCustomHeaderToNet();
}
- String brokerName = requestHeader.getBname();
long timeoutMillis = requestHeader.getSuspendTimeoutMillis() + Duration.ofSeconds(10).toMillis();
- CompletableFuture<RemotingCommand> future = messagingProcessor.request(context, brokerName, request, timeoutMillis);
- future.thenAccept(r -> writeResponse(ctx, context, request, r))
- .exceptionally(t -> {
- writeErrResponse(ctx, context, request, t);
- return null;
- });
- return null;
+ return request(ctx, request, context, timeoutMillis);
}
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
index b581d8a91..74eb3cbd8 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
@@ -21,17 +21,24 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.acl.common.AclException;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
@@ -39,7 +46,8 @@ import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -50,7 +58,7 @@ public class AbstractRemotingActivityTest extends InitConfigAndLoggerTest {
@Mock
MessagingProcessor messagingProcessorMock;
@Spy
- ChannelHandlerContext ctx = new SimpleChannelHandlerContext(new SimpleChannel(null, "1", "2")) {
+ ChannelHandlerContext ctx = new SimpleChannelHandlerContext(new SimpleChannel(null, "0.0.0.0:0", "1.1.1.1:1")) {
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return null;
@@ -69,16 +77,124 @@ public class AbstractRemotingActivityTest extends InitConfigAndLoggerTest {
}
@Test
- public void request() throws Exception {
+ public void testCreateContext() throws Exception {
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+ ProxyContext context = remotingActivity.createContext(ctx, request);
+ assertThat(context.getLanguage()).isEqualTo(LanguageCode.JAVA.name());
+ assertThat(context.getAction()).isEqualTo("Remoting" + RequestCode.PULL_MESSAGE);
+ }
+
+ @Test
+ public void testRequest() throws Exception {
String brokerName = "broker";
- String remark = "success";
- when(messagingProcessorMock.request(any(), anyString(), any(), anyLong())).thenReturn(CompletableFuture.completedFuture(
- RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, remark)
+ RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "remark");
+ when(messagingProcessorMock.request(any(), eq(brokerName), any(), anyLong())).thenReturn(CompletableFuture.completedFuture(
+ response
));
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName);
RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000);
assertThat(remotingCommand).isNull();
- verify(ctx, times(1)).writeAndFlush(any());
+ verify(ctx, times(1)).writeAndFlush(response);
+ }
+
+ @Test
+ public void testRequestOneway() throws Exception {
+ String brokerName = "broker";
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+ request.markOnewayRPC();
+ request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName);
+ RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000);
+ assertThat(remotingCommand).isNull();
+ verify(messagingProcessorMock, times(1)).requestOneway(any(), eq(brokerName), any(), anyLong());
+ }
+
+ @Test
+ public void testRequestInvalid() throws Exception {
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+ request.addExtField("test", "test");
+ RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000);
+ assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.VERSION_NOT_SUPPORTED);
+ verify(ctx, never()).writeAndFlush(any());
+ }
+
+ @Test
+ public void testRequestProxyException() throws Exception {
+ ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class);
+ String brokerName = "broker";
+ String remark = "exception";
+ CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+ future.completeExceptionally(new ProxyException(ProxyExceptionCode.FORBIDDEN, remark));
+ when(messagingProcessorMock.request(any(), eq(brokerName), any(), anyLong())).thenReturn(future);
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+ request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName);
+ RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000);
+ assertThat(remotingCommand).isNull();
+ verify(ctx, times(1)).writeAndFlush(captor.capture());
+ assertThat(captor.getValue().getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
+ }
+
+ @Test
+ public void testRequestClientException() throws Exception {
+ ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class);
+ String brokerName = "broker";
+ String remark = "exception";
+ CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+ future.completeExceptionally(new MQClientException(remark, null));
+ when(messagingProcessorMock.request(any(), eq(brokerName), any(), anyLong())).thenReturn(future);
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+ request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName);
+ RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000);
+ assertThat(remotingCommand).isNull();
+ verify(ctx, times(1)).writeAndFlush(captor.capture());
+ assertThat(captor.getValue().getCode()).isEqualTo(-1);
+ }
+
+ @Test
+ public void testRequestBrokerException() throws Exception {
+ ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class);
+ String brokerName = "broker";
+ String remark = "exception";
+ CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+ future.completeExceptionally(new MQBrokerException(ResponseCode.FLUSH_DISK_TIMEOUT, remark));
+ when(messagingProcessorMock.request(any(), eq(brokerName), any(), anyLong())).thenReturn(future);
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+ request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName);
+ RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000);
+ assertThat(remotingCommand).isNull();
+ verify(ctx, times(1)).writeAndFlush(captor.capture());
+ assertThat(captor.getValue().getCode()).isEqualTo(ResponseCode.FLUSH_DISK_TIMEOUT);
+ }
+
+ @Test
+ public void testRequestAclException() throws Exception {
+ ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class);
+ String brokerName = "broker";
+ String remark = "exception";
+ CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+ future.completeExceptionally(new AclException(remark, ResponseCode.MESSAGE_ILLEGAL));
+ when(messagingProcessorMock.request(any(), eq(brokerName), any(), anyLong())).thenReturn(future);
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+ request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName);
+ RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000);
+ assertThat(remotingCommand).isNull();
+ verify(ctx, times(1)).writeAndFlush(captor.capture());
+ assertThat(captor.getValue().getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
+ }
+
+ @Test
+ public void testRequestDefaultException() throws Exception {
+ ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class);
+ String brokerName = "broker";
+ String remark = "exception";
+ CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+ future.completeExceptionally(new Exception(remark));
+ when(messagingProcessorMock.request(any(), eq(brokerName), any(), anyLong())).thenReturn(future);
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+ request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName);
+ RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000);
+ assertThat(remotingCommand).isNull();
+ verify(ctx, times(1)).writeAndFlush(captor.capture());
+ assertThat(captor.getValue().getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
}
}
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java
new file mode 100644
index 000000000..ffbe2ffac
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PullMessageActivityTest extends InitConfigAndLoggerTest {
+ PullMessageActivity pullMessageActivity;
+
+ @Mock
+ MessagingProcessor messagingProcessorMock;
+ @Mock
+ ConsumerGroupInfo consumerGroupInfoMock;
+
+ String topic = "topic";
+ String group = "group";
+ String brokerName = "brokerName";
+ String subString = "sub";
+ String type = "type";
+ @Spy
+ ChannelHandlerContext ctx = new SimpleChannelHandlerContext(new SimpleChannel(null, "1", "2")) {
+ @Override
+ public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
+ return null;
+ }
+ };
+
+ @Before
+ public void setup() throws Exception {
+ pullMessageActivity = new PullMessageActivity(null, messagingProcessorMock);
+ }
+
+ @Test
+ public void testPullMessageWithoutSub() throws Exception {
+ when(messagingProcessorMock.getConsumerGroupInfo(eq(group)))
+ .thenReturn(consumerGroupInfoMock);
+ SubscriptionData subscriptionData = new SubscriptionData();
+ subscriptionData.setSubString(subString);
+ subscriptionData.setExpressionType(type);
+ when(consumerGroupInfoMock.findSubscriptionData(eq(topic)))
+ .thenReturn(subscriptionData);
+
+ PullMessageRequestHeader header = new PullMessageRequestHeader();
+ header.setTopic(topic);
+ header.setConsumerGroup(group);
+ header.setQueueId(0);
+ header.setQueueOffset(0L);
+ header.setMaxMsgNums(16);
+ header.setSysFlag(PullSysFlag.buildSysFlag(true, false, false, false));
+ header.setCommitOffset(0L);
+ header.setSuspendTimeoutMillis(1000L);
+ header.setSubVersion(0L);
+ header.setBname(brokerName);
+
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, header);
+ request.makeCustomHeaderToNet();
+ RemotingCommand expectResponse = RemotingCommand.createResponseCommand(ResponseCode.NO_MESSAGE, "success");
+ PullMessageRequestHeader newHeader = new PullMessageRequestHeader();
+ newHeader.setTopic(topic);
+ newHeader.setConsumerGroup(group);
+ newHeader.setQueueId(0);
+ newHeader.setQueueOffset(0L);
+ newHeader.setMaxMsgNums(16);
+ newHeader.setSysFlag(PullSysFlag.buildSysFlag(true, false, false, false));
+ newHeader.setCommitOffset(0L);
+ newHeader.setSuspendTimeoutMillis(1000L);
+ newHeader.setSubVersion(0L);
+ newHeader.setBname(brokerName);
+ newHeader.setSubscription(subString);
+ newHeader.setExpressionType(type);
+ RemotingCommand matchRequest = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, newHeader);
+ matchRequest.setOpaque(request.getOpaque());
+ matchRequest.makeCustomHeaderToNet();
+
+ ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class);
+ when(messagingProcessorMock.request(any(), eq(brokerName), captor.capture(), anyLong()))
+ .thenReturn(CompletableFuture.completedFuture(expectResponse));
+ RemotingCommand response = pullMessageActivity.processRequest0(ctx, request, null);
+ assertThat(captor.getValue().getExtFields()).isEqualTo(matchRequest.getExtFields());
+ assertThat(response).isNull();
+ verify(ctx, times(1)).writeAndFlush(eq(expectResponse));
+ }
+
+ @Test
+ public void testPullMessageWithSub() throws Exception {
+ when(messagingProcessorMock.getConsumerGroupInfo(eq(group)))
+ .thenReturn(consumerGroupInfoMock);
+ SubscriptionData subscriptionData = new SubscriptionData();
+ subscriptionData.setSubString(subString);
+ subscriptionData.setExpressionType(type);
+ when(consumerGroupInfoMock.findSubscriptionData(eq(topic)))
+ .thenReturn(subscriptionData);
+
+ PullMessageRequestHeader header = new PullMessageRequestHeader();
+ header.setTopic(topic);
+ header.setConsumerGroup(group);
+ header.setQueueId(0);
+ header.setQueueOffset(0L);
+ header.setMaxMsgNums(16);
+ header.setSysFlag(PullSysFlag.buildSysFlag(true, true, false, false));
+ header.setCommitOffset(0L);
+ header.setSuspendTimeoutMillis(1000L);
+ header.setSubVersion(0L);
+ header.setBname(brokerName);
+ header.setSubscription(subString);
+ header.setExpressionType(type);
+
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, header);
+ request.makeCustomHeaderToNet();
+ RemotingCommand expectResponse = RemotingCommand.createResponseCommand(ResponseCode.NO_MESSAGE, "success");
+
+ ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class);
+ when(messagingProcessorMock.request(any(), eq(brokerName), captor.capture(), anyLong()))
+ .thenReturn(CompletableFuture.completedFuture(expectResponse));
+ RemotingCommand response = pullMessageActivity.processRequest0(ctx, request, null);
+ assertThat(captor.getValue().getExtFields()).isEqualTo(request.getExtFields());
+ assertThat(response).isNull();
+ verify(ctx, times(1)).writeAndFlush(eq(expectResponse));
+ }
+}
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java
new file mode 100644
index 000000000..e03bc26e0
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
+import org.apache.rocketmq.proxy.service.metadata.MetadataService;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SendMessageActivityTest extends InitConfigAndLoggerTest {
+ SendMessageActivity sendMessageActivity;
+
+ @Mock
+ MessagingProcessor messagingProcessorMock;
+ @Mock
+ MetadataService metadataServiceMock;
+
+ String topic = "topic";
+ String producerGroup = "group";
+ String brokerName = "brokerName";
+ @Spy
+ ChannelHandlerContext ctx = new SimpleChannelHandlerContext(new SimpleChannel(null, "1", "2")) {
+ @Override
+ public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
+ return null;
+ }
+ };
+
+ @Before
+ public void setup() {
+ sendMessageActivity = new SendMessageActivity(null, messagingProcessorMock);
+ when(messagingProcessorMock.getMetadataService()).thenReturn(metadataServiceMock);
+ }
+
+ @Test
+ public void testSendMessage() throws Exception {
+ when(metadataServiceMock.getTopicMessageType(eq(topic))).thenReturn(TopicMessageType.NORMAL);
+ Message message = new Message(topic, "123".getBytes());
+ message.putUserProperty("a", "b");
+ SendMessageRequestHeader sendMessageRequestHeader = new SendMessageRequestHeader();
+ sendMessageRequestHeader.setTopic(topic);
+ sendMessageRequestHeader.setProducerGroup(producerGroup);
+ sendMessageRequestHeader.setDefaultTopic("");
+ sendMessageRequestHeader.setDefaultTopicQueueNums(0);
+ sendMessageRequestHeader.setQueueId(0);
+ sendMessageRequestHeader.setSysFlag(0);
+ sendMessageRequestHeader.setBname(brokerName);
+ sendMessageRequestHeader.setProperties(MessageDecoder.messageProperties2String(message.getProperties()));
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, sendMessageRequestHeader);
+ remotingCommand.setBody(message.getBody());
+ remotingCommand.makeCustomHeaderToNet();
+
+ RemotingCommand expectResponse = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "success");
+ when(messagingProcessorMock.request(any(), eq(brokerName), eq(remotingCommand), anyLong()))
+ .thenReturn(CompletableFuture.completedFuture(expectResponse));
+ RemotingCommand response = sendMessageActivity.processRequest0(ctx, remotingCommand, null);
+ assertThat(response).isNull();
+ verify(ctx, times(1)).writeAndFlush(eq(expectResponse));
+ }
+}
\ No newline at end of file