You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:53:49 UTC
[rocketmq] 08/26: [ISSUE #5486] polish MultiProtocolRemotingServer
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 4c0e06749bb664f27a9726b9a37827453f96fba8
Author: kaiyi.lk <ka...@alibaba-inc.com>
AuthorDate: Wed Nov 9 16:44:03 2022 +0800
[ISSUE #5486] polish MultiProtocolRemotingServer
---
.../rocketmq/proxy/common/ReflectionCache.java | 45 ------------
.../remoting/MultiProtocolRemotingServer.java | 81 ++++------------------
.../http2proxy/Http2ProtocolProxyHandler.java | 2 +-
.../protocol/remoting/RemotingProtocolHandler.java | 17 +++--
.../remoting/netty/NettyRemotingServer.java | 68 +++++++++++++-----
5 files changed, 78 insertions(+), 135 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReflectionCache.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReflectionCache.java
deleted file mode 100644
index 31fa46c90..000000000
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReflectionCache.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.proxy.common;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import java.lang.reflect.Field;
-import java.util.concurrent.TimeUnit;
-
-public class ReflectionCache {
- private final Cache<Class<?>, Field> fieldCache;
- private static final int DEFAULT_MAX_SIZE = 15;
-
- public ReflectionCache() {
- this(DEFAULT_MAX_SIZE);
- }
-
- public ReflectionCache(int maxSize) {
- this.fieldCache = CacheBuilder.newBuilder().maximumSize(maxSize).expireAfterAccess(5, TimeUnit.MINUTES).build();
- }
-
- public Field getDeclaredField(final Class<?> clazz, final String fieldName) throws Exception {
- return this.fieldCache.get(clazz, () -> {
- Field field = clazz.getDeclaredField(fieldName);
- field.setAccessible(true);
- return field;
- });
- }
-}
-
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
index 73aeeaf42..02e3a545e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
@@ -17,18 +17,11 @@
package org.apache.rocketmq.proxy.remoting;
-import com.google.common.base.Preconditions;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.concurrent.DefaultEventExecutorGroup;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.security.cert.CertificateException;
-import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
@@ -37,7 +30,6 @@ import org.apache.rocketmq.proxy.remoting.protocol.http2proxy.Http2ProtocolProxy
import org.apache.rocketmq.proxy.remoting.protocol.remoting.RemotingProtocolHandler;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.common.TlsMode;
-import org.apache.rocketmq.remoting.netty.NettyEncoder;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
@@ -45,26 +37,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * for remoting server, if config listen port is 8080 in nettyServerConfig
- * <p>
- * will
- * <li>listen port at 9080 with protocol remoting</li>
- * <li>listen port at 8080 with protocol remoting and http2</li>
+ * support remoting and http2 protocol at one port
*/
public class MultiProtocolRemotingServer extends NettyRemotingServer {
private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
- private static final int PORT_DELTA = 1000;
private final NettyServerConfig nettyServerConfig;
- private final int port;
public MultiProtocolRemotingServer(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) {
super(nettyServerConfig, channelEventListener);
- this.port = nettyServerConfig.getListenPort();
- // to support multiple protocol
- // will bind the real port in configChildHandler
- // so let parent bind to a useless port
- nettyServerConfig.setListenPort(nettyServerConfig.getListenPort() + PORT_DELTA);
this.nettyServerConfig = nettyServerConfig;
}
@@ -84,50 +65,18 @@ public class MultiProtocolRemotingServer extends NettyRemotingServer {
}
@Override
- public void start() {
- super.start();
- this.configChildHandler();
- }
-
- protected void configChildHandler() {
- try {
- ServerBootstrap serverBootstrap = getField("serverBootstrap", ServerBootstrap.class);
- Preconditions.checkNotNull(serverBootstrap);
- DefaultEventExecutorGroup defaultEventExecutorGroup = getField("defaultEventExecutorGroup", DefaultEventExecutorGroup.class);
- Preconditions.checkNotNull(defaultEventExecutorGroup);
- NettyEncoder encoder = getField("encoder", NettyEncoder.class);
- Preconditions.checkNotNull(encoder);
- ChannelDuplexHandler connectionManageHandler = getField("connectionManageHandler", ChannelDuplexHandler.class);
- Preconditions.checkNotNull(connectionManageHandler);
- SimpleChannelInboundHandler serverHandler = getField("serverHandler", SimpleChannelInboundHandler.class);
- Preconditions.checkNotNull(serverHandler);
- SimpleChannelInboundHandler handshakeHandler = getField("handshakeHandler", SimpleChannelInboundHandler.class);
- Preconditions.checkNotNull(handshakeHandler);
- ConcurrentMap remotingServerTable = getField("remotingServerTable", ConcurrentMap.class);
- Preconditions.checkNotNull(remotingServerTable);
-
- serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) {
- ch.pipeline()
- .addLast(defaultEventExecutorGroup, "handshakeHandler", handshakeHandler)
- .addLast(defaultEventExecutorGroup,
- new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
- new ProtocolNegotiationHandler(new RemotingProtocolHandler(encoder, connectionManageHandler, serverHandler))
- .addProtocolHandler(new Http2ProtocolProxyHandler())
- );
- }
- });
- remotingServerTable.put(port, this);
- serverBootstrap.bind(port).sync();
- } catch (Throwable t) {
- throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "config netty child handler failed", t);
- }
- }
-
- protected <T> T getField(String name, Class<T> getClazz) throws Throwable {
- Field field = NettyRemotingServer.class.getDeclaredField(name);
- field.setAccessible(true);
- return getClazz.cast(field.get(this));
+ protected ChannelPipeline configChannel(SocketChannel ch) {
+ return ch.pipeline()
+ .addLast(this.getDefaultEventExecutorGroup(), HANDSHAKE_HANDLER_NAME, this.getHandshakeHandler())
+ .addLast(this.getDefaultEventExecutorGroup(),
+ new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
+ new ProtocolNegotiationHandler(
+ new RemotingProtocolHandler(
+ this.getEncoder(),
+ this.getDistributionHandler(),
+ this.getConnectionManageHandler(),
+ this.getServerHandler()))
+ .addProtocolHandler(new Http2ProtocolProxyHandler())
+ );
}
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
index c5050cda7..86f1ee921 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
@@ -43,7 +43,7 @@ public class Http2ProtocolProxyHandler implements ProtocolHandler {
private static final String LOCAL_HOST = "127.0.0.1";
/**
* The int value of "PRI ". Now use 4 bytes to judge protocol, may be has potential risks if there is a new protocol
- * which start with "PRI " too in the future
+ * which start with "PRI " in the future
* <p>
* The full HTTP/2 connection preface is "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
* <p>
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java
index 3e4cc7c04..2d1a04d0e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java
@@ -18,22 +18,26 @@
package org.apache.rocketmq.proxy.remoting.protocol.remoting;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.rocketmq.proxy.remoting.protocol.ProtocolHandler;
import org.apache.rocketmq.remoting.netty.NettyDecoder;
import org.apache.rocketmq.remoting.netty.NettyEncoder;
+import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
+import org.apache.rocketmq.remoting.netty.RemotingCodeDistributionHandler;
public class RemotingProtocolHandler implements ProtocolHandler {
private final NettyEncoder encoder;
- private final ChannelDuplexHandler connectionManageHandler;
- private final SimpleChannelInboundHandler serverHandler;
+ private final RemotingCodeDistributionHandler remotingCodeDistributionHandler;
+ private final NettyRemotingServer.NettyConnectManageHandler connectionManageHandler;
+ private final NettyRemotingServer.NettyServerHandler serverHandler;
- public RemotingProtocolHandler(NettyEncoder encoder, ChannelDuplexHandler connectionManageHandler,
- SimpleChannelInboundHandler serverHandler) {
+ public RemotingProtocolHandler(NettyEncoder encoder,
+ RemotingCodeDistributionHandler remotingCodeDistributionHandler,
+ NettyRemotingServer.NettyConnectManageHandler connectionManageHandler,
+ NettyRemotingServer.NettyServerHandler serverHandler) {
this.encoder = encoder;
+ this.remotingCodeDistributionHandler = remotingCodeDistributionHandler;
this.connectionManageHandler = connectionManageHandler;
this.serverHandler = serverHandler;
}
@@ -48,6 +52,7 @@ public class RemotingProtocolHandler implements ProtocolHandler {
ctx.pipeline().addLast(
this.encoder,
new NettyDecoder(),
+ this.remotingCodeDistributionHandler,
this.connectionManageHandler,
this.serverHandler
);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 1b364b6ee..646c0734e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.WriteBufferWaterMark;
@@ -93,9 +94,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
*/
private final ConcurrentMap<Integer/*Port*/, NettyRemotingAbstract> remotingServerTable = new ConcurrentHashMap<>();
- private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
- private static final String TLS_HANDLER_NAME = "sslHandler";
- private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
+ public static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
+ public static final String TLS_HANDLER_NAME = "sslHandler";
+ public static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
// sharable handlers
private HandshakeHandler handshakeHandler;
@@ -242,17 +243,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
- ch.pipeline()
- .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
- .addLast(defaultEventExecutorGroup,
- encoder,
- new NettyDecoder(),
- distributionHandler,
- new IdleStateHandler(0, 0,
- nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
- connectionManageHandler,
- serverHandler
- );
+ configChannel(ch);
}
});
@@ -297,6 +288,25 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}, 1, 1, TimeUnit.SECONDS);
}
+ /**
+ * config channel in ChannelInitializer
+ * @param ch the SocketChannel needed to init
+ * @return the initialized ChannelPipeline, sub class can use it to extent in the future
+ */
+ protected ChannelPipeline configChannel(SocketChannel ch) {
+ return ch.pipeline()
+ .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
+ .addLast(defaultEventExecutorGroup,
+ encoder,
+ new NettyDecoder(),
+ distributionHandler,
+ new IdleStateHandler(0, 0,
+ nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
+ connectionManageHandler,
+ serverHandler
+ );
+ }
+
private void addCustomConfig(ServerBootstrap childHandler) {
if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());
@@ -438,8 +448,32 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
}
+ public DefaultEventExecutorGroup getDefaultEventExecutorGroup() {
+ return defaultEventExecutorGroup;
+ }
+
+ public HandshakeHandler getHandshakeHandler() {
+ return handshakeHandler;
+ }
+
+ public NettyEncoder getEncoder() {
+ return encoder;
+ }
+
+ public NettyConnectManageHandler getConnectionManageHandler() {
+ return connectionManageHandler;
+ }
+
+ public NettyServerHandler getServerHandler() {
+ return serverHandler;
+ }
+
+ public RemotingCodeDistributionHandler getDistributionHandler() {
+ return distributionHandler;
+ }
+
@ChannelHandler.Sharable
- class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {
+ public class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final TlsMode tlsMode;
@@ -496,7 +530,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
@ChannelHandler.Sharable
- class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
+ public class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) {
@@ -529,7 +563,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
@ChannelHandler.Sharable
- class NettyConnectManageHandler extends ChannelDuplexHandler {
+ public class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());