You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:53:49 UTC

[rocketmq] 08/26: [ISSUE #5486] polish MultiProtocolRemotingServer

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 4c0e06749bb664f27a9726b9a37827453f96fba8
Author: kaiyi.lk <ka...@alibaba-inc.com>
AuthorDate: Wed Nov 9 16:44:03 2022 +0800

    [ISSUE #5486] polish MultiProtocolRemotingServer
---
 .../rocketmq/proxy/common/ReflectionCache.java     | 45 ------------
 .../remoting/MultiProtocolRemotingServer.java      | 81 ++++------------------
 .../http2proxy/Http2ProtocolProxyHandler.java      |  2 +-
 .../protocol/remoting/RemotingProtocolHandler.java | 17 +++--
 .../remoting/netty/NettyRemotingServer.java        | 68 +++++++++++++-----
 5 files changed, 78 insertions(+), 135 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReflectionCache.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReflectionCache.java
deleted file mode 100644
index 31fa46c90..000000000
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReflectionCache.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.proxy.common;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import java.lang.reflect.Field;
-import java.util.concurrent.TimeUnit;
-
-public class ReflectionCache {
-    private final Cache<Class<?>, Field> fieldCache;
-    private static final int DEFAULT_MAX_SIZE = 15;
-
-    public ReflectionCache() {
-        this(DEFAULT_MAX_SIZE);
-    }
-
-    public ReflectionCache(int maxSize) {
-        this.fieldCache = CacheBuilder.newBuilder().maximumSize(maxSize).expireAfterAccess(5, TimeUnit.MINUTES).build();
-    }
-
-    public Field getDeclaredField(final Class<?> clazz, final String fieldName) throws Exception {
-        return this.fieldCache.get(clazz, () -> {
-            Field field = clazz.getDeclaredField(fieldName);
-            field.setAccessible(true);
-            return field;
-        });
-    }
-}
-
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
index 73aeeaf42..02e3a545e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
@@ -17,18 +17,11 @@
 
 package org.apache.rocketmq.proxy.remoting;
 
-import com.google.common.base.Preconditions;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.security.cert.CertificateException;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.proxy.common.ProxyException;
 import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
@@ -37,7 +30,6 @@ import org.apache.rocketmq.proxy.remoting.protocol.http2proxy.Http2ProtocolProxy
 import org.apache.rocketmq.proxy.remoting.protocol.remoting.RemotingProtocolHandler;
 import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.common.TlsMode;
-import org.apache.rocketmq.remoting.netty.NettyEncoder;
 import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
@@ -45,26 +37,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * for remoting server, if config listen port is 8080 in nettyServerConfig
- * <p>
- * will
- * <li>listen port at 9080 with protocol remoting</li>
- * <li>listen port at 8080 with protocol remoting and http2</li>
+ * Supports both the remoting and HTTP/2 protocols on a single port.
  */
 public class MultiProtocolRemotingServer extends NettyRemotingServer {
 
     private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
-    private static final int PORT_DELTA = 1000;
     private final NettyServerConfig nettyServerConfig;
-    private final int port;
 
     public MultiProtocolRemotingServer(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) {
         super(nettyServerConfig, channelEventListener);
-        this.port = nettyServerConfig.getListenPort();
-        // to support multiple protocol
-        // will bind the real port in configChildHandler
-        // so let parent bind to a useless port
-        nettyServerConfig.setListenPort(nettyServerConfig.getListenPort() + PORT_DELTA);
         this.nettyServerConfig = nettyServerConfig;
     }
 
@@ -84,50 +65,18 @@ public class MultiProtocolRemotingServer extends NettyRemotingServer {
     }
 
     @Override
-    public void start() {
-        super.start();
-        this.configChildHandler();
-    }
-
-    protected void configChildHandler() {
-        try {
-            ServerBootstrap serverBootstrap = getField("serverBootstrap", ServerBootstrap.class);
-            Preconditions.checkNotNull(serverBootstrap);
-            DefaultEventExecutorGroup defaultEventExecutorGroup = getField("defaultEventExecutorGroup", DefaultEventExecutorGroup.class);
-            Preconditions.checkNotNull(defaultEventExecutorGroup);
-            NettyEncoder encoder = getField("encoder", NettyEncoder.class);
-            Preconditions.checkNotNull(encoder);
-            ChannelDuplexHandler connectionManageHandler = getField("connectionManageHandler", ChannelDuplexHandler.class);
-            Preconditions.checkNotNull(connectionManageHandler);
-            SimpleChannelInboundHandler serverHandler = getField("serverHandler", SimpleChannelInboundHandler.class);
-            Preconditions.checkNotNull(serverHandler);
-            SimpleChannelInboundHandler handshakeHandler = getField("handshakeHandler", SimpleChannelInboundHandler.class);
-            Preconditions.checkNotNull(handshakeHandler);
-            ConcurrentMap remotingServerTable = getField("remotingServerTable", ConcurrentMap.class);
-            Preconditions.checkNotNull(remotingServerTable);
-
-            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
-                @Override
-                protected void initChannel(SocketChannel ch) {
-                    ch.pipeline()
-                        .addLast(defaultEventExecutorGroup, "handshakeHandler", handshakeHandler)
-                        .addLast(defaultEventExecutorGroup,
-                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
-                            new ProtocolNegotiationHandler(new RemotingProtocolHandler(encoder, connectionManageHandler, serverHandler))
-                                .addProtocolHandler(new Http2ProtocolProxyHandler())
-                        );
-                }
-            });
-            remotingServerTable.put(port, this);
-            serverBootstrap.bind(port).sync();
-        } catch (Throwable t) {
-            throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "config netty child handler failed", t);
-        }
-    }
-
-    protected <T> T getField(String name, Class<T> getClazz) throws Throwable {
-        Field field = NettyRemotingServer.class.getDeclaredField(name);
-        field.setAccessible(true);
-        return getClazz.cast(field.get(this));
+    protected ChannelPipeline configChannel(SocketChannel ch) {
+        return ch.pipeline()
+            .addLast(this.getDefaultEventExecutorGroup(), HANDSHAKE_HANDLER_NAME, this.getHandshakeHandler())
+            .addLast(this.getDefaultEventExecutorGroup(),
+                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
+                new ProtocolNegotiationHandler(
+                    new RemotingProtocolHandler(
+                        this.getEncoder(),
+                        this.getDistributionHandler(),
+                        this.getConnectionManageHandler(),
+                        this.getServerHandler()))
+                    .addProtocolHandler(new Http2ProtocolProxyHandler())
+            );
     }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
index c5050cda7..86f1ee921 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
@@ -43,7 +43,7 @@ public class Http2ProtocolProxyHandler implements ProtocolHandler {
     private static final String LOCAL_HOST = "127.0.0.1";
     /**
      * The int value of "PRI ". Now use 4 bytes to judge protocol, may be has potential risks if there is a new protocol
-     * which start with "PRI " too in the future
+     * which starts with "PRI " in the future
      * <p>
      * The full HTTP/2 connection preface is "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
      * <p>
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java
index 3e4cc7c04..2d1a04d0e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java
@@ -18,22 +18,26 @@
 package org.apache.rocketmq.proxy.remoting.protocol.remoting;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.rocketmq.proxy.remoting.protocol.ProtocolHandler;
 import org.apache.rocketmq.remoting.netty.NettyDecoder;
 import org.apache.rocketmq.remoting.netty.NettyEncoder;
+import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
+import org.apache.rocketmq.remoting.netty.RemotingCodeDistributionHandler;
 
 public class RemotingProtocolHandler implements ProtocolHandler {
 
     private final NettyEncoder encoder;
-    private final ChannelDuplexHandler connectionManageHandler;
-    private final SimpleChannelInboundHandler serverHandler;
+    private final RemotingCodeDistributionHandler remotingCodeDistributionHandler;
+    private final NettyRemotingServer.NettyConnectManageHandler connectionManageHandler;
+    private final NettyRemotingServer.NettyServerHandler serverHandler;
 
-    public RemotingProtocolHandler(NettyEncoder encoder, ChannelDuplexHandler connectionManageHandler,
-        SimpleChannelInboundHandler serverHandler) {
+    public RemotingProtocolHandler(NettyEncoder encoder,
+        RemotingCodeDistributionHandler remotingCodeDistributionHandler,
+        NettyRemotingServer.NettyConnectManageHandler connectionManageHandler,
+        NettyRemotingServer.NettyServerHandler serverHandler) {
         this.encoder = encoder;
+        this.remotingCodeDistributionHandler = remotingCodeDistributionHandler;
         this.connectionManageHandler = connectionManageHandler;
         this.serverHandler = serverHandler;
     }
@@ -48,6 +52,7 @@ public class RemotingProtocolHandler implements ProtocolHandler {
         ctx.pipeline().addLast(
             this.encoder,
             new NettyDecoder(),
+            this.remotingCodeDistributionHandler,
             this.connectionManageHandler,
             this.serverHandler
         );
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 1b364b6ee..646c0734e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.WriteBufferWaterMark;
@@ -93,9 +94,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
      */
     private final ConcurrentMap<Integer/*Port*/, NettyRemotingAbstract> remotingServerTable = new ConcurrentHashMap<>();
 
-    private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
-    private static final String TLS_HANDLER_NAME = "sslHandler";
-    private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
+    public static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
+    public static final String TLS_HANDLER_NAME = "sslHandler";
+    public static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
 
     // sharable handlers
     private HandshakeHandler handshakeHandler;
@@ -242,17 +243,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) {
-                    ch.pipeline()
-                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
-                        .addLast(defaultEventExecutorGroup,
-                            encoder,
-                            new NettyDecoder(),
-                            distributionHandler,
-                            new IdleStateHandler(0, 0,
-                                nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
-                            connectionManageHandler,
-                            serverHandler
-                        );
+                    configChannel(ch);
                 }
             });
 
@@ -297,6 +288,25 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         }, 1, 1, TimeUnit.SECONDS);
     }
 
+    /**
+     * Configures the pipeline of the given channel; called from the ChannelInitializer.
+     * @param ch the SocketChannel to initialize
+     * @return the configured ChannelPipeline, so that subclasses can extend it further
+     */
+    protected ChannelPipeline configChannel(SocketChannel ch) {
+        return ch.pipeline()
+            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
+            .addLast(defaultEventExecutorGroup,
+                encoder,
+                new NettyDecoder(),
+                distributionHandler,
+                new IdleStateHandler(0, 0,
+                    nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
+                connectionManageHandler,
+                serverHandler
+            );
+    }
+
     private void addCustomConfig(ServerBootstrap childHandler) {
         if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
             log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());
@@ -438,8 +448,32 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         }
     }
 
+    public DefaultEventExecutorGroup getDefaultEventExecutorGroup() {
+        return defaultEventExecutorGroup;
+    }
+
+    public HandshakeHandler getHandshakeHandler() {
+        return handshakeHandler;
+    }
+
+    public NettyEncoder getEncoder() {
+        return encoder;
+    }
+
+    public NettyConnectManageHandler getConnectionManageHandler() {
+        return connectionManageHandler;
+    }
+
+    public NettyServerHandler getServerHandler() {
+        return serverHandler;
+    }
+
+    public RemotingCodeDistributionHandler getDistributionHandler() {
+        return distributionHandler;
+    }
+
     @ChannelHandler.Sharable
-    class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {
+    public class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {
 
         private final TlsMode tlsMode;
 
@@ -496,7 +530,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
     }
 
     @ChannelHandler.Sharable
-    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
+    public class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
 
         @Override
         protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) {
@@ -529,7 +563,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
     }
 
     @ChannelHandler.Sharable
-    class NettyConnectManageHandler extends ChannelDuplexHandler {
+    public class NettyConnectManageHandler extends ChannelDuplexHandler {
         @Override
         public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
             final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());