You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:53:42 UTC

[rocketmq] 01/26: [ISSUE #5406] Add processor for remoting messaging module

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 934f5b657639aede31a70aa902b25207ba16d179
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Thu Oct 27 16:00:13 2022 +0800

    [ISSUE #5406] Add processor for remoting messaging module
---
 .../common/attribute/TopicMessageType.java         |  18 +++
 .../apache/rocketmq/proxy/config/ProxyConfig.java  |  30 ++++
 .../proxy/processor/AbstractProcessor.java         |  19 ---
 .../proxy/processor/DefaultMessagingProcessor.java |  14 ++
 .../proxy/processor/MessagingProcessor.java        |   6 +
 .../proxy/processor/ProducerProcessor.java         |   2 +-
 .../proxy/processor/RequestBrokerProcessor.java    |  39 +++++
 .../activity/AbstractRemotingActivity.java         | 166 +++++++++++++++++++++
 .../remoting/activity/AckMessageActivity.java      |  38 +++++
 .../activity/ChangeInvisibleTimeActivity.java      |  38 +++++
 .../remoting/activity/ConsumerManagerActivity.java | 112 ++++++++++++++
 .../remoting/activity/GetTopicRouteActivity.java   |  71 +++++++++
 .../remoting/activity/PopMessageActivity.java      |  41 +++++
 .../remoting/activity/PullMessageActivity.java     |  72 +++++++++
 .../remoting/activity/SendMessageActivity.java     |  96 ++++++++++++
 .../proxy/remoting/pipeline/RequestPipeline.java   |  34 ++---
 .../proxy/service/channel/SimpleChannel.java       |   5 +
 .../service/message/ClusterMessageService.java     |  37 ++++-
 .../proxy/service/message/LocalMessageService.java |  12 ++
 .../proxy/service/message/MessageService.java      |   6 +
 .../activity/AbstractRemotingActivityTest.java     |  84 +++++++++++
 21 files changed, 892 insertions(+), 48 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
index 8c484da31..5e6629e3b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
@@ -18,7 +18,9 @@
 package org.apache.rocketmq.common.attribute;
 
 import com.google.common.collect.Sets;
+import java.util.Map;
 import java.util.Set;
+import org.apache.rocketmq.common.message.MessageConst;
 
 public enum TopicMessageType {
     UNSPECIFIED("UNSPECIFIED"),
@@ -40,6 +42,22 @@ public enum TopicMessageType {
         return value;
     }
 
+    /**
+     * Derives the message type from a message's property map.
+     * <p>
+     * Precedence: a transaction-prepared flag equal to {@code "true"} wins, then any of the
+     * delay/timer properties, then a sharding key; anything else is {@link #NORMAL}.
+     *
+     * @param messageProperty the message properties; {@code null} is treated like an empty map
+     * @return the detected message type, never {@code null}
+     */
+    public static TopicMessageType parseFromMessageProperty(Map<String, String> messageProperty) {
+        if (messageProperty == null) {
+            // Without any properties none of the type markers can be present.
+            return TopicMessageType.NORMAL;
+        }
+        if ("true".equals(messageProperty.get(MessageConst.PROPERTY_TRANSACTION_PREPARED))) {
+            return TopicMessageType.TRANSACTION;
+        }
+        if (messageProperty.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null
+            || messageProperty.get(MessageConst.PROPERTY_TIMER_DELIVER_MS) != null
+            || messageProperty.get(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) {
+            return TopicMessageType.DELAY;
+        }
+        if (messageProperty.get(MessageConst.PROPERTY_SHARDING_KEY) != null) {
+            return TopicMessageType.FIFO;
+        }
+        return TopicMessageType.NORMAL;
+    }
+
     public String getMetricsValue() {
         return value.toLowerCase();
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 6bb488984..cbedc3c50 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -178,6 +178,12 @@ public class ProxyConfig implements ConfigFile {
     // Example address: 127.0.0.1:1234
     private String metricCollectorAddress = "";
 
+    // Region id; AbstractRemotingActivity attaches it to remoting responses as the
+    // MessageConst.PROPERTY_MSG_REGION ext field.
+    private String regionId = "";
+
+    // Trace switch; AbstractRemotingActivity attaches it to remoting responses as the
+    // MessageConst.PROPERTY_TRACE_SWITCH ext field.
+    private boolean traceOn = false;
+
+    // NOTE(review): presumably the address remoting clients use to reach the proxy; no reader
+    // of this value is visible in this part of the patch -- confirm.
+    private String remotingAccessPoint = "";
+
     private BrokerConfig.MetricsExporterType metricsExporterType = BrokerConfig.MetricsExporterType.DISABLE;
 
     private String metricsGrpcExporterTarget = "";
@@ -961,6 +967,30 @@ public class ProxyConfig implements ConfigFile {
         this.grpcClientIdleTimeMills = grpcClientIdleTimeMills;
     }
 
+    /** Returns the region id that is attached to remoting responses. */
+    public String getRegionId() {
+        return regionId;
+    }
+
+    /** Sets the region id that is attached to remoting responses. */
+    public void setRegionId(String regionId) {
+        this.regionId = regionId;
+    }
+
+    /** Returns the trace switch value that is attached to remoting responses. */
+    public boolean isTraceOn() {
+        return traceOn;
+    }
+
+    /** Sets the trace switch value that is attached to remoting responses. */
+    public void setTraceOn(boolean traceOn) {
+        this.traceOn = traceOn;
+    }
+
+    /** Returns the configured remoting access point (empty string by default). */
+    public String getRemotingAccessPoint() {
+        return remotingAccessPoint;
+    }
+
+    /** Sets the remoting access point. */
+    public void setRemotingAccessPoint(String remotingAccessPoint) {
+        this.remotingAccessPoint = remotingAccessPoint;
+    }
+
     public BrokerConfig.MetricsExporterType getMetricsExporterType() {
         return metricsExporterType;
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java
index c223eb478..679cc4b3d 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java
@@ -16,10 +16,7 @@
  */
 package org.apache.rocketmq.proxy.processor;
 
-import org.apache.rocketmq.common.attribute.TopicMessageType;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.proxy.common.AbstractStartAndShutdown;
 import org.apache.rocketmq.proxy.common.ProxyException;
 import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
@@ -41,20 +38,4 @@ public abstract class AbstractProcessor extends AbstractStartAndShutdown {
             throw new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "receipt handle is expired");
         }
     }
-
-    protected TopicMessageType parseFromMessageExt(Message message) {
-        String isTrans = message.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
-        String isTransValue = "true";
-        if (isTransValue.equals(isTrans)) {
-            return TopicMessageType.TRANSACTION;
-        } else if (message.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null
-            || message.getProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS) != null
-            || message.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) {
-            return TopicMessageType.DELAY;
-        } else if (message.getProperty(MessageConst.PROPERTY_SHARDING_KEY) != null) {
-            return TopicMessageType.FIFO;
-        } else {
-            return TopicMessageType.NORMAL;
-        }
-    }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index 95fba895a..1b7baba0a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -60,6 +60,7 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen
     protected ConsumerProcessor consumerProcessor;
     protected TransactionProcessor transactionProcessor;
     protected ClientProcessor clientProcessor;
+    protected RequestBrokerProcessor requestBrokerProcessor;
 
     protected ThreadPoolExecutor producerProcessorExecutor;
     protected ThreadPoolExecutor consumerProcessorExecutor;
@@ -88,6 +89,7 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen
         this.consumerProcessor = new ConsumerProcessor(this, serviceManager, this.consumerProcessorExecutor);
         this.transactionProcessor = new TransactionProcessor(this, serviceManager);
         this.clientProcessor = new ClientProcessor(this, serviceManager);
+        this.requestBrokerProcessor = new RequestBrokerProcessor(this, serviceManager);
 
         this.init();
     }
@@ -218,6 +220,18 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen
         return this.consumerProcessor.getMinOffset(ctx, messageQueue, timeoutMillis);
     }
 
+    // Delegates the broker request to the RequestBrokerProcessor created in the constructor.
+    @Override
+    public CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request,
+        long timeoutMillis) {
+        return this.requestBrokerProcessor.request(ctx, brokerName, request, timeoutMillis);
+    }
+
+    // Delegates the oneway broker request to the RequestBrokerProcessor created in the constructor.
+    @Override
+    public CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request,
+        long timeoutMillis) {
+        return this.requestBrokerProcessor.requestOneway(ctx, brokerName, request, timeoutMillis);
+    }
+
     @Override
     public void registerProducer(ProxyContext ctx, String producerGroup, ClientChannelInfo clientChannelInfo) {
         this.clientProcessor.registerProducer(ctx, producerGroup, clientChannelInfo);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
index 89be595ec..3e8b8084e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
@@ -230,6 +230,12 @@ public interface MessagingProcessor extends StartAndShutdown {
         long timeoutMillis
     );
 
+    /**
+     * Issues a raw remoting command against the broker identified by {@code brokerName}.
+     * The default implementation hands the call to the message service.
+     *
+     * @param ctx           proxy context of the calling client
+     * @param brokerName    name of the target broker
+     * @param request       the remoting command to send
+     * @param timeoutMillis time budget for the call, in milliseconds
+     * @return a future carrying the response command (presumably the broker's reply --
+     *         NOTE(review): confirm against the MessageService implementations)
+     */
+    CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request,
+        long timeoutMillis);
+
+    /**
+     * Oneway variant of {@code request}: issues the command against the named broker without
+     * returning a response command. The default implementation hands the call to the message service.
+     *
+     * @param ctx           proxy context of the calling client
+     * @param brokerName    name of the target broker
+     * @param request       the remoting command to send
+     * @param timeoutMillis time budget for the call, in milliseconds
+     * @return a future without a value (presumably completed once the command has been handed
+     *         off -- NOTE(review): confirm against the MessageService implementations)
+     */
+    CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request,
+        long timeoutMillis);
+
     void registerProducer(
         ProxyContext ctx,
         String producerGroup,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
index 95bd0e5fe..2fce78d31 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
@@ -73,7 +73,7 @@ public class ProducerProcessor extends AbstractProcessor {
                     // Do not check retry or dlq topic
                     if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
                         TopicMessageType topicMessageType = serviceManager.getMetadataService().getTopicMessageType(topic);
-                        TopicMessageType messageType = parseFromMessageExt(message);
+                        TopicMessageType messageType = TopicMessageType.parseFromMessageProperty(message.getProperties());
                         topicMessageTypeValidator.validate(topicMessageType, messageType);
                     }
                 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/RequestBrokerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/RequestBrokerProcessor.java
new file mode 100644
index 000000000..9f3187cde
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/RequestBrokerProcessor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.processor;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.service.ServiceManager;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ * Processor that passes raw remoting commands through to the message service obtained from
+ * the service manager. It adds no logic of its own.
+ */
+public class RequestBrokerProcessor extends AbstractProcessor {
+
+    public RequestBrokerProcessor(MessagingProcessor messagingProcessor,
+        ServiceManager serviceManager) {
+        super(messagingProcessor, serviceManager);
+    }
+
+    // Package-private: called by DefaultMessagingProcessor in the same package.
+    CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request, long timeoutMillis) {
+        return serviceManager.getMessageService().request(ctx, brokerName, request, timeoutMillis);
+    }
+
+    // Oneway counterpart of request(); the returned future carries no response command.
+    CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request, long timeoutMillis) {
+        return serviceManager.getMessageService().requestOneway(ctx, brokerName, request, timeoutMillis);
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
new file mode 100644
index 000000000..7f0d891ec
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.acl.common.AclException;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class of the proxy's remoting request handlers.
+ * <p>
+ * {@link #processRequest} runs the request pipeline, calls the subclass hook
+ * {@link #processRequest0} and writes the response (or an error response) itself, which is why
+ * it always returns {@code null} to the caller.
+ */
+public abstract class AbstractRemotingActivity implements NettyRequestProcessor {
+    protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+    protected final MessagingProcessor messagingProcessor;
+    /** Name of the request ext field that carries the target broker name. */
+    protected static final String BROKER_NAME_FIELD = "bname";
+    /** Proxy exception code to remoting response code; unmapped codes fall back to SYSTEM_ERROR. */
+    private static final Map<ProxyExceptionCode, Integer> PROXY_EXCEPTION_RESPONSE_CODE_MAP = new HashMap<ProxyExceptionCode, Integer>() {
+        {
+            put(ProxyExceptionCode.FORBIDDEN, ResponseCode.NO_PERMISSION);
+            put(ProxyExceptionCode.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE, ResponseCode.MESSAGE_ILLEGAL);
+            put(ProxyExceptionCode.INTERNAL_SERVER_ERROR, ResponseCode.SYSTEM_ERROR);
+            put(ProxyExceptionCode.TRANSACTION_DATA_NOT_FOUND, ResponseCode.SUCCESS);
+        }
+    };
+    protected final RequestPipeline requestPipeline;
+
+    public AbstractRemotingActivity(RequestPipeline requestPipeline, MessagingProcessor messagingProcessor) {
+        this.requestPipeline = requestPipeline;
+        this.messagingProcessor = messagingProcessor;
+    }
+
+    /**
+     * Relays {@code request} to the broker named in its {@code bname} ext field.
+     * <p>
+     * Oneway requests are forwarded with {@code requestOneway} and get no reply. For all other
+     * requests the reply (or an error reply) is written to the channel asynchronously once the
+     * broker call completes.
+     *
+     * @param ctx           channel context the response is written to
+     * @param request       the incoming remoting command
+     * @param context       proxy context created for this request
+     * @param timeoutMillis time budget for the broker call, in milliseconds
+     * @return an error response when the broker name is missing; otherwise {@code null}
+     *         because the response is written asynchronously (or not at all for oneway)
+     * @throws Exception declared for subclasses; this implementation reports broker failures
+     *                   through the error response instead
+     */
+    protected RemotingCommand request(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context, long timeoutMillis) throws Exception {
+        // NOTE(review): getExtFields() is assumed to be null when the request carries no ext
+        // fields at all; the guard turns that case into the same "missing bname" reply.
+        Map<String, String> extFields = request.getExtFields();
+        String brokerName = extFields == null ? null : extFields.get(BROKER_NAME_FIELD);
+        if (brokerName == null) {
+            return RemotingCommand.buildErrorResponse(ResponseCode.VERSION_NOT_SUPPORTED,
+                "Request doesn't have field bname");
+        }
+        if (request.isOnewayRPC()) {
+            // A oneway request expects no reply, but it still has to reach the broker.
+            messagingProcessor.requestOneway(context, brokerName, request, timeoutMillis);
+            return null;
+        }
+        messagingProcessor.request(context, brokerName, request, timeoutMillis)
+            .thenAccept(r -> writeResponse(ctx, context, request, r))
+            .exceptionally(t -> {
+                writeErrResponse(ctx, context, request, t);
+                return null;
+            });
+        return null;
+    }
+
+    /**
+     * Entry point called by the remoting server. Runs the pipeline and the subclass hook, then
+     * writes whatever response the hook returned; any throwable becomes an error response.
+     *
+     * @return always {@code null}; responses are written to the channel by this class
+     */
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        ProxyContext context = createContext(ctx, request);
+        try {
+            this.requestPipeline.execute(ctx, request, context);
+            RemotingCommand response = this.processRequest0(ctx, request, context);
+            if (response != null) {
+                writeResponse(ctx, context, request, response);
+            }
+            return null;
+        } catch (Throwable t) {
+            writeErrResponse(ctx, context, request, t);
+            return null;
+        }
+    }
+
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+
+    /**
+     * Subclass hook that handles one request.
+     *
+     * @return a response to be written by {@link #processRequest}, or {@code null} when the
+     *         implementation writes the response itself (or none is due)
+     */
+    protected abstract RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) throws Exception;
+
+    /**
+     * Builds the proxy context for a request: action {@code "Remoting" + request code}, client
+     * language, channel, and the channel's local and remote addresses.
+     */
+    protected ProxyContext createContext(ChannelHandlerContext ctx, RemotingCommand request) {
+        ProxyContext context = ProxyContext.create();
+        context.setAction("Remoting" + request.getCode())
+            .setLanguage(request.getLanguage().name())
+            .setChannel(ctx.channel())
+            .setLocalAddress(RemotingUtil.socketAddress2String(ctx.channel().localAddress()))
+            .setRemoteAddress(RemotingUtil.socketAddress2String(ctx.channel().remoteAddress()));
+
+        return context;
+    }
+
+    /**
+     * Translates a throwable into an error response and writes it. Proxy exceptions go through
+     * the code map, client/broker exceptions keep their own response code, ACL failures map to
+     * NO_PERMISSION and everything else to SYSTEM_ERROR.
+     */
+    protected void writeErrResponse(ChannelHandlerContext ctx, final ProxyContext context,
+        final RemotingCommand request, Throwable t) {
+        // Unwrap first so the instanceof checks below see the underlying exception.
+        t = ExceptionUtils.getRealException(t);
+        if (t instanceof ProxyException) {
+            ProxyException e = (ProxyException) t;
+            writeResponse(ctx, context, request,
+                RemotingCommand.createResponseCommand(
+                    PROXY_EXCEPTION_RESPONSE_CODE_MAP.getOrDefault(e.getCode(), ResponseCode.SYSTEM_ERROR),
+                    e.getMessage()),
+                t);
+        } else if (t instanceof MQClientException) {
+            MQClientException e = (MQClientException) t;
+            writeResponse(ctx, context, request, RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage()), t);
+        } else if (t instanceof MQBrokerException) {
+            MQBrokerException e = (MQBrokerException) t;
+            writeResponse(ctx, context, request, RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage()), t);
+        } else if (t instanceof AclException) {
+            writeResponse(ctx, context, request, RemotingCommand.createResponseCommand(ResponseCode.NO_PERMISSION, t.getMessage()), t);
+        } else {
+            writeResponse(ctx, context, request,
+                RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, t.getMessage()), t);
+        }
+    }
+
+    /** Writes a response that is not associated with a throwable. */
+    protected void writeResponse(ChannelHandlerContext ctx, final ProxyContext context,
+        final RemotingCommand request, RemotingCommand response) {
+        writeResponse(ctx, context, request, response, null);
+    }
+
+    /**
+     * Completes and writes a response: copies the request's opaque id, marks the command as a
+     * response and adds the region and trace-switch ext fields from the proxy config. Nothing
+     * is written for oneway requests or when the channel is not writable.
+     *
+     * @param t optional cause; when present its message replaces the response remark
+     */
+    protected void writeResponse(ChannelHandlerContext ctx, final ProxyContext context,
+        final RemotingCommand request, RemotingCommand response, Throwable t) {
+        if (request.isOnewayRPC()) {
+            return;
+        }
+        if (!ctx.channel().isWritable()) {
+            return;
+        }
+
+        ProxyConfig config = ConfigurationManager.getProxyConfig();
+
+        response.setOpaque(request.getOpaque());
+        response.markResponseType();
+        response.addExtField(MessageConst.PROPERTY_MSG_REGION, config.getRegionId());
+        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(config.isTraceOn()));
+        if (t != null) {
+            response.setRemark(t.getMessage());
+        }
+
+        ctx.writeAndFlush(response);
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AckMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AckMessageActivity.java
new file mode 100644
index 000000000..723b5918b
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AckMessageActivity.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.time.Duration;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ * Remoting activity that relays the incoming command to its target broker through the
+ * inherited {@code request} helper with a three second timeout. Judging by its name it is
+ * meant for ack-message requests; the registration is not part of this class.
+ */
+public class AckMessageActivity extends AbstractRemotingActivity {
+    /** Time budget for the relayed broker call, in milliseconds. */
+    private static final long BROKER_REQUEST_TIMEOUT_MILLIS = Duration.ofSeconds(3).toMillis();
+
+    public AckMessageActivity(RequestPipeline requestPipeline,
+        MessagingProcessor messagingProcessor) {
+        super(requestPipeline, messagingProcessor);
+    }
+
+    @Override
+    protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) throws Exception {
+        return request(ctx, request, context, BROKER_REQUEST_TIMEOUT_MILLIS);
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ChangeInvisibleTimeActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ChangeInvisibleTimeActivity.java
new file mode 100644
index 000000000..9f6de99e0
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ChangeInvisibleTimeActivity.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.time.Duration;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ * Remoting activity that relays the incoming command to its target broker through the
+ * inherited {@code request} helper with a three second timeout. Judging by its name it is
+ * meant for change-invisible-time requests; the registration is not part of this class.
+ */
+public class ChangeInvisibleTimeActivity extends AbstractRemotingActivity {
+    /** Time budget for the relayed broker call, in milliseconds. */
+    private static final long BROKER_REQUEST_TIMEOUT_MILLIS = Duration.ofSeconds(3).toMillis();
+
+    public ChangeInvisibleTimeActivity(RequestPipeline requestPipeline,
+        MessagingProcessor messagingProcessor) {
+        super(requestPipeline, messagingProcessor);
+    }
+
+    @Override
+    protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) throws Exception {
+        return request(ctx, request, context, BROKER_REQUEST_TIMEOUT_MILLIS);
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
new file mode 100644
index 000000000..fb248a894
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Set;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ * Remoting activity for consumer-management request codes. Offset and store-time queries are
+ * relayed through the inherited {@code request} helper, batch lock/unlock requests are relayed
+ * to the broker of the first queue in the request body, and the consumer-list lookup is still
+ * a stub.
+ */
+public class ConsumerManagerActivity extends AbstractRemotingActivity {
+    /** Time budget for every relayed broker call, in milliseconds. */
+    private static final long BROKER_REQUEST_TIMEOUT_MILLIS = Duration.ofSeconds(3).toMillis();
+
+    public ConsumerManagerActivity(RequestPipeline requestPipeline, MessagingProcessor messagingProcessor) {
+        super(requestPipeline, messagingProcessor);
+    }
+
+    @Override
+    protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) throws Exception {
+        switch (request.getCode()) {
+            case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
+                return getConsumerListByGroup(ctx, request, context);
+            case RequestCode.LOCK_BATCH_MQ:
+                return lockBatchMQ(ctx, request, context);
+            case RequestCode.UNLOCK_BATCH_MQ:
+                return unlockBatchMQ(ctx, request, context);
+            case RequestCode.UPDATE_CONSUMER_OFFSET:
+            case RequestCode.QUERY_CONSUMER_OFFSET:
+            case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP:
+            case RequestCode.GET_MIN_OFFSET:
+            case RequestCode.GET_MAX_OFFSET:
+            case RequestCode.GET_EARLIEST_MSG_STORETIME:
+                return request(ctx, request, context, BROKER_REQUEST_TIMEOUT_MILLIS);
+            default:
+                // Any other request code yields no response from this activity.
+                return null;
+        }
+    }
+
+    /** Placeholder: currently returns {@code null}, so no response is produced. */
+    protected RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) throws Exception {
+        // TODO after connection-related module
+        return null;
+    }
+
+    /**
+     * Relays a batch-lock request to the broker of the first queue in the request body. An
+     * empty queue set is answered directly by echoing the body back with an explanatory remark.
+     */
+    protected RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) throws Exception {
+        final RemotingCommand emptyReply = RemotingCommand.createResponseCommand(null);
+        LockBatchRequestBody lockBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);
+        Set<MessageQueue> queues = lockBody.getMqSet();
+        if (queues.isEmpty()) {
+            emptyReply.setBody(lockBody.encode());
+            emptyReply.setRemark("MessageQueue set is empty");
+            return emptyReply;
+        }
+        return relayToBroker(ctx, request, context, new ArrayList<>(queues).get(0).getBrokerName());
+    }
+
+    /**
+     * Relays a batch-unlock request to the broker of the first queue in the request body. An
+     * empty queue set is answered directly by echoing the body back with an explanatory remark.
+     */
+    protected RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) throws Exception {
+        final RemotingCommand emptyReply = RemotingCommand.createResponseCommand(null);
+        UnlockBatchRequestBody unlockBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class);
+        Set<MessageQueue> queues = unlockBody.getMqSet();
+        if (queues.isEmpty()) {
+            emptyReply.setBody(unlockBody.encode());
+            emptyReply.setRemark("MessageQueue set is empty");
+            return emptyReply;
+        }
+        return relayToBroker(ctx, request, context, new ArrayList<>(queues).get(0).getBrokerName());
+    }
+
+    /**
+     * Sends the request to the given broker and writes the reply (or an error reply) to the
+     * channel when the call completes; returns {@code null} because nothing is left for the
+     * caller to write.
+     */
+    private RemotingCommand relayToBroker(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context, String brokerName) {
+        messagingProcessor.request(context, brokerName, request, BROKER_REQUEST_TIMEOUT_MILLIS)
+            .thenAccept(r -> writeResponse(ctx, context, request, r))
+            .exceptionally(t -> {
+                writeErrResponse(ctx, context, request, t);
+                return null;
+            });
+        return null;
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java
new file mode 100644
index 000000000..d3b7de98d
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.google.common.net.HostAndPort;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.proxy.common.Address;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/** Serves topic-route queries from the proxy, passing the proxy's own remoting access point to the route lookup. */
+public class GetTopicRouteActivity extends AbstractRemotingActivity {
+    public GetTopicRouteActivity(RequestPipeline requestPipeline, MessagingProcessor messagingProcessor) {
+        super(requestPipeline, messagingProcessor);
+    }
+
+    @Override
+    protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) throws Exception {
+        ProxyConfig config = ConfigurationManager.getProxyConfig();
+        final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
+        final GetRouteInfoRequestHeader routeHeader =
+            (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
+        // The configured remoting access point is the single address handed to the route lookup.
+        List<Address> proxyAddresses = new ArrayList<>();
+        proxyAddresses.add(new Address(Address.AddressScheme.IPv4, HostAndPort.fromString(config.getRemotingAccessPoint())));
+        ProxyTopicRouteData proxyRoute = messagingProcessor.getTopicRouteDataForProxy(context, proxyAddresses, routeHeader.getTopic());
+        TopicRouteData routeData = proxyRoute.buildTopicRouteData();
+
+        // Clients at V4_9_4 or later, or that set acceptStandardJsonOnly, get the explicit-feature encoding.
+        Boolean standardJsonOnly = routeHeader.getAcceptStandardJsonOnly();
+        boolean standardJson = request.getVersion() >= MQVersion.Version.V4_9_4.ordinal()
+            || null != standardJsonOnly && standardJsonOnly;
+        byte[] payload = standardJson
+            ? routeData.encode(SerializerFeature.BrowserCompatible, SerializerFeature.QuoteFieldNames,
+                SerializerFeature.SkipTransientField, SerializerFeature.MapSortField)
+            : routeData.encode();
+
+        reply.setBody(payload);
+        reply.setCode(ResponseCode.SUCCESS);
+        reply.setRemark(null);
+        return reply;
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PopMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PopMessageActivity.java
new file mode 100644
index 000000000..d52b84b12
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PopMessageActivity.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.time.Duration;
+import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/** Hands pop requests to the inherited request(...) helper, allowing the client's poll time plus ten seconds. */
+public class PopMessageActivity extends AbstractRemotingActivity {
+    public PopMessageActivity(RequestPipeline requestPipeline, MessagingProcessor messagingProcessor) {
+        super(requestPipeline, messagingProcessor);
+    }
+
+    @Override
+    protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) throws Exception {
+        PopMessageRequestHeader header = (PopMessageRequestHeader) request.decodeCommandCustomHeader(PopMessageRequestHeader.class);
+        long relayTimeoutMillis = header.getPollTime() + Duration.ofSeconds(10).toMillis();
+        return request(ctx, request, context, relayTimeoutMillis);
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
new file mode 100644
index 000000000..819bf139d
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/** Relays pull requests to the broker named by {@code bname}, filling in the consumer's subscription if the client sent none. */
+public class PullMessageActivity extends AbstractRemotingActivity {
+    public PullMessageActivity(RequestPipeline requestPipeline, MessagingProcessor messagingProcessor) {
+        super(requestPipeline, messagingProcessor);
+    }
+
+    @Override
+    protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) throws Exception {
+        // Guard the ext-field map itself as well: a command without any ext fields would otherwise fail with an NPE.
+        if (request.getExtFields() == null || request.getExtFields().get(BROKER_NAME_FIELD) == null) {
+            return RemotingCommand.buildErrorResponse(ResponseCode.VERSION_NOT_SUPPORTED, "Request doesn't have field bname");
+        }
+        PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
+        if (!PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag())) {
+            ConsumerGroupInfo consumerInfo = messagingProcessor.getConsumerGroupInfo(requestHeader.getConsumerGroup());
+            if (consumerInfo == null) {
+                return RemotingCommand.buildErrorResponse(ResponseCode.SUBSCRIPTION_NOT_LATEST, "the consumer's subscription not latest");
+            }
+            SubscriptionData subscriptionData = consumerInfo.findSubscriptionData(requestHeader.getTopic());
+            if (subscriptionData == null) {
+                return RemotingCommand.buildErrorResponse(ResponseCode.SUBSCRIPTION_NOT_EXIST, "the consumer's subscription not exist");
+            }
+            // NOTE(review): the sys flag is not updated with the subscription bit - confirm the broker honours these fields.
+            requestHeader.setSubscription(subscriptionData.getSubString());
+            requestHeader.setExpressionType(subscriptionData.getExpressionType());
+            // The decoded header is not attached to the command; attach it so makeCustomHeaderToNet() copies the edits.
+            request.writeCustomHeader(requestHeader);
+            request.makeCustomHeaderToNet();
+        }
+        long timeoutMillis = requestHeader.getSuspendTimeoutMillis() + Duration.ofSeconds(10).toMillis();
+        CompletableFuture<RemotingCommand> future = messagingProcessor.request(context, requestHeader.getBname(), request, timeoutMillis);
+        future.thenAccept(r -> writeResponse(ctx, context, request, r))
+            .exceptionally(t -> {
+                writeErrResponse(ctx, context, request, t);
+                return null;
+            });
+        return null; // the reply is written from the future's callbacks
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
new file mode 100644
index 000000000..904460431
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.time.Duration;
+import java.util.Map;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.processor.validator.DefaultTopicMessageTypeValidator;
+import org.apache.rocketmq.proxy.processor.validator.TopicMessageTypeValidator;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class SendMessageActivity extends AbstractRemotingActivity {
+    TopicMessageTypeValidator topicMessageTypeValidator;
+
+    public SendMessageActivity(RequestPipeline requestPipeline,
+        MessagingProcessor messagingProcessor) {
+        super(requestPipeline, messagingProcessor);
+        this.topicMessageTypeValidator = new DefaultTopicMessageTypeValidator();
+    }
+
+    @Override
+    protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) throws Exception {
+        switch (request.getCode()) {
+            case RequestCode.SEND_MESSAGE:
+            case RequestCode.SEND_MESSAGE_V2:
+            case RequestCode.SEND_BATCH_MESSAGE: {
+                return sendMessage(ctx, request, context);
+            }
+            case RequestCode.CONSUMER_SEND_MSG_BACK: {
+                return consumerSendMessage(ctx, request, context);
+            }
+            default:
+                break;
+        }
+        return null;
+    }
+
+    protected RemotingCommand sendMessage(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) throws Exception {
+        SendMessageRequestHeader requestHeader = SendMessageRequestHeader.parseRequestHeader(request);
+        String topic = requestHeader.getTopic();
+        Map<String, String> property = MessageDecoder.string2messageProperties(requestHeader.getProperties());
+        TopicMessageType messageType = TopicMessageType.parseFromMessageProperty(property);
+        if (ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()) {
+            if (topicMessageTypeValidator != null) {
+                // Do not check retry or dlq topic
+                if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
+                    TopicMessageType topicMessageType = messagingProcessor.getMetadataService().getTopicMessageType(topic);
+                    topicMessageTypeValidator.validate(topicMessageType, messageType);
+                }
+            }
+        }
+        if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
+            if (TopicMessageType.TRANSACTION.equals(messageType)) {
+                return sendTransactionMessage(ctx, request, context);
+            }
+        }
+        return request(ctx, request, context, Duration.ofSeconds(3).toMillis());
+    }
+
+    protected RemotingCommand consumerSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) throws Exception {
+        return request(ctx, request, context, Duration.ofSeconds(3).toMillis());
+    }
+
+    protected RemotingCommand sendTransactionMessage(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) throws Exception {
+        // TODO: wait for connection implement.
+        return null;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/pipeline/RequestPipeline.java
similarity index 54%
copy from common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
copy to proxy/src/main/java/org/apache/rocketmq/proxy/remoting/pipeline/RequestPipeline.java
index 8c484da31..4c46a6e7d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/pipeline/RequestPipeline.java
@@ -15,32 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.common.attribute;
+package org.apache.rocketmq.proxy.remoting.pipeline;
 
-import com.google.common.collect.Sets;
-import java.util.Set;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
-public enum TopicMessageType {
-    UNSPECIFIED("UNSPECIFIED"),
-    NORMAL("NORMAL"),
-    FIFO("FIFO"),
-    DELAY("DELAY"),
-    TRANSACTION("TRANSACTION");
+public interface RequestPipeline {
 
-    private final String value;
-    TopicMessageType(String value) {
-        this.value = value;
-    }
-
-    public static Set<String> topicMessageTypeSet() {
-        return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value, DELAY.value, TRANSACTION.value);
-    }
-
-    public String getValue() {
-        return value;
-    }
+    void execute(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception;
 
-    public String getMetricsValue() {
-        return value.toLowerCase();
+    default RequestPipeline pipe(RequestPipeline source) { // composes two stages into one pipeline
+        return (ctx, request, context) -> {
+            source.execute(ctx, request, context); // the supplied stage runs first
+            execute(ctx, request, context); // then this stage; skipped if the first one throws
+        };
     }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java
index ff7ef01a0..04ad5e269 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java
@@ -175,6 +175,11 @@ public class SimpleChannel extends AbstractChannel {
         return promise;
     }
 
+    @Override
+    public boolean isWritable() {
+        return true; // always writable; NOTE(review): presumably because no real socket buffer backs this channel - confirm
+    }
+
     public void updateLastAccessTime() {
         this.lastAccessTime = System.currentTimeMillis();
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
index f27f002d0..c2a5a6435 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
@@ -63,13 +63,13 @@ public class ClusterMessageService implements MessageService {
         CompletableFuture<List<SendResult>> future;
         if (msgList.size() == 1) {
             future = this.mqClientAPIFactory.getClient().sendMessageAsync(
-                messageQueue.getBrokerAddr(),
-                messageQueue.getBrokerName(), msgList.get(0), requestHeader, timeoutMillis)
+                    messageQueue.getBrokerAddr(),
+                    messageQueue.getBrokerName(), msgList.get(0), requestHeader, timeoutMillis)
                 .thenApply(Lists::newArrayList);
         } else {
             future = this.mqClientAPIFactory.getClient().sendMessageAsync(
-                messageQueue.getBrokerAddr(),
-                messageQueue.getBrokerName(), msgList, requestHeader, timeoutMillis)
+                    messageQueue.getBrokerAddr(),
+                    messageQueue.getBrokerName(), msgList, requestHeader, timeoutMillis)
                 .thenApply(Lists::newArrayList);
         }
         return future;
@@ -86,7 +86,8 @@ public class ClusterMessageService implements MessageService {
     }
 
     @Override
-    public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx, String brokerName, EndTransactionRequestHeader requestHeader,
+    public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx, String brokerName,
+        EndTransactionRequestHeader requestHeader,
         long timeoutMillis) {
         CompletableFuture<Void> future = new CompletableFuture<>();
         try {
@@ -205,6 +206,32 @@ public class ClusterMessageService implements MessageService {
         );
     }
 
+    @Override
+    public CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request,
+        long timeoutMillis) {
+        CompletableFuture<RemotingCommand> failed = new CompletableFuture<>(); // returned only if lookup or invocation throws
+        try {
+            String targetAddr = topicRouteService.getBrokerAddr(brokerName);
+            return mqClientAPIFactory.getClient().invoke(targetAddr, request, timeoutMillis);
+        } catch (Exception cause) {
+            failed.completeExceptionally(cause);
+        }
+        return failed;
+    }
+
+    @Override
+    public CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request,
+        long timeoutMillis) {
+        CompletableFuture<Void> failed = new CompletableFuture<>(); // returned only if lookup or invocation throws
+        try {
+            String targetAddr = topicRouteService.getBrokerAddr(brokerName);
+            return mqClientAPIFactory.getClient().invokeOneway(targetAddr, request, timeoutMillis);
+        } catch (Exception cause) {
+            failed.completeExceptionally(cause);
+        }
+        return failed;
+    }
+
     protected String resolveBrokerAddrInReceiptHandle(ReceiptHandle handle) {
         try {
             return this.topicRouteService.getBrokerAddr(handle.getBrokerName());
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
index 491926d01..115c140ff 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
@@ -402,4 +402,16 @@ public class LocalMessageService implements MessageService {
         GetMinOffsetRequestHeader requestHeader, long timeoutMillis) {
         throw new NotImplementedException("getMinOffset is not implemented in LocalMessageService");
     }
+
+    @Override
+    public CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request,
+        long timeoutMillis) {
+        throw new NotImplementedException("request is not implemented in LocalMessageService"); // thrown synchronously, not via the future
+    }
+
+    @Override
+    public CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request,
+        long timeoutMillis) {
+        throw new NotImplementedException("requestOneway is not implemented in LocalMessageService"); // thrown synchronously, not via the future
+    }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
index 18673b505..15da17154 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
@@ -139,4 +139,10 @@ public interface MessageService {
         GetMinOffsetRequestHeader requestHeader,
         long timeoutMillis
     );
+
+    CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request,
+        long timeoutMillis); // passes a raw remoting command on for the named broker; the future yields the response command
+
+    CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request,
+        long timeoutMillis); // one-way variant: the future carries no response command
 }
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
new file mode 100644
index 000000000..b581d8a91
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class AbstractRemotingActivityTest extends InitConfigAndLoggerTest {
+    AbstractRemotingActivity remotingActivity;
+    @Mock
+    MessagingProcessor messagingProcessorMock;
+    // Spied so writes can be verified; the two-argument write is overridden to return null instead of writing.
+    @Spy
+    ChannelHandlerContext ctx = new SimpleChannelHandlerContext(new SimpleChannel(null, "1", "2")) {
+        @Override
+        public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
+            return null;
+        }
+    };
+
+    @Before
+    public void setup() {
+        // Minimal concrete subclass: only the inherited request(...) helper is exercised below.
+        remotingActivity = new AbstractRemotingActivity(null, messagingProcessorMock) {
+            @Override
+            protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+                ProxyContext context) throws Exception {
+                return null;
+            }
+        };
+    }
+
+    @Test
+    public void request() throws Exception {
+        RemotingCommand brokerReply = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "success");
+        when(messagingProcessorMock.request(any(), anyString(), any(), anyLong()))
+            .thenReturn(CompletableFuture.completedFuture(brokerReply));
+        RemotingCommand pullRequest = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+        pullRequest.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, "broker");
+        RemotingCommand returned = remotingActivity.request(ctx, pullRequest, null, 10000);
+        assertThat(returned).isNull();
+        verify(ctx, times(1)).writeAndFlush(any());
+    }
+}
\ No newline at end of file