You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:53:47 UTC

[rocketmq] 06/26: [ISSUE #5486] Add remoting server

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 48673af7641418d3ae8cd02aa5c341e552036900
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Wed Nov 9 14:53:42 2022 +0800

    [ISSUE #5486] Add remoting server
---
 .../org/apache/rocketmq/proxy/ProxyStartup.java    |   4 +
 .../rocketmq/proxy/common/ReflectionCache.java     |  45 +++
 .../apache/rocketmq/proxy/config/ProxyConfig.java  | 196 +++++++++++++
 .../remoting/MultiProtocolRemotingServer.java      | 133 +++++++++
 .../proxy/remoting/MultiProtocolTlsHelper.java     | 113 ++++++++
 .../proxy/remoting/RemotingProtocolServer.java     | 304 ++++++++++++++++++++-
 .../proxy/remoting/protocol/ProtocolHandler.java   |  28 ++
 .../protocol/ProtocolNegotiationHandler.java       |  61 +++++
 .../http2proxy/Http2ProtocolProxyHandler.java      | 119 ++++++++
 .../http2proxy/Http2ProxyBackendHandler.java       |  67 +++++
 .../http2proxy/Http2ProxyFrontendHandler.java      |  78 ++++++
 .../protocol/remoting/RemotingProtocolHandler.java |  55 ++++
 12 files changed, 1199 insertions(+), 4 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
index 78399cf35..42a833430 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
@@ -47,6 +47,7 @@ import org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication;
 import org.apache.rocketmq.proxy.metrics.ProxyMetricsManager;
 import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor;
 import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.RemotingProtocolServer;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.srvutil.ServerUtil;
 
@@ -83,6 +84,9 @@ public class ProxyStartup {
                 .build();
             PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(grpcServer);
 
+            RemotingProtocolServer remotingServer = new RemotingProtocolServer(messagingProcessor);
+            PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(remotingServer);
+
             // start servers one by one.
             PROXY_START_AND_SHUTDOWN.start();
 
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReflectionCache.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReflectionCache.java
new file mode 100644
index 000000000..31fa46c90
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReflectionCache.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.common;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+
+public class ReflectionCache {
+    private final Cache<Class<?>, Field> fieldCache;
+    private static final int DEFAULT_MAX_SIZE = 15;
+
+    public ReflectionCache() {
+        this(DEFAULT_MAX_SIZE);
+    }
+
+    public ReflectionCache(int maxSize) {
+        this.fieldCache = CacheBuilder.newBuilder().maximumSize(maxSize).expireAfterAccess(5, TimeUnit.MINUTES).build();
+    }
+
+    public Field getDeclaredField(final Class<?> clazz, final String fieldName) throws Exception {
+        return this.fieldCache.get(clazz, () -> {
+            Field field = clazz.getDeclaredField(fieldName);
+            field.setAccessible(true);
+            return field;
+        });
+    }
+}
+
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 0efca05b4..b613c191e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -209,6 +209,33 @@ public class ProxyConfig implements ConfigFile {
 
     private long channelExpiredTimeout = 1000 * 120;
 
+    // remoting
+
+    private boolean enableRemotingLocalProxyGrpc = true;
+    private int localProxyConnectTimeoutMs = 3000;
+    private int remotingListenPort = 8080;
+
+    private int remotingHeartbeatThreadPoolNums = 2 * PROCESSOR_NUMBER;
+    private int remotingTopicRouteThreadPoolNums = 2 * PROCESSOR_NUMBER;
+    private int remotingSendMessageThreadPoolNums = 4 * PROCESSOR_NUMBER;
+    private int remotingPullMessageThreadPoolNums = 4 * PROCESSOR_NUMBER;
+    private int remotingUpdateOffsetThreadPoolNums = 4 * PROCESSOR_NUMBER;
+    private int remotingDefaultThreadPoolNums = 4 * PROCESSOR_NUMBER;
+
+    private int remotingHeartbeatThreadPoolQueueCapacity = 50000;
+    private int remotingTopicRouteThreadPoolQueueCapacity = 50000;
+    private int remotingSendThreadPoolQueueCapacity = 10000;
+    private int remotingPullThreadPoolQueueCapacity = 50000;
+    private int remotingUpdateOffsetThreadPoolQueueCapacity = 10000;
+    private int remotingDefaultThreadPoolQueueCapacity = 50000;
+
+    private long remotingWaitTimeMillsInSendQueue = 3 * 1000;
+    private long remotingWaitTimeMillsInPullQueue = 5 * 1000;
+    private long remotingWaitTimeMillsInHeartbeatQueue = 31 * 1000;
+    private long remotingWaitTimeMillsInUpdateOffsetQueue = 3 * 1000;
+    private long remotingWaitTimeMillsInTopicRouteQueue = 3 * 1000;
+    private long remotingWaitTimeMillsInDefaultQueue = 3 * 1000;
+
     @Override
     public void initData() {
         parseDelayLevel();
@@ -1124,7 +1151,176 @@ public class ProxyConfig implements ConfigFile {
         return channelExpiredTimeout;
     }
 
+    public boolean isEnableRemotingLocalProxyGrpc() {
+        return enableRemotingLocalProxyGrpc;
+    }
+
     public void setChannelExpiredTimeout(long channelExpiredTimeout) {
         this.channelExpiredTimeout = channelExpiredTimeout;
     }
 }
+
+    public void setEnableRemotingLocalProxyGrpc(boolean enableRemotingLocalProxyGrpc) {
+        this.enableRemotingLocalProxyGrpc = enableRemotingLocalProxyGrpc;
+    }
+
+    public int getLocalProxyConnectTimeoutMs() {
+        return localProxyConnectTimeoutMs;
+    }
+
+    public void setLocalProxyConnectTimeoutMs(int localProxyConnectTimeoutMs) {
+        this.localProxyConnectTimeoutMs = localProxyConnectTimeoutMs;
+    }
+
+    public int getRemotingListenPort() {
+        return remotingListenPort;
+    }
+
+    public void setRemotingListenPort(int remotingListenPort) {
+        this.remotingListenPort = remotingListenPort;
+    }
+
+    public int getRemotingHeartbeatThreadPoolNums() {
+        return remotingHeartbeatThreadPoolNums;
+    }
+
+    public void setRemotingHeartbeatThreadPoolNums(int remotingHeartbeatThreadPoolNums) {
+        this.remotingHeartbeatThreadPoolNums = remotingHeartbeatThreadPoolNums;
+    }
+
+    public int getRemotingTopicRouteThreadPoolNums() {
+        return remotingTopicRouteThreadPoolNums;
+    }
+
+    public void setRemotingTopicRouteThreadPoolNums(int remotingTopicRouteThreadPoolNums) {
+        this.remotingTopicRouteThreadPoolNums = remotingTopicRouteThreadPoolNums;
+    }
+
+    public int getRemotingSendMessageThreadPoolNums() {
+        return remotingSendMessageThreadPoolNums;
+    }
+
+    public void setRemotingSendMessageThreadPoolNums(int remotingSendMessageThreadPoolNums) {
+        this.remotingSendMessageThreadPoolNums = remotingSendMessageThreadPoolNums;
+    }
+
+    public int getRemotingPullMessageThreadPoolNums() {
+        return remotingPullMessageThreadPoolNums;
+    }
+
+    public void setRemotingPullMessageThreadPoolNums(int remotingPullMessageThreadPoolNums) {
+        this.remotingPullMessageThreadPoolNums = remotingPullMessageThreadPoolNums;
+    }
+
+    public int getRemotingUpdateOffsetThreadPoolNums() {
+        return remotingUpdateOffsetThreadPoolNums;
+    }
+
+    public void setRemotingUpdateOffsetThreadPoolNums(int remotingUpdateOffsetThreadPoolNums) {
+        this.remotingUpdateOffsetThreadPoolNums = remotingUpdateOffsetThreadPoolNums;
+    }
+
+    public int getRemotingDefaultThreadPoolNums() {
+        return remotingDefaultThreadPoolNums;
+    }
+
+    public void setRemotingDefaultThreadPoolNums(int remotingDefaultThreadPoolNums) {
+        this.remotingDefaultThreadPoolNums = remotingDefaultThreadPoolNums;
+    }
+
+    public int getRemotingHeartbeatThreadPoolQueueCapacity() {
+        return remotingHeartbeatThreadPoolQueueCapacity;
+    }
+
+    public void setRemotingHeartbeatThreadPoolQueueCapacity(int remotingHeartbeatThreadPoolQueueCapacity) {
+        this.remotingHeartbeatThreadPoolQueueCapacity = remotingHeartbeatThreadPoolQueueCapacity;
+    }
+
+    public int getRemotingTopicRouteThreadPoolQueueCapacity() {
+        return remotingTopicRouteThreadPoolQueueCapacity;
+    }
+
+    public void setRemotingTopicRouteThreadPoolQueueCapacity(int remotingTopicRouteThreadPoolQueueCapacity) {
+        this.remotingTopicRouteThreadPoolQueueCapacity = remotingTopicRouteThreadPoolQueueCapacity;
+    }
+
+    public int getRemotingSendThreadPoolQueueCapacity() {
+        return remotingSendThreadPoolQueueCapacity;
+    }
+
+    public void setRemotingSendThreadPoolQueueCapacity(int remotingSendThreadPoolQueueCapacity) {
+        this.remotingSendThreadPoolQueueCapacity = remotingSendThreadPoolQueueCapacity;
+    }
+
+    public int getRemotingPullThreadPoolQueueCapacity() {
+        return remotingPullThreadPoolQueueCapacity;
+    }
+
+    public void setRemotingPullThreadPoolQueueCapacity(int remotingPullThreadPoolQueueCapacity) {
+        this.remotingPullThreadPoolQueueCapacity = remotingPullThreadPoolQueueCapacity;
+    }
+
+    public int getRemotingUpdateOffsetThreadPoolQueueCapacity() {
+        return remotingUpdateOffsetThreadPoolQueueCapacity;
+    }
+
+    public void setRemotingUpdateOffsetThreadPoolQueueCapacity(int remotingUpdateOffsetThreadPoolQueueCapacity) {
+        this.remotingUpdateOffsetThreadPoolQueueCapacity = remotingUpdateOffsetThreadPoolQueueCapacity;
+    }
+
+    public int getRemotingDefaultThreadPoolQueueCapacity() {
+        return remotingDefaultThreadPoolQueueCapacity;
+    }
+
+    public void setRemotingDefaultThreadPoolQueueCapacity(int remotingDefaultThreadPoolQueueCapacity) {
+        this.remotingDefaultThreadPoolQueueCapacity = remotingDefaultThreadPoolQueueCapacity;
+    }
+
+    public long getRemotingWaitTimeMillsInSendQueue() {
+        return remotingWaitTimeMillsInSendQueue;
+    }
+
+    public void setRemotingWaitTimeMillsInSendQueue(long remotingWaitTimeMillsInSendQueue) {
+        this.remotingWaitTimeMillsInSendQueue = remotingWaitTimeMillsInSendQueue;
+    }
+
+    public long getRemotingWaitTimeMillsInPullQueue() {
+        return remotingWaitTimeMillsInPullQueue;
+    }
+
+    public void setRemotingWaitTimeMillsInPullQueue(long remotingWaitTimeMillsInPullQueue) {
+        this.remotingWaitTimeMillsInPullQueue = remotingWaitTimeMillsInPullQueue;
+    }
+
+    public long getRemotingWaitTimeMillsInHeartbeatQueue() {
+        return remotingWaitTimeMillsInHeartbeatQueue;
+    }
+
+    public void setRemotingWaitTimeMillsInHeartbeatQueue(long remotingWaitTimeMillsInHeartbeatQueue) {
+        this.remotingWaitTimeMillsInHeartbeatQueue = remotingWaitTimeMillsInHeartbeatQueue;
+    }
+
+    public long getRemotingWaitTimeMillsInUpdateOffsetQueue() {
+        return remotingWaitTimeMillsInUpdateOffsetQueue;
+    }
+
+    public void setRemotingWaitTimeMillsInUpdateOffsetQueue(long remotingWaitTimeMillsInUpdateOffsetQueue) {
+        this.remotingWaitTimeMillsInUpdateOffsetQueue = remotingWaitTimeMillsInUpdateOffsetQueue;
+    }
+
+    public long getRemotingWaitTimeMillsInTopicRouteQueue() {
+        return remotingWaitTimeMillsInTopicRouteQueue;
+    }
+
+    public void setRemotingWaitTimeMillsInTopicRouteQueue(long remotingWaitTimeMillsInTopicRouteQueue) {
+        this.remotingWaitTimeMillsInTopicRouteQueue = remotingWaitTimeMillsInTopicRouteQueue;
+    }
+
+    public long getRemotingWaitTimeMillsInDefaultQueue() {
+        return remotingWaitTimeMillsInDefaultQueue;
+    }
+
+    public void setRemotingWaitTimeMillsInDefaultQueue(long remotingWaitTimeMillsInDefaultQueue) {
+        this.remotingWaitTimeMillsInDefaultQueue = remotingWaitTimeMillsInDefaultQueue;
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
new file mode 100644
index 000000000..73aeeaf42
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.security.cert.CertificateException;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+import org.apache.rocketmq.proxy.remoting.protocol.ProtocolNegotiationHandler;
+import org.apache.rocketmq.proxy.remoting.protocol.http2proxy.Http2ProtocolProxyHandler;
+import org.apache.rocketmq.proxy.remoting.protocol.remoting.RemotingProtocolHandler;
+import org.apache.rocketmq.remoting.ChannelEventListener;
+import org.apache.rocketmq.remoting.common.TlsMode;
+import org.apache.rocketmq.remoting.netty.NettyEncoder;
+import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * for remoting server, if config listen port is 8080 in nettyServerConfig
+ * <p>
+ * will
+ * <li>listen port at 9080 with protocol remoting</li>
+ * <li>listen port at 8080 with protocol remoting and http2</li>
+ */
+public class MultiProtocolRemotingServer extends NettyRemotingServer {
+
+    private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+    private static final int PORT_DELTA = 1000;
+    private final NettyServerConfig nettyServerConfig;
+    private final int port;
+
+    public MultiProtocolRemotingServer(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) {
+        super(nettyServerConfig, channelEventListener);
+        this.port = nettyServerConfig.getListenPort();
+        // to support multiple protocol
+        // will bind the real port in configChildHandler
+        // so let parent bind to a useless port
+        nettyServerConfig.setListenPort(nettyServerConfig.getListenPort() + PORT_DELTA);
+        this.nettyServerConfig = nettyServerConfig;
+    }
+
+    @Override
+    public void loadSslContext() {
+        TlsMode tlsMode = TlsSystemConfig.tlsMode;
+        log.info("Server is running in TLS {} mode", tlsMode.getName());
+
+        if (tlsMode != TlsMode.DISABLED) {
+            try {
+                sslContext = MultiProtocolTlsHelper.buildSslContext();
+                log.info("SSLContext created for server");
+            } catch (CertificateException | IOException e) {
+                throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "Failed to create SSLContext for server", e);
+            }
+        }
+    }
+
+    @Override
+    public void start() {
+        super.start();
+        this.configChildHandler();
+    }
+
+    protected void configChildHandler() {
+        try {
+            ServerBootstrap serverBootstrap = getField("serverBootstrap", ServerBootstrap.class);
+            Preconditions.checkNotNull(serverBootstrap);
+            DefaultEventExecutorGroup defaultEventExecutorGroup = getField("defaultEventExecutorGroup", DefaultEventExecutorGroup.class);
+            Preconditions.checkNotNull(defaultEventExecutorGroup);
+            NettyEncoder encoder = getField("encoder", NettyEncoder.class);
+            Preconditions.checkNotNull(encoder);
+            ChannelDuplexHandler connectionManageHandler = getField("connectionManageHandler", ChannelDuplexHandler.class);
+            Preconditions.checkNotNull(connectionManageHandler);
+            SimpleChannelInboundHandler serverHandler = getField("serverHandler", SimpleChannelInboundHandler.class);
+            Preconditions.checkNotNull(serverHandler);
+            SimpleChannelInboundHandler handshakeHandler = getField("handshakeHandler", SimpleChannelInboundHandler.class);
+            Preconditions.checkNotNull(handshakeHandler);
+            ConcurrentMap remotingServerTable = getField("remotingServerTable", ConcurrentMap.class);
+            Preconditions.checkNotNull(remotingServerTable);
+
+            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                protected void initChannel(SocketChannel ch) {
+                    ch.pipeline()
+                        .addLast(defaultEventExecutorGroup, "handshakeHandler", handshakeHandler)
+                        .addLast(defaultEventExecutorGroup,
+                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
+                            new ProtocolNegotiationHandler(new RemotingProtocolHandler(encoder, connectionManageHandler, serverHandler))
+                                .addProtocolHandler(new Http2ProtocolProxyHandler())
+                        );
+                }
+            });
+            remotingServerTable.put(port, this);
+            serverBootstrap.bind(port).sync();
+        } catch (Throwable t) {
+            throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "config netty child handler failed", t);
+        }
+    }
+
+    protected <T> T getField(String name, Class<T> getClazz) throws Throwable {
+        Field field = NettyRemotingServer.class.getDeclaredField(name);
+        field.setAccessible(true);
+        return getClazz.cast(field.get(this));
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java
new file mode 100644
index 000000000..54af7bc9e
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting;
+
+import io.netty.handler.ssl.ApplicationProtocolConfig;
+import io.netty.handler.ssl.ApplicationProtocolNames;
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.OpenSsl;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.cert.CertificateException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.remoting.netty.TlsHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerAuthClient;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerCertPath;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerKeyPassword;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerKeyPath;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerNeedClientAuth;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerTrustCertPath;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsTestModeEnable;
+
+public class MultiProtocolTlsHelper extends TlsHelper {
+    private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+    private static final DecryptionStrategy DECRYPTION_STRATEGY = (privateKeyEncryptPath, forClient) -> new FileInputStream(privateKeyEncryptPath);
+
+    public static SslContext buildSslContext() throws IOException, CertificateException {
+        TlsHelper.buildSslContext(false);
+        SslProvider provider;
+        if (OpenSsl.isAvailable()) {
+            provider = SslProvider.OPENSSL;
+            log.info("Using OpenSSL provider");
+        } else {
+            provider = SslProvider.JDK;
+            log.info("Using JDK SSL provider");
+        }
+
+        SslContextBuilder sslContextBuilder = null;
+        if (tlsTestModeEnable) {
+            SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
+            sslContextBuilder = SslContextBuilder
+                .forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey())
+                .sslProvider(SslProvider.OPENSSL)
+                .clientAuth(ClientAuth.OPTIONAL);
+        } else {
+            sslContextBuilder = SslContextBuilder.forServer(
+                !StringUtils.isBlank(tlsServerCertPath) ? Files.newInputStream(Paths.get(tlsServerCertPath)) : null,
+                !StringUtils.isBlank(tlsServerKeyPath) ? DECRYPTION_STRATEGY.decryptPrivateKey(tlsServerKeyPath, false) : null,
+                !StringUtils.isBlank(tlsServerKeyPassword) ? tlsServerKeyPassword : null)
+                .sslProvider(provider);
+
+            if (!tlsServerAuthClient) {
+                sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
+            } else {
+                if (!StringUtils.isBlank(tlsServerTrustCertPath)) {
+                    sslContextBuilder.trustManager(new File(tlsServerTrustCertPath));
+                }
+            }
+
+            sslContextBuilder.clientAuth(parseClientAuthMode(tlsServerNeedClientAuth));
+        }
+
+        sslContextBuilder.applicationProtocolConfig(new ApplicationProtocolConfig(
+            ApplicationProtocolConfig.Protocol.ALPN,
+            // NO_ADVERTISE is currently the only mode supported by both OpenSsl and JDK providers.
+            ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
+            // ACCEPT is currently the only mode supported by both OpenSsl and JDK providers.
+            ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
+            ApplicationProtocolNames.HTTP_2));
+
+        return sslContextBuilder.build();
+    }
+
+    private static ClientAuth parseClientAuthMode(String authMode) {
+        if (null == authMode || authMode.trim().isEmpty()) {
+            return ClientAuth.NONE;
+        }
+
+        for (ClientAuth clientAuth : ClientAuth.values()) {
+            if (clientAuth.name().equals(authMode.toUpperCase())) {
+                return clientAuth;
+            }
+        }
+
+        return ClientAuth.NONE;
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
index 58b257641..fdf1870a5 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
@@ -18,20 +18,161 @@
 package org.apache.rocketmq.proxy.remoting;
 
 import io.netty.channel.Channel;
+import java.lang.reflect.Field;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.latency.FutureTaskExt;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
+import org.apache.rocketmq.common.thread.ThreadPoolStatusMonitor;
+import org.apache.rocketmq.proxy.common.ReflectionCache;
 import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.activity.AckMessageActivity;
+import org.apache.rocketmq.proxy.remoting.activity.ChangeInvisibleTimeActivity;
+import org.apache.rocketmq.proxy.remoting.activity.ClientManagerActivity;
+import org.apache.rocketmq.proxy.remoting.activity.ConsumerManagerActivity;
+import org.apache.rocketmq.proxy.remoting.activity.GetTopicRouteActivity;
+import org.apache.rocketmq.proxy.remoting.activity.PopMessageActivity;
+import org.apache.rocketmq.proxy.remoting.activity.PullMessageActivity;
+import org.apache.rocketmq.proxy.remoting.activity.SendMessageActivity;
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.RequestTask;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOutClient {
+    private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
 
-    private final MessagingProcessor messagingProcessor;
-    private RemotingServer defaultRemotingServer;
+    protected final MessagingProcessor messagingProcessor;
+    protected final RemotingChannelManager remotingChannelManager;
+    protected final ChannelEventListener clientHousekeepingService;
+    protected final RemotingServer defaultRemotingServer;
+    protected final GetTopicRouteActivity getTopicRouteActivity;
+    protected final ClientManagerActivity clientManagerActivity;
+    protected final ConsumerManagerActivity consumerManagerActivity;
+    protected final SendMessageActivity sendMessageActivity;
+    protected final PullMessageActivity pullMessageActivity;
+    protected final PopMessageActivity popMessageActivity;
+    protected final AckMessageActivity ackMessageActivity;
+    protected final ChangeInvisibleTimeActivity changeInvisibleTimeActivity;
+    protected final ThreadPoolExecutor sendMessageExecutor;
+    protected final ThreadPoolExecutor pullMessageExecutor;
+    protected final ThreadPoolExecutor heartbeatExecutor;
+    protected final ThreadPoolExecutor updateOffsetExecutor;
+    protected final ThreadPoolExecutor topicRouteExecutor;
+    protected final ThreadPoolExecutor defaultExecutor;
+
+    private final ReflectionCache reflectionCache = new ReflectionCache();
 
     public RemotingProtocolServer(MessagingProcessor messagingProcessor) {
         this.messagingProcessor = messagingProcessor;
+        this.remotingChannelManager = new RemotingChannelManager(this, messagingProcessor.getProxyRelayService());
+
+        RequestPipeline pipeline = createRequestPipeline();
+        this.getTopicRouteActivity = new GetTopicRouteActivity(pipeline, messagingProcessor);
+        this.clientManagerActivity = new ClientManagerActivity(pipeline, messagingProcessor, remotingChannelManager);
+        this.consumerManagerActivity = new ConsumerManagerActivity(pipeline, messagingProcessor);
+        this.sendMessageActivity = new SendMessageActivity(pipeline, messagingProcessor);
+        this.pullMessageActivity = new PullMessageActivity(pipeline, messagingProcessor);
+        this.popMessageActivity = new PopMessageActivity(pipeline, messagingProcessor);
+        this.ackMessageActivity = new AckMessageActivity(pipeline, messagingProcessor);
+        this.changeInvisibleTimeActivity = new ChangeInvisibleTimeActivity(pipeline, messagingProcessor);
+
+        ProxyConfig config = ConfigurationManager.getProxyConfig();
+        NettyServerConfig defaultServerConfig = new NettyServerConfig();
+        defaultServerConfig.setListenPort(config.getRemotingListenPort());
+        TlsSystemConfig.tlsTestModeEnable = false;
+        System.setProperty(TlsSystemConfig.TLS_TEST_MODE_ENABLE, "false");
+        TlsSystemConfig.tlsServerCertPath = config.getGrpcTlsCertPath();
+        System.setProperty(TlsSystemConfig.TLS_SERVER_CERTPATH, config.getGrpcTlsCertPath());
+        TlsSystemConfig.tlsServerKeyPath = config.getGrpcTlsKeyPath();
+        System.setProperty(TlsSystemConfig.TLS_SERVER_KEYPATH, config.getGrpcTlsKeyPath());
+
+        this.clientHousekeepingService = new ClientHousekeepingService(this.clientManagerActivity);
+
+        if (config.isEnableRemotingLocalProxyGrpc()) {
+            this.defaultRemotingServer = new MultiProtocolRemotingServer(defaultServerConfig, this.clientHousekeepingService);
+        } else {
+            this.defaultRemotingServer = new NettyRemotingServer(defaultServerConfig, this.clientHousekeepingService);
+        }
+        this.registerRemotingServer(this.defaultRemotingServer);
+
+        this.sendMessageExecutor = ThreadPoolMonitor.createAndMonitor(
+            config.getRemotingSendMessageThreadPoolNums(),
+            config.getRemotingSendMessageThreadPoolNums(),
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            "RemotingSendMessageThread",
+            config.getRemotingSendThreadPoolQueueCapacity(),
+            new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInSendQueue())
+        );
+
+        this.pullMessageExecutor = ThreadPoolMonitor.createAndMonitor(
+            config.getRemotingPullMessageThreadPoolNums(),
+            config.getRemotingPullMessageThreadPoolNums(),
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            "RemotingPullMessageThread",
+            config.getRemotingPullThreadPoolQueueCapacity(),
+            new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInPullQueue())
+        );
+
+        this.updateOffsetExecutor = ThreadPoolMonitor.createAndMonitor(
+            config.getRemotingUpdateOffsetThreadPoolNums(),
+            config.getRemotingUpdateOffsetThreadPoolNums(),
+            1,
+            TimeUnit.MINUTES,
+            "RemotingUpdateOffsetThread",
+            config.getRemotingUpdateOffsetThreadPoolQueueCapacity(),
+            new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInUpdateOffsetQueue())
+        );
+
+        this.heartbeatExecutor = ThreadPoolMonitor.createAndMonitor(
+            config.getRemotingHeartbeatThreadPoolNums(),
+            config.getRemotingHeartbeatThreadPoolNums(),
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            "RemotingHeartbeatThread",
+            config.getRemotingHeartbeatThreadPoolQueueCapacity(),
+            new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInHeartbeatQueue())
+        );
+
+        this.topicRouteExecutor = ThreadPoolMonitor.createAndMonitor(
+            config.getRemotingTopicRouteThreadPoolNums(),
+            config.getRemotingTopicRouteThreadPoolNums(),
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            "RemotingTopicRouteThread",
+            config.getRemotingTopicRouteThreadPoolQueueCapacity(),
+            new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInTopicRouteQueue())
+        );
+
+        this.defaultExecutor = ThreadPoolMonitor.createAndMonitor(
+            config.getRemotingDefaultThreadPoolNums(),
+            config.getRemotingDefaultThreadPoolNums(),
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            "RemotingDefaultThread",
+            config.getRemotingDefaultThreadPoolQueueCapacity(),
+            new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInDefaultQueue())
+        );
     }
 
     protected void init() {
@@ -39,17 +180,50 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
     }
 
     protected void registerRemotingServer(RemotingServer remotingServer) {
+        remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageActivity, this.sendMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageActivity, this.sendMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageActivity, this.sendMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageActivity, sendMessageExecutor);
+
+        remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManagerActivity, this.heartbeatExecutor);
+        remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManagerActivity, this.defaultExecutor);
+        remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManagerActivity, this.defaultExecutor);
+
+        remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, pullMessageActivity, this.pullMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.LITE_PULL_MESSAGE, pullMessageActivity, this.pullMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.POP_MESSAGE, pullMessageActivity, this.pullMessageExecutor);
 
+        remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManagerActivity, this.updateOffsetExecutor);
+        remotingServer.registerProcessor(RequestCode.ACK_MESSAGE, consumerManagerActivity, this.updateOffsetExecutor);
+        remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, consumerManagerActivity, this.updateOffsetExecutor);
+
+        remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManagerActivity, this.defaultExecutor);
+        remotingServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManagerActivity, this.defaultExecutor);
+        remotingServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManagerActivity, this.defaultExecutor);
+        remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManagerActivity, this.defaultExecutor);
+        remotingServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManagerActivity, this.defaultExecutor);
+        remotingServer.registerProcessor(RequestCode.LOCK_BATCH_MQ, consumerManagerActivity, this.defaultExecutor);
+        remotingServer.registerProcessor(RequestCode.UNLOCK_BATCH_MQ, consumerManagerActivity, this.defaultExecutor);
+
+        remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, getTopicRouteActivity, this.topicRouteExecutor);
     }
 
     @Override
     public void shutdown() throws Exception {
-
+        this.defaultRemotingServer.shutdown();
+        this.remotingChannelManager.shutdown();
+        this.sendMessageExecutor.shutdown();
+        this.pullMessageExecutor.shutdown();
+        this.heartbeatExecutor.shutdown();
+        this.updateOffsetExecutor.shutdown();
+        this.topicRouteExecutor.shutdown();
+        this.defaultExecutor.shutdown();
     }
 
     @Override
     public void start() throws Exception {
-
+        this.remotingChannelManager.start();
+        this.defaultRemotingServer.start();
     }
 
     @Override
@@ -69,4 +243,126 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
         }
         return future;
     }
+
+    protected RequestPipeline createRequestPipeline() {
+        RequestPipeline pipeline = (ctx, request, context) -> {
+        };
+
+        // add pipeline
+        // the last pipe add will execute at the first
+        return pipeline;
+    }
+
+    protected class ThreadPoolHeadSlowTimeMillsMonitor implements ThreadPoolStatusMonitor {
+
+        private final long maxWaitTimeMillsInQueue;
+
+        public ThreadPoolHeadSlowTimeMillsMonitor(long maxWaitTimeMillsInQueue) {
+            this.maxWaitTimeMillsInQueue = maxWaitTimeMillsInQueue;
+        }
+
+        @Override
+        public String describe() {
+            return "headSlow";
+        }
+
+        @Override
+        public double value(ThreadPoolExecutor executor) {
+            return headSlowTimeMills(executor.getQueue());
+        }
+
+        @Override
+        public boolean needPrintJstack(ThreadPoolExecutor executor, double value) {
+            return value > maxWaitTimeMillsInQueue;
+        }
+    }
+
+    protected long headSlowTimeMills(BlockingQueue<Runnable> q) {
+        try {
+            long slowTimeMills = 0;
+            final Runnable peek = q.peek();
+            if (peek != null) {
+                RequestTask rt = castRunnable(peek);
+                slowTimeMills = rt == null ? 0 : System.currentTimeMillis() - rt.getCreateTimestamp();
+            }
+
+            if (slowTimeMills < 0) {
+                slowTimeMills = 0;
+            }
+
+            return slowTimeMills;
+        } catch (Exception e) {
+            log.error("error when headSlowTimeMills.", e);
+        }
+        return -1;
+    }
+
+    protected void cleanExpireRequest() {
+        ProxyConfig config = ConfigurationManager.getProxyConfig();
+
+        cleanExpiredRequestInQueue(this.sendMessageExecutor, config.getRemotingWaitTimeMillsInSendQueue());
+        cleanExpiredRequestInQueue(this.pullMessageExecutor, config.getRemotingWaitTimeMillsInPullQueue());
+        cleanExpiredRequestInQueue(this.heartbeatExecutor, config.getRemotingWaitTimeMillsInHeartbeatQueue());
+        cleanExpiredRequestInQueue(this.updateOffsetExecutor, config.getRemotingWaitTimeMillsInUpdateOffsetQueue());
+        cleanExpiredRequestInQueue(this.topicRouteExecutor, config.getRemotingWaitTimeMillsInTopicRouteQueue());
+        cleanExpiredRequestInQueue(this.defaultExecutor, config.getRemotingWaitTimeMillsInDefaultQueue());
+    }
+
+    protected void cleanExpiredRequestInQueue(ThreadPoolExecutor threadPoolExecutor, long maxWaitTimeMillsInQueue) {
+        while (true) {
+            try {
+                BlockingQueue<Runnable> blockingQueue = threadPoolExecutor.getQueue();
+                if (!blockingQueue.isEmpty()) {
+                    final Runnable runnable = blockingQueue.peek();
+                    if (null == runnable) {
+                        break;
+                    }
+                    final RequestTask rt = castRunnable(runnable);
+                    if (rt == null || rt.isStopRun()) {
+                        break;
+                    }
+
+                    final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
+                    if (behind >= maxWaitTimeMillsInQueue) {
+                        if (blockingQueue.remove(runnable)) {
+                            rt.setStopRun(true);
+                            rt.returnResponse(ResponseCode.SYSTEM_BUSY,
+                                String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
+                        }
+                    } else {
+                        break;
+                    }
+                } else {
+                    break;
+                }
+            } catch (Throwable ignored) {
+            }
+        }
+    }
+
+    private RequestTask castRunnable(final Runnable runnable) {
+        try {
+            if (runnable instanceof FutureTask) {
+                Field callableField = reflectionCache.getDeclaredField(FutureTask.class, "callable");
+                Callable callable = (Callable) callableField.get(runnable);
+                if (callable == null) {
+                    return null;
+                }
+                Field taskField = reflectionCache.getDeclaredField(callable.getClass(), "task");
+                if (taskField == null) {
+                    log.warn("get task from FutureTask failed. class:{}", runnable.getClass().getName());
+                    return null;
+                }
+                return (RequestTask) taskField.get(callable);
+            } else if (runnable instanceof FutureTaskExt) {
+                FutureTaskExt futureTaskExt = (FutureTaskExt) runnable;
+                return (RequestTask) futureTaskExt.getRunnable();
+            }
+            return null;
+        } catch (Throwable e) {
+            log.error("castRunnable exception. class:{}", runnable.getClass().getName(), e);
+        }
+
+        return null;
+    }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/ProtocolHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/ProtocolHandler.java
new file mode 100644
index 000000000..4b1b03067
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/ProtocolHandler.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.protocol;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+
+public interface ProtocolHandler {
+
+    boolean match(ByteBuf msg);
+
+    void config(final ChannelHandlerContext ctx, final ByteBuf msg);
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/ProtocolNegotiationHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/ProtocolNegotiationHandler.java
new file mode 100644
index 000000000..da2dded5f
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/ProtocolNegotiationHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.proxy.remoting.protocol;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ProtocolNegotiationHandler extends ByteToMessageDecoder {
+
+    private final List<ProtocolHandler> protocolHandlerList = new ArrayList<ProtocolHandler>();
+    private final ProtocolHandler fallbackProtocolHandler;
+
+    public ProtocolNegotiationHandler(ProtocolHandler fallbackProtocolHandler) {
+        this.fallbackProtocolHandler = fallbackProtocolHandler;
+    }
+
+    public ProtocolNegotiationHandler addProtocolHandler(ProtocolHandler protocolHandler) {
+        protocolHandlerList.add(protocolHandler);
+        return this;
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+        // use 4 bytes to judge protocol
+        if (in.readableBytes() < 4) {
+            return;
+        }
+
+        ProtocolHandler protocolHandler = null;
+        for (ProtocolHandler curProtocolHandler : protocolHandlerList) {
+            if (curProtocolHandler.match(in)) {
+                protocolHandler = curProtocolHandler;
+                break;
+            }
+        }
+
+        if (protocolHandler == null) {
+            protocolHandler = fallbackProtocolHandler;
+        }
+
+        protocolHandler.config(ctx, in);
+        ctx.pipeline().remove(this);
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
new file mode 100644
index 000000000..c5050cda7
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.protocol.http2proxy;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.handler.ssl.ApplicationProtocolConfig;
+import io.netty.handler.ssl.ApplicationProtocolNames;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import javax.net.ssl.SSLException;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.remoting.protocol.ProtocolHandler;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+public class Http2ProtocolProxyHandler implements ProtocolHandler {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+    private static final String LOCAL_HOST = "127.0.0.1";
+    /**
+     * The int value of "PRI ". Now use 4 bytes to judge protocol, may be has potential risks if there is a new protocol
+     * which start with "PRI " too in the future
+     * <p>
+     * The full HTTP/2 connection preface is "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
+     * <p>
+     * ref: https://datatracker.ietf.org/doc/html/rfc7540#section-3.5
+     */
+    private static final int PRI_INT = 0x50524920;
+
+    private final SslContext sslContext;
+
+    public Http2ProtocolProxyHandler() {
+        try {
+            sslContext = SslContextBuilder
+                .forClient()
+                .sslProvider(SslProvider.OPENSSL)
+                .trustManager(InsecureTrustManagerFactory.INSTANCE)
+                .applicationProtocolConfig(new ApplicationProtocolConfig(
+                    ApplicationProtocolConfig.Protocol.ALPN,
+                    ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
+                    ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
+                    ApplicationProtocolNames.HTTP_2))
+                .build();
+        } catch (SSLException e) {
+            log.error("Failed to create SSLContext for Http2ProtocolProxyHandler", e);
+            throw new RuntimeException("Failed to create SSLContext for Http2ProtocolProxyHandler", e);
+        }
+    }
+
+    @Override
+    public boolean match(ByteBuf in) {
+        if (!ConfigurationManager.getProxyConfig().isEnableRemotingLocalProxyGrpc()) {
+            return false;
+        }
+
+        // If starts with 'PRI '
+        return in.getInt(in.readerIndex()) == PRI_INT;
+    }
+
+    @Override
+    public void config(final ChannelHandlerContext ctx, final ByteBuf msg) {
+        // proxy channel to http2 server
+        final Channel inboundChannel = ctx.channel();
+
+        ProxyConfig config = ConfigurationManager.getProxyConfig();
+        // Start the connection attempt.
+        Bootstrap b = new Bootstrap();
+        b.group(inboundChannel.eventLoop())
+            .channel(ctx.channel().getClass())
+            .handler(new ChannelInitializer<Channel>() {
+                @Override
+                protected void initChannel(Channel ch) throws Exception {
+                    if (sslContext != null) {
+                        ch.pipeline()
+                            .addLast(sslContext.newHandler(ch.alloc(), LOCAL_HOST, config.getGrpcServerPort()));
+                    }
+                    ch.pipeline().addLast(new Http2ProxyBackendHandler(inboundChannel));
+                }
+            })
+            .option(ChannelOption.AUTO_READ, false)
+            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getLocalProxyConnectTimeoutMs());
+        ChannelFuture f;
+        try {
+            f = b.connect(LOCAL_HOST, config.getGrpcServerPort()).sync();
+        } catch (Exception e) {
+            log.error("connect http2 server failed. port:{}", config.getGrpcServerPort(), e);
+            inboundChannel.close();
+            return;
+        }
+
+        final Channel outboundChannel = f.channel();
+
+        ctx.pipeline().addLast(new Http2ProxyFrontendHandler(outboundChannel));
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
new file mode 100644
index 000000000..53bddfc31
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.protocol.http2proxy;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+public class Http2ProxyBackendHandler extends ChannelInboundHandlerAdapter {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+    private final Channel inboundChannel;
+
+    public Http2ProxyBackendHandler(Channel inboundChannel) {
+        this.inboundChannel = inboundChannel;
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) {
+        ctx.read();
+    }
+
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
+        inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) {
+                if (future.isSuccess()) {
+                    ctx.channel().read();
+                } else {
+                    future.channel().close();
+                }
+            }
+        });
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) {
+        Http2ProxyFrontendHandler.closeOnFlush(inboundChannel);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        log.error("Http2ProxyBackendHandler#exceptionCaught", cause);
+        Http2ProxyFrontendHandler.closeOnFlush(ctx.channel());
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
new file mode 100644
index 000000000..8bffdc6d0
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.protocol.http2proxy;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+public class Http2ProxyFrontendHandler extends ChannelInboundHandlerAdapter {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+    // As we use inboundChannel.eventLoop() when building the Bootstrap this does not need to be volatile as
+    // the outboundChannel will use the same EventLoop (and therefore Thread) as the inboundChannel.
+    private final Channel outboundChannel;
+
+    public Http2ProxyFrontendHandler(final Channel outboundChannel) {
+        this.outboundChannel = outboundChannel;
+    }
+
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
+        if (outboundChannel.isActive()) {
+            outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) {
+                    if (future.isSuccess()) {
+                        // was able to flush out data, start to read the next chunk
+                        ctx.channel().read();
+                    } else {
+                        future.channel().close();
+                    }
+                }
+            });
+        }
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) {
+        if (outboundChannel != null) {
+            closeOnFlush(outboundChannel);
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        log.error("Http2ProxyFrontendHandler#exceptionCaught", cause);
+        closeOnFlush(ctx.channel());
+    }
+
+    /**
+     * Closes the specified channel after all queued write requests are flushed.
+     */
+    static void closeOnFlush(Channel ch) {
+        if (ch.isActive()) {
+            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+        }
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java
new file mode 100644
index 000000000..3e4cc7c04
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.protocol.remoting;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.rocketmq.proxy.remoting.protocol.ProtocolHandler;
+import org.apache.rocketmq.remoting.netty.NettyDecoder;
+import org.apache.rocketmq.remoting.netty.NettyEncoder;
+
+public class RemotingProtocolHandler implements ProtocolHandler {
+
+    private final NettyEncoder encoder;
+    private final ChannelDuplexHandler connectionManageHandler;
+    private final SimpleChannelInboundHandler serverHandler;
+
+    public RemotingProtocolHandler(NettyEncoder encoder, ChannelDuplexHandler connectionManageHandler,
+        SimpleChannelInboundHandler serverHandler) {
+        this.encoder = encoder;
+        this.connectionManageHandler = connectionManageHandler;
+        this.serverHandler = serverHandler;
+    }
+
+    @Override
+    public boolean match(ByteBuf in) {
+        return true;
+    }
+
+    @Override
+    public void config(ChannelHandlerContext ctx, ByteBuf msg) {
+        ctx.pipeline().addLast(
+            this.encoder,
+            new NettyDecoder(),
+            this.connectionManageHandler,
+            this.serverHandler
+        );
+    }
+}