You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:53:44 UTC

[rocketmq] 03/26: [ISSUE #5406] Add unit test

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit b887313bfbcb2d90c6db9757299fd76deb9f103d
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Tue Nov 1 15:14:23 2022 +0800

    [ISSUE #5406] Add unit test
---
 .../activity/AbstractRemotingActivity.java         |   1 +
 .../remoting/activity/GetTopicRouteActivity.java   |   1 +
 .../remoting/activity/PullMessageActivity.java     |  15 +-
 .../activity/AbstractRemotingActivityTest.java     | 130 +++++++++++++++-
 .../remoting/activity/PullMessageActivityTest.java | 165 +++++++++++++++++++++
 .../remoting/activity/SendMessageActivityTest.java | 102 +++++++++++++
 6 files changed, 394 insertions(+), 20 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
index 7f0d891ec..54ef7bfa7 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
@@ -67,6 +67,7 @@ public abstract class AbstractRemotingActivity implements NettyRequestProcessor
         }
         String brokerName = request.getExtFields().get(BROKER_NAME_FIELD);
         if (request.isOnewayRPC()) {
+            messagingProcessor.requestOneway(context, brokerName, request, timeoutMillis);
             return null;
         }
         messagingProcessor.request(context, brokerName, request, timeoutMillis)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java
index d3b7de98d..26d28bafe 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java
@@ -49,6 +49,7 @@ public class GetTopicRouteActivity extends AbstractRemotingActivity {
         final GetRouteInfoRequestHeader requestHeader =
             (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
         List<Address> addressList = new ArrayList<>();
+        // AddressScheme is just a placeholder and will not affect topic route result in this case.
         addressList.add(new Address(Address.AddressScheme.IPv4, HostAndPort.fromString(proxyConfig.getRemotingAccessPoint())));
         ProxyTopicRouteData proxyTopicRouteData = messagingProcessor.getTopicRouteDataForProxy(context, addressList, requestHeader.getTopic());
         TopicRouteData topicRouteData = proxyTopicRouteData.buildTopicRouteData();
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
index 819bf139d..873b52460 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.proxy.remoting.activity;
 
 import io.netty.channel.ChannelHandlerContext;
 import java.time.Duration;
-import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
@@ -39,10 +38,6 @@ public class PullMessageActivity extends AbstractRemotingActivity {
     @Override
     protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
         ProxyContext context) throws Exception {
-        if (request.getExtFields().get(BROKER_NAME_FIELD) == null) {
-            return RemotingCommand.buildErrorResponse(ResponseCode.VERSION_NOT_SUPPORTED,
-                "Request doesn't have field bname");
-        }
         PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
         if (!PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag())) {
             ConsumerGroupInfo consumerInfo = messagingProcessor.getConsumerGroupInfo(requestHeader.getConsumerGroup());
@@ -57,16 +52,10 @@ public class PullMessageActivity extends AbstractRemotingActivity {
             }
             requestHeader.setSubscription(subscriptionData.getSubString());
             requestHeader.setExpressionType(subscriptionData.getExpressionType());
+            request.writeCustomHeader(requestHeader);
             request.makeCustomHeaderToNet();
         }
-        String brokerName = requestHeader.getBname();
         long timeoutMillis = requestHeader.getSuspendTimeoutMillis() + Duration.ofSeconds(10).toMillis();
-        CompletableFuture<RemotingCommand> future = messagingProcessor.request(context, brokerName, request, timeoutMillis);
-        future.thenAccept(r -> writeResponse(ctx, context, request, r))
-            .exceptionally(t -> {
-                writeErrResponse(ctx, context, request, t);
-                return null;
-            });
-        return null;
+        return request(ctx, request, context, timeoutMillis);
     }
 }
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
index b581d8a91..74eb3cbd8 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
@@ -21,17 +21,24 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.acl.common.AclException;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
 import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
 import org.apache.rocketmq.proxy.processor.MessagingProcessor;
 import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
 import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
@@ -39,7 +46,8 @@ import org.mockito.junit.MockitoJUnitRunner;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -50,7 +58,7 @@ public class AbstractRemotingActivityTest extends InitConfigAndLoggerTest {
     @Mock
     MessagingProcessor messagingProcessorMock;
     @Spy
-    ChannelHandlerContext ctx = new SimpleChannelHandlerContext(new SimpleChannel(null, "1", "2")) {
+    ChannelHandlerContext ctx = new SimpleChannelHandlerContext(new SimpleChannel(null, "0.0.0.0:0", "1.1.1.1:1")) {
         @Override
         public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
             return null;
@@ -69,16 +77,124 @@ public class AbstractRemotingActivityTest extends InitConfigAndLoggerTest {
     }
 
     @Test
-    public void request() throws Exception {
+    public void testCreateContext() throws Exception {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+        ProxyContext context = remotingActivity.createContext(ctx, request);
+        assertThat(context.getLanguage()).isEqualTo(LanguageCode.JAVA.name());
+        assertThat(context.getAction()).isEqualTo("Remoting" + RequestCode.PULL_MESSAGE);
+    }
+
+    @Test
+    public void testRequest() throws Exception {
         String brokerName = "broker";
-        String remark = "success";
-        when(messagingProcessorMock.request(any(), anyString(), any(), anyLong())).thenReturn(CompletableFuture.completedFuture(
-            RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, remark)
+        RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "remark");
+        when(messagingProcessorMock.request(any(), eq(brokerName), any(), anyLong())).thenReturn(CompletableFuture.completedFuture(
+            response
         ));
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
         request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName);
         RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000);
         assertThat(remotingCommand).isNull();
-        verify(ctx, times(1)).writeAndFlush(any());
+        verify(ctx, times(1)).writeAndFlush(response);
+    }
+
+    @Test
+    public void testRequestOneway() throws Exception {
+        String brokerName = "broker";
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+        request.markOnewayRPC();
+        request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName);
+        RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000);
+        assertThat(remotingCommand).isNull();
+        verify(messagingProcessorMock, times(1)).requestOneway(any(), eq(brokerName), any(), anyLong());
+    }
+
+    @Test
+    public void testRequestInvalid() throws Exception {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+        request.addExtField("test", "test");
+        RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000);
+        assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.VERSION_NOT_SUPPORTED);
+        verify(ctx, never()).writeAndFlush(any());
+    }
+
+    @Test
+    public void testRequestProxyException() throws Exception {
+        ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class);
+        String brokerName = "broker";
+        String remark = "exception";
+        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+        future.completeExceptionally(new ProxyException(ProxyExceptionCode.FORBIDDEN, remark));
+        when(messagingProcessorMock.request(any(), eq(brokerName), any(), anyLong())).thenReturn(future);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+        request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName);
+        RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000);
+        assertThat(remotingCommand).isNull();
+        verify(ctx, times(1)).writeAndFlush(captor.capture());
+        assertThat(captor.getValue().getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
+    }
+
+    @Test
+    public void testRequestClientException() throws Exception {
+        ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class);
+        String brokerName = "broker";
+        String remark = "exception";
+        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+        future.completeExceptionally(new MQClientException(remark, null));
+        when(messagingProcessorMock.request(any(), eq(brokerName), any(), anyLong())).thenReturn(future);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+        request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName);
+        RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000);
+        assertThat(remotingCommand).isNull();
+        verify(ctx, times(1)).writeAndFlush(captor.capture());
+        assertThat(captor.getValue().getCode()).isEqualTo(-1);
+    }
+
+    @Test
+    public void testRequestBrokerException() throws Exception {
+        ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class);
+        String brokerName = "broker";
+        String remark = "exception";
+        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+        future.completeExceptionally(new MQBrokerException(ResponseCode.FLUSH_DISK_TIMEOUT, remark));
+        when(messagingProcessorMock.request(any(), eq(brokerName), any(), anyLong())).thenReturn(future);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+        request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName);
+        RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000);
+        assertThat(remotingCommand).isNull();
+        verify(ctx, times(1)).writeAndFlush(captor.capture());
+        assertThat(captor.getValue().getCode()).isEqualTo(ResponseCode.FLUSH_DISK_TIMEOUT);
+    }
+
+    @Test
+    public void testRequestAclException() throws Exception {
+        ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class);
+        String brokerName = "broker";
+        String remark = "exception";
+        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+        future.completeExceptionally(new AclException(remark, ResponseCode.MESSAGE_ILLEGAL));
+        when(messagingProcessorMock.request(any(), eq(brokerName), any(), anyLong())).thenReturn(future);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+        request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName);
+        RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000);
+        assertThat(remotingCommand).isNull();
+        verify(ctx, times(1)).writeAndFlush(captor.capture());
+        assertThat(captor.getValue().getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
+    }
+
+    @Test
+    public void testRequestDefaultException() throws Exception {
+        ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class);
+        String brokerName = "broker";
+        String remark = "exception";
+        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+        future.completeExceptionally(new Exception(remark));
+        when(messagingProcessorMock.request(any(), eq(brokerName), any(), anyLong())).thenReturn(future);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+        request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName);
+        RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000);
+        assertThat(remotingCommand).isNull();
+        verify(ctx, times(1)).writeAndFlush(captor.capture());
+        assertThat(captor.getValue().getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
     }
 }
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java
new file mode 100644
index 000000000..ffbe2ffac
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PullMessageActivityTest extends InitConfigAndLoggerTest {
+    PullMessageActivity pullMessageActivity;
+
+    @Mock
+    MessagingProcessor messagingProcessorMock;
+    @Mock
+    ConsumerGroupInfo consumerGroupInfoMock;
+
+    String topic = "topic";
+    String group = "group";
+    String brokerName = "brokerName";
+    String subString = "sub";
+    String type = "type";
+    @Spy
+    ChannelHandlerContext ctx = new SimpleChannelHandlerContext(new SimpleChannel(null, "1", "2")) {
+        @Override
+        public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
+            return null;
+        }
+    };
+
+    @Before
+    public void setup() throws Exception {
+        pullMessageActivity = new PullMessageActivity(null, messagingProcessorMock);
+    }
+
+    @Test
+    public void testPullMessageWithoutSub() throws Exception {
+        when(messagingProcessorMock.getConsumerGroupInfo(eq(group)))
+            .thenReturn(consumerGroupInfoMock);
+        SubscriptionData subscriptionData = new SubscriptionData();
+        subscriptionData.setSubString(subString);
+        subscriptionData.setExpressionType(type);
+        when(consumerGroupInfoMock.findSubscriptionData(eq(topic)))
+            .thenReturn(subscriptionData);
+
+        PullMessageRequestHeader header = new PullMessageRequestHeader();
+        header.setTopic(topic);
+        header.setConsumerGroup(group);
+        header.setQueueId(0);
+        header.setQueueOffset(0L);
+        header.setMaxMsgNums(16);
+        header.setSysFlag(PullSysFlag.buildSysFlag(true, false, false, false));
+        header.setCommitOffset(0L);
+        header.setSuspendTimeoutMillis(1000L);
+        header.setSubVersion(0L);
+        header.setBname(brokerName);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, header);
+        request.makeCustomHeaderToNet();
+        RemotingCommand expectResponse = RemotingCommand.createResponseCommand(ResponseCode.NO_MESSAGE, "success");
+        PullMessageRequestHeader newHeader = new PullMessageRequestHeader();
+        newHeader.setTopic(topic);
+        newHeader.setConsumerGroup(group);
+        newHeader.setQueueId(0);
+        newHeader.setQueueOffset(0L);
+        newHeader.setMaxMsgNums(16);
+        newHeader.setSysFlag(PullSysFlag.buildSysFlag(true, false, false, false));
+        newHeader.setCommitOffset(0L);
+        newHeader.setSuspendTimeoutMillis(1000L);
+        newHeader.setSubVersion(0L);
+        newHeader.setBname(brokerName);
+        newHeader.setSubscription(subString);
+        newHeader.setExpressionType(type);
+        RemotingCommand matchRequest = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, newHeader);
+        matchRequest.setOpaque(request.getOpaque());
+        matchRequest.makeCustomHeaderToNet();
+
+        ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class);
+        when(messagingProcessorMock.request(any(), eq(brokerName), captor.capture(), anyLong()))
+            .thenReturn(CompletableFuture.completedFuture(expectResponse));
+        RemotingCommand response = pullMessageActivity.processRequest0(ctx, request, null);
+        assertThat(captor.getValue().getExtFields()).isEqualTo(matchRequest.getExtFields());
+        assertThat(response).isNull();
+        verify(ctx, times(1)).writeAndFlush(eq(expectResponse));
+    }
+
+    @Test
+    public void testPullMessageWithSub() throws Exception {
+        when(messagingProcessorMock.getConsumerGroupInfo(eq(group)))
+            .thenReturn(consumerGroupInfoMock);
+        SubscriptionData subscriptionData = new SubscriptionData();
+        subscriptionData.setSubString(subString);
+        subscriptionData.setExpressionType(type);
+        when(consumerGroupInfoMock.findSubscriptionData(eq(topic)))
+            .thenReturn(subscriptionData);
+
+        PullMessageRequestHeader header = new PullMessageRequestHeader();
+        header.setTopic(topic);
+        header.setConsumerGroup(group);
+        header.setQueueId(0);
+        header.setQueueOffset(0L);
+        header.setMaxMsgNums(16);
+        header.setSysFlag(PullSysFlag.buildSysFlag(true, true, false, false));
+        header.setCommitOffset(0L);
+        header.setSuspendTimeoutMillis(1000L);
+        header.setSubVersion(0L);
+        header.setBname(brokerName);
+        header.setSubscription(subString);
+        header.setExpressionType(type);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, header);
+        request.makeCustomHeaderToNet();
+        RemotingCommand expectResponse = RemotingCommand.createResponseCommand(ResponseCode.NO_MESSAGE, "success");
+
+        ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class);
+        when(messagingProcessorMock.request(any(), eq(brokerName), captor.capture(), anyLong()))
+            .thenReturn(CompletableFuture.completedFuture(expectResponse));
+        RemotingCommand response = pullMessageActivity.processRequest0(ctx, request, null);
+        assertThat(captor.getValue().getExtFields()).isEqualTo(request.getExtFields());
+        assertThat(response).isNull();
+        verify(ctx, times(1)).writeAndFlush(eq(expectResponse));
+    }
+}
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java
new file mode 100644
index 000000000..e03bc26e0
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
+import org.apache.rocketmq.proxy.service.metadata.MetadataService;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SendMessageActivityTest extends InitConfigAndLoggerTest {
+    SendMessageActivity sendMessageActivity;
+
+    @Mock
+    MessagingProcessor messagingProcessorMock;
+    @Mock
+    MetadataService metadataServiceMock;
+
+    String topic = "topic";
+    String producerGroup = "group";
+    String brokerName = "brokerName";
+    @Spy
+    ChannelHandlerContext ctx = new SimpleChannelHandlerContext(new SimpleChannel(null, "1", "2")) {
+        @Override
+        public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
+            return null;
+        }
+    };
+
+    @Before
+    public void setup() {
+        sendMessageActivity = new SendMessageActivity(null, messagingProcessorMock);
+        when(messagingProcessorMock.getMetadataService()).thenReturn(metadataServiceMock);
+    }
+
+    @Test
+    public void testSendMessage() throws Exception {
+        when(metadataServiceMock.getTopicMessageType(eq(topic))).thenReturn(TopicMessageType.NORMAL);
+        Message message = new Message(topic, "123".getBytes());
+        message.putUserProperty("a", "b");
+        SendMessageRequestHeader sendMessageRequestHeader = new SendMessageRequestHeader();
+        sendMessageRequestHeader.setTopic(topic);
+        sendMessageRequestHeader.setProducerGroup(producerGroup);
+        sendMessageRequestHeader.setDefaultTopic("");
+        sendMessageRequestHeader.setDefaultTopicQueueNums(0);
+        sendMessageRequestHeader.setQueueId(0);
+        sendMessageRequestHeader.setSysFlag(0);
+        sendMessageRequestHeader.setBname(brokerName);
+        sendMessageRequestHeader.setProperties(MessageDecoder.messageProperties2String(message.getProperties()));
+        RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, sendMessageRequestHeader);
+        remotingCommand.setBody(message.getBody());
+        remotingCommand.makeCustomHeaderToNet();
+
+        RemotingCommand expectResponse = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "success");
+        when(messagingProcessorMock.request(any(), eq(brokerName), eq(remotingCommand), anyLong()))
+            .thenReturn(CompletableFuture.completedFuture(expectResponse));
+        RemotingCommand response = sendMessageActivity.processRequest0(ctx, remotingCommand, null);
+        assertThat(response).isNull();
+        verify(ctx, times(1)).writeAndFlush(eq(expectResponse));
+    }
+}
\ No newline at end of file