You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:53:47 UTC
[rocketmq] 06/26: [ISSUE #5486] Add remoting server
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 48673af7641418d3ae8cd02aa5c341e552036900
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Wed Nov 9 14:53:42 2022 +0800
[ISSUE #5486] Add remoting server
---
.../org/apache/rocketmq/proxy/ProxyStartup.java | 4 +
.../rocketmq/proxy/common/ReflectionCache.java | 45 +++
.../apache/rocketmq/proxy/config/ProxyConfig.java | 196 +++++++++++++
.../remoting/MultiProtocolRemotingServer.java | 133 +++++++++
.../proxy/remoting/MultiProtocolTlsHelper.java | 113 ++++++++
.../proxy/remoting/RemotingProtocolServer.java | 304 ++++++++++++++++++++-
.../proxy/remoting/protocol/ProtocolHandler.java | 28 ++
.../protocol/ProtocolNegotiationHandler.java | 61 +++++
.../http2proxy/Http2ProtocolProxyHandler.java | 119 ++++++++
.../http2proxy/Http2ProxyBackendHandler.java | 67 +++++
.../http2proxy/Http2ProxyFrontendHandler.java | 78 ++++++
.../protocol/remoting/RemotingProtocolHandler.java | 55 ++++
12 files changed, 1199 insertions(+), 4 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
index 78399cf35..42a833430 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
@@ -47,6 +47,7 @@ import org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication;
import org.apache.rocketmq.proxy.metrics.ProxyMetricsManager;
import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.RemotingProtocolServer;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
@@ -83,6 +84,9 @@ public class ProxyStartup {
.build();
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(grpcServer);
+ RemotingProtocolServer remotingServer = new RemotingProtocolServer(messagingProcessor);
+ PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(remotingServer);
+
// start servers one by one.
PROXY_START_AND_SHUTDOWN.start();
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReflectionCache.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReflectionCache.java
new file mode 100644
index 000000000..31fa46c90
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReflectionCache.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.common;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Caches reflective {@link Field} lookups so repeated access to the same
+ * declared field does not repeat {@code getDeclaredField} and
+ * {@code setAccessible(true)}.
+ * <p>
+ * Entries are keyed by the declaring class <em>and</em> the field name, so
+ * looking up two different fields of one class never returns the wrong field.
+ * Entries expire five minutes after their last access.
+ */
+public class ReflectionCache {
+    private static final int DEFAULT_MAX_SIZE = 15;
+
+    private final Cache<FieldKey, Field> fieldCache;
+
+    /** Creates a cache holding at most {@code DEFAULT_MAX_SIZE} fields. */
+    public ReflectionCache() {
+        this(DEFAULT_MAX_SIZE);
+    }
+
+    /**
+     * @param maxSize maximum number of cached fields before entries are evicted
+     */
+    public ReflectionCache(int maxSize) {
+        this.fieldCache = CacheBuilder.newBuilder().maximumSize(maxSize).expireAfterAccess(5, TimeUnit.MINUTES).build();
+    }
+
+    /**
+     * Returns the declared field {@code fieldName} of {@code clazz}, already made accessible.
+     *
+     * @param clazz     class that declares the field
+     * @param fieldName name of the declared field
+     * @return the accessible field, served from the cache when present
+     * @throws Exception if the lookup fails; failures raised while loading the
+     *                   field are reported through the cache's loader exception
+     */
+    public Field getDeclaredField(final Class<?> clazz, final String fieldName) throws Exception {
+        return this.fieldCache.get(new FieldKey(clazz, fieldName), () -> {
+            Field field = clazz.getDeclaredField(fieldName);
+            field.setAccessible(true);
+            return field;
+        });
+    }
+
+    /** Composite cache key: the declaring class plus the field name. */
+    private static final class FieldKey {
+        private final Class<?> clazz;
+        private final String fieldName;
+
+        FieldKey(Class<?> clazz, String fieldName) {
+            if (clazz == null || fieldName == null) {
+                throw new NullPointerException("clazz and fieldName must not be null");
+            }
+            this.clazz = clazz;
+            this.fieldName = fieldName;
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (this == other) {
+                return true;
+            }
+            if (!(other instanceof FieldKey)) {
+                return false;
+            }
+            FieldKey that = (FieldKey) other;
+            return this.clazz == that.clazz && this.fieldName.equals(that.fieldName);
+        }
+
+        @Override
+        public int hashCode() {
+            return 31 * clazz.hashCode() + fieldName.hashCode();
+        }
+    }
+}
+
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 0efca05b4..b613c191e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -209,6 +209,33 @@ public class ProxyConfig implements ConfigFile {
private long channelExpiredTimeout = 1000 * 120;
+ // remoting
+ // Settings read by RemotingProtocolServer's constructor when it builds the remoting endpoint.
+
+ // true: RemotingProtocolServer creates a MultiProtocolRemotingServer; false: a plain NettyRemotingServer.
+ private boolean enableRemotingLocalProxyGrpc = true;
+ // NOTE(review): the consumer of this value is not visible in this excerpt; presumably a connect timeout in milliseconds - confirm.
+ private int localProxyConnectTimeoutMs = 3000;
+ // Passed to NettyServerConfig.setListenPort for the remoting server.
+ private int remotingListenPort = 8080;
+
+ // Pool sizes per request type; each value is passed as both the first and the second argument of ThreadPoolMonitor.createAndMonitor.
+ private int remotingHeartbeatThreadPoolNums = 2 * PROCESSOR_NUMBER;
+ private int remotingTopicRouteThreadPoolNums = 2 * PROCESSOR_NUMBER;
+ private int remotingSendMessageThreadPoolNums = 4 * PROCESSOR_NUMBER;
+ private int remotingPullMessageThreadPoolNums = 4 * PROCESSOR_NUMBER;
+ private int remotingUpdateOffsetThreadPoolNums = 4 * PROCESSOR_NUMBER;
+ private int remotingDefaultThreadPoolNums = 4 * PROCESSOR_NUMBER;
+
+ // Queue capacities handed to ThreadPoolMonitor.createAndMonitor for the matching pool.
+ private int remotingHeartbeatThreadPoolQueueCapacity = 50000;
+ private int remotingTopicRouteThreadPoolQueueCapacity = 50000;
+ private int remotingSendThreadPoolQueueCapacity = 10000;
+ private int remotingPullThreadPoolQueueCapacity = 50000;
+ private int remotingUpdateOffsetThreadPoolQueueCapacity = 10000;
+ private int remotingDefaultThreadPoolQueueCapacity = 50000;
+
+ // Values given to ThreadPoolHeadSlowTimeMillsMonitor for the matching pool (milliseconds, per the field names).
+ private long remotingWaitTimeMillsInSendQueue = 3 * 1000;
+ private long remotingWaitTimeMillsInPullQueue = 5 * 1000;
+ private long remotingWaitTimeMillsInHeartbeatQueue = 31 * 1000;
+ private long remotingWaitTimeMillsInUpdateOffsetQueue = 3 * 1000;
+ private long remotingWaitTimeMillsInTopicRouteQueue = 3 * 1000;
+ private long remotingWaitTimeMillsInDefaultQueue = 3 * 1000;
+
@Override
public void initData() {
parseDelayLevel();
@@ -1124,7 +1151,176 @@ public class ProxyConfig implements ConfigFile {
return channelExpiredTimeout;
}
+ public boolean isEnableRemotingLocalProxyGrpc() {
+ return enableRemotingLocalProxyGrpc;
+ }
+
public void setChannelExpiredTimeout(long channelExpiredTimeout) {
this.channelExpiredTimeout = channelExpiredTimeout;
}
}
+
+ // ---- remoting: server type, local proxy connect timeout, listen port ----
+ public void setEnableRemotingLocalProxyGrpc(boolean enableRemotingLocalProxyGrpc) {
+ this.enableRemotingLocalProxyGrpc = enableRemotingLocalProxyGrpc;
+ }
+
+ public int getLocalProxyConnectTimeoutMs() {
+ return localProxyConnectTimeoutMs;
+ }
+
+ public void setLocalProxyConnectTimeoutMs(int localProxyConnectTimeoutMs) {
+ this.localProxyConnectTimeoutMs = localProxyConnectTimeoutMs;
+ }
+
+ public int getRemotingListenPort() {
+ return remotingListenPort;
+ }
+
+ public void setRemotingListenPort(int remotingListenPort) {
+ this.remotingListenPort = remotingListenPort;
+ }
+
+ // ---- remoting: thread pool sizes, one pair of accessors per request type ----
+ public int getRemotingHeartbeatThreadPoolNums() {
+ return remotingHeartbeatThreadPoolNums;
+ }
+
+ public void setRemotingHeartbeatThreadPoolNums(int remotingHeartbeatThreadPoolNums) {
+ this.remotingHeartbeatThreadPoolNums = remotingHeartbeatThreadPoolNums;
+ }
+
+ public int getRemotingTopicRouteThreadPoolNums() {
+ return remotingTopicRouteThreadPoolNums;
+ }
+
+ public void setRemotingTopicRouteThreadPoolNums(int remotingTopicRouteThreadPoolNums) {
+ this.remotingTopicRouteThreadPoolNums = remotingTopicRouteThreadPoolNums;
+ }
+
+ public int getRemotingSendMessageThreadPoolNums() {
+ return remotingSendMessageThreadPoolNums;
+ }
+
+ public void setRemotingSendMessageThreadPoolNums(int remotingSendMessageThreadPoolNums) {
+ this.remotingSendMessageThreadPoolNums = remotingSendMessageThreadPoolNums;
+ }
+
+ public int getRemotingPullMessageThreadPoolNums() {
+ return remotingPullMessageThreadPoolNums;
+ }
+
+ public void setRemotingPullMessageThreadPoolNums(int remotingPullMessageThreadPoolNums) {
+ this.remotingPullMessageThreadPoolNums = remotingPullMessageThreadPoolNums;
+ }
+
+ public int getRemotingUpdateOffsetThreadPoolNums() {
+ return remotingUpdateOffsetThreadPoolNums;
+ }
+
+ public void setRemotingUpdateOffsetThreadPoolNums(int remotingUpdateOffsetThreadPoolNums) {
+ this.remotingUpdateOffsetThreadPoolNums = remotingUpdateOffsetThreadPoolNums;
+ }
+
+ public int getRemotingDefaultThreadPoolNums() {
+ return remotingDefaultThreadPoolNums;
+ }
+
+ public void setRemotingDefaultThreadPoolNums(int remotingDefaultThreadPoolNums) {
+ this.remotingDefaultThreadPoolNums = remotingDefaultThreadPoolNums;
+ }
+
+ // ---- remoting: thread pool queue capacities ----
+ public int getRemotingHeartbeatThreadPoolQueueCapacity() {
+ return remotingHeartbeatThreadPoolQueueCapacity;
+ }
+
+ public void setRemotingHeartbeatThreadPoolQueueCapacity(int remotingHeartbeatThreadPoolQueueCapacity) {
+ this.remotingHeartbeatThreadPoolQueueCapacity = remotingHeartbeatThreadPoolQueueCapacity;
+ }
+
+ public int getRemotingTopicRouteThreadPoolQueueCapacity() {
+ return remotingTopicRouteThreadPoolQueueCapacity;
+ }
+
+ public void setRemotingTopicRouteThreadPoolQueueCapacity(int remotingTopicRouteThreadPoolQueueCapacity) {
+ this.remotingTopicRouteThreadPoolQueueCapacity = remotingTopicRouteThreadPoolQueueCapacity;
+ }
+
+ public int getRemotingSendThreadPoolQueueCapacity() {
+ return remotingSendThreadPoolQueueCapacity;
+ }
+
+ public void setRemotingSendThreadPoolQueueCapacity(int remotingSendThreadPoolQueueCapacity) {
+ this.remotingSendThreadPoolQueueCapacity = remotingSendThreadPoolQueueCapacity;
+ }
+
+ public int getRemotingPullThreadPoolQueueCapacity() {
+ return remotingPullThreadPoolQueueCapacity;
+ }
+
+ public void setRemotingPullThreadPoolQueueCapacity(int remotingPullThreadPoolQueueCapacity) {
+ this.remotingPullThreadPoolQueueCapacity = remotingPullThreadPoolQueueCapacity;
+ }
+
+ public int getRemotingUpdateOffsetThreadPoolQueueCapacity() {
+ return remotingUpdateOffsetThreadPoolQueueCapacity;
+ }
+
+ public void setRemotingUpdateOffsetThreadPoolQueueCapacity(int remotingUpdateOffsetThreadPoolQueueCapacity) {
+ this.remotingUpdateOffsetThreadPoolQueueCapacity = remotingUpdateOffsetThreadPoolQueueCapacity;
+ }
+
+ public int getRemotingDefaultThreadPoolQueueCapacity() {
+ return remotingDefaultThreadPoolQueueCapacity;
+ }
+
+ public void setRemotingDefaultThreadPoolQueueCapacity(int remotingDefaultThreadPoolQueueCapacity) {
+ this.remotingDefaultThreadPoolQueueCapacity = remotingDefaultThreadPoolQueueCapacity;
+ }
+
+ // ---- remoting: per-queue wait-time values (milliseconds, per the field names) ----
+ public long getRemotingWaitTimeMillsInSendQueue() {
+ return remotingWaitTimeMillsInSendQueue;
+ }
+
+ public void setRemotingWaitTimeMillsInSendQueue(long remotingWaitTimeMillsInSendQueue) {
+ this.remotingWaitTimeMillsInSendQueue = remotingWaitTimeMillsInSendQueue;
+ }
+
+ public long getRemotingWaitTimeMillsInPullQueue() {
+ return remotingWaitTimeMillsInPullQueue;
+ }
+
+ public void setRemotingWaitTimeMillsInPullQueue(long remotingWaitTimeMillsInPullQueue) {
+ this.remotingWaitTimeMillsInPullQueue = remotingWaitTimeMillsInPullQueue;
+ }
+
+ public long getRemotingWaitTimeMillsInHeartbeatQueue() {
+ return remotingWaitTimeMillsInHeartbeatQueue;
+ }
+
+ public void setRemotingWaitTimeMillsInHeartbeatQueue(long remotingWaitTimeMillsInHeartbeatQueue) {
+ this.remotingWaitTimeMillsInHeartbeatQueue = remotingWaitTimeMillsInHeartbeatQueue;
+ }
+
+ public long getRemotingWaitTimeMillsInUpdateOffsetQueue() {
+ return remotingWaitTimeMillsInUpdateOffsetQueue;
+ }
+
+ public void setRemotingWaitTimeMillsInUpdateOffsetQueue(long remotingWaitTimeMillsInUpdateOffsetQueue) {
+ this.remotingWaitTimeMillsInUpdateOffsetQueue = remotingWaitTimeMillsInUpdateOffsetQueue;
+ }
+
+ public long getRemotingWaitTimeMillsInTopicRouteQueue() {
+ return remotingWaitTimeMillsInTopicRouteQueue;
+ }
+
+ public void setRemotingWaitTimeMillsInTopicRouteQueue(long remotingWaitTimeMillsInTopicRouteQueue) {
+ this.remotingWaitTimeMillsInTopicRouteQueue = remotingWaitTimeMillsInTopicRouteQueue;
+ }
+
+ public long getRemotingWaitTimeMillsInDefaultQueue() {
+ return remotingWaitTimeMillsInDefaultQueue;
+ }
+
+ public void setRemotingWaitTimeMillsInDefaultQueue(long remotingWaitTimeMillsInDefaultQueue) {
+ this.remotingWaitTimeMillsInDefaultQueue = remotingWaitTimeMillsInDefaultQueue;
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
new file mode 100644
index 000000000..73aeeaf42
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.security.cert.CertificateException;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+import org.apache.rocketmq.proxy.remoting.protocol.ProtocolNegotiationHandler;
+import org.apache.rocketmq.proxy.remoting.protocol.http2proxy.Http2ProtocolProxyHandler;
+import org.apache.rocketmq.proxy.remoting.protocol.remoting.RemotingProtocolHandler;
+import org.apache.rocketmq.remoting.ChannelEventListener;
+import org.apache.rocketmq.remoting.common.TlsMode;
+import org.apache.rocketmq.remoting.netty.NettyEncoder;
+import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link NettyRemotingServer} that accepts more than one protocol on the configured port.
+ * <p>
+ * If the listen port in {@link NettyServerConfig} is 8080, this server will
+ * <ul>
+ * <li>listen on port 9080 (configured port + {@code PORT_DELTA}) with the remoting protocol</li>
+ * <li>listen on port 8080 with the remoting protocol and HTTP/2</li>
+ * </ul>
+ */
+public class MultiProtocolRemotingServer extends NettyRemotingServer {
+
+    private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+    private static final int PORT_DELTA = 1000;
+    private final NettyServerConfig nettyServerConfig;
+    /** The port originally configured by the caller; bound in {@link #configChildHandler()}. */
+    private final int port;
+
+    /**
+     * Remembers the configured port, then moves the listen port of {@code nettyServerConfig}
+     * by {@code PORT_DELTA}. Note that the caller's config object is modified in place.
+     *
+     * @param nettyServerConfig    server configuration; its listen port is rewritten
+     * @param channelEventListener listener handed to the parent server
+     */
+    public MultiProtocolRemotingServer(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) {
+        super(nettyServerConfig, channelEventListener);
+        this.port = nettyServerConfig.getListenPort();
+        // to support multiple protocol
+        // will bind the real port in configChildHandler
+        // so let parent bind to a useless port
+        nettyServerConfig.setListenPort(nettyServerConfig.getListenPort() + PORT_DELTA);
+        this.nettyServerConfig = nettyServerConfig;
+    }
+
+    /**
+     * Builds the SSL context through {@link MultiProtocolTlsHelper} unless TLS is disabled.
+     *
+     * @throws ProxyException if the SSL context cannot be created
+     */
+    @Override
+    public void loadSslContext() {
+        TlsMode tlsMode = TlsSystemConfig.tlsMode;
+        log.info("Server is running in TLS {} mode", tlsMode.getName());
+
+        if (tlsMode != TlsMode.DISABLED) {
+            try {
+                sslContext = MultiProtocolTlsHelper.buildSslContext();
+                log.info("SSLContext created for server");
+            } catch (CertificateException | IOException e) {
+                throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "Failed to create SSLContext for server", e);
+            }
+        }
+    }
+
+    /** Starts the parent server first, then binds the multi-protocol port. */
+    @Override
+    public void start() {
+        super.start();
+        this.configChildHandler();
+    }
+
+    /**
+     * Replaces the bootstrap's child handler with a protocol-negotiating pipeline and binds
+     * the originally configured port. The parent's private collaborators are obtained by
+     * reflection through {@link #getField(String, Class)}.
+     *
+     * @throws ProxyException if a parent field cannot be read, the bind fails, or the
+     *                        calling thread is interrupted while waiting for the bind
+     */
+    protected void configChildHandler() {
+        try {
+            ServerBootstrap serverBootstrap = getField("serverBootstrap", ServerBootstrap.class);
+            Preconditions.checkNotNull(serverBootstrap);
+            DefaultEventExecutorGroup defaultEventExecutorGroup = getField("defaultEventExecutorGroup", DefaultEventExecutorGroup.class);
+            Preconditions.checkNotNull(defaultEventExecutorGroup);
+            NettyEncoder encoder = getField("encoder", NettyEncoder.class);
+            Preconditions.checkNotNull(encoder);
+            ChannelDuplexHandler connectionManageHandler = getField("connectionManageHandler", ChannelDuplexHandler.class);
+            Preconditions.checkNotNull(connectionManageHandler);
+            // Raw types are kept: the parameter types of the consumers of these values are not visible here.
+            SimpleChannelInboundHandler serverHandler = getField("serverHandler", SimpleChannelInboundHandler.class);
+            Preconditions.checkNotNull(serverHandler);
+            SimpleChannelInboundHandler handshakeHandler = getField("handshakeHandler", SimpleChannelInboundHandler.class);
+            Preconditions.checkNotNull(handshakeHandler);
+            ConcurrentMap remotingServerTable = getField("remotingServerTable", ConcurrentMap.class);
+            Preconditions.checkNotNull(remotingServerTable);
+
+            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                protected void initChannel(SocketChannel ch) {
+                    ch.pipeline()
+                        .addLast(defaultEventExecutorGroup, "handshakeHandler", handshakeHandler)
+                        .addLast(defaultEventExecutorGroup,
+                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
+                            new ProtocolNegotiationHandler(new RemotingProtocolHandler(encoder, connectionManageHandler, serverHandler))
+                                .addProtocolHandler(new Http2ProtocolProxyHandler())
+                        );
+                }
+            });
+            remotingServerTable.put(port, this);
+            serverBootstrap.bind(port).sync();
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to callers instead of silently consuming it.
+            Thread.currentThread().interrupt();
+            throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "config netty child handler failed", e);
+        } catch (Throwable t) {
+            throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "config netty child handler failed", t);
+        }
+    }
+
+    /**
+     * Reads a field declared on {@link NettyRemotingServer} from this instance by reflection.
+     *
+     * @param name     declared field name
+     * @param getClazz expected type of the field value
+     * @return the field value cast to {@code getClazz}
+     * @throws Throwable if the field does not exist, cannot be accessed, or has another type
+     */
+    protected <T> T getField(String name, Class<T> getClazz) throws Throwable {
+        Field field = NettyRemotingServer.class.getDeclaredField(name);
+        field.setAccessible(true);
+        return getClazz.cast(field.get(this));
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java
new file mode 100644
index 000000000..54af7bc9e
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting;
+
+import io.netty.handler.ssl.ApplicationProtocolConfig;
+import io.netty.handler.ssl.ApplicationProtocolNames;
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.OpenSsl;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.cert.CertificateException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.remoting.netty.TlsHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerAuthClient;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerCertPath;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerKeyPassword;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerKeyPath;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerNeedClientAuth;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerTrustCertPath;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsTestModeEnable;
+
+/**
+ * Builds the server-side {@link SslContext} used by the multi-protocol remoting server.
+ * The resulting context is configured with ALPN advertising HTTP/2.
+ */
+public class MultiProtocolTlsHelper extends TlsHelper {
+    private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+    // Opens the private key file as-is; no decryption is applied.
+    private static final DecryptionStrategy DECRYPTION_STRATEGY = (privateKeyEncryptPath, forClient) -> new FileInputStream(privateKeyEncryptPath);
+
+    /**
+     * Creates the server SSL context from the static {@code TlsSystemConfig} settings.
+     * <p>
+     * In test mode a self-signed certificate is generated; otherwise the configured
+     * certificate and key files are read. Both paths use the provider selected by
+     * {@link #selectProvider()}.
+     *
+     * @return the SSL context with HTTP/2 ALPN configured
+     * @throws IOException          if a certificate or key file cannot be read
+     * @throws CertificateException if the self-signed certificate cannot be generated
+     */
+    public static SslContext buildSslContext() throws IOException, CertificateException {
+        // NOTE(review): the returned context is discarded; presumably this call is kept for the
+        // configuration loading TlsHelper performs as a side effect - confirm before removing.
+        TlsHelper.buildSslContext(false);
+        SslProvider provider = selectProvider();
+
+        if (tlsTestModeEnable) {
+            SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
+            SslContextBuilder testBuilder = SslContextBuilder
+                .forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey())
+                // Use the detected provider: forcing OpenSSL fails on hosts where it is unavailable.
+                .sslProvider(provider)
+                .clientAuth(ClientAuth.OPTIONAL);
+            return withHttp2Alpn(testBuilder).build();
+        }
+
+        // A null resource is permitted in try-with-resources and is simply not closed.
+        // The streams stay open until build() has returned, then are closed here.
+        try (InputStream certStream = StringUtils.isBlank(tlsServerCertPath) ? null : Files.newInputStream(Paths.get(tlsServerCertPath));
+             InputStream keyStream = StringUtils.isBlank(tlsServerKeyPath) ? null : DECRYPTION_STRATEGY.decryptPrivateKey(tlsServerKeyPath, false)) {
+            SslContextBuilder sslContextBuilder = SslContextBuilder
+                .forServer(certStream, keyStream, StringUtils.isBlank(tlsServerKeyPassword) ? null : tlsServerKeyPassword)
+                .sslProvider(provider);
+
+            if (!tlsServerAuthClient) {
+                sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
+            } else if (!StringUtils.isBlank(tlsServerTrustCertPath)) {
+                sslContextBuilder.trustManager(new File(tlsServerTrustCertPath));
+            }
+
+            sslContextBuilder.clientAuth(parseClientAuthMode(tlsServerNeedClientAuth));
+            return withHttp2Alpn(sslContextBuilder).build();
+        }
+    }
+
+    /** Picks OpenSSL when Netty reports it as available, otherwise the JDK provider. */
+    private static SslProvider selectProvider() {
+        if (OpenSsl.isAvailable()) {
+            log.info("Using OpenSSL provider");
+            return SslProvider.OPENSSL;
+        }
+        log.info("Using JDK SSL provider");
+        return SslProvider.JDK;
+    }
+
+    /** Adds the ALPN configuration that advertises HTTP/2 and returns the same builder. */
+    private static SslContextBuilder withHttp2Alpn(SslContextBuilder builder) {
+        return builder.applicationProtocolConfig(new ApplicationProtocolConfig(
+            ApplicationProtocolConfig.Protocol.ALPN,
+            // NO_ADVERTISE is currently the only mode supported by both OpenSsl and JDK providers.
+            ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
+            // ACCEPT is currently the only mode supported by both OpenSsl and JDK providers.
+            ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
+            ApplicationProtocolNames.HTTP_2));
+    }
+
+    /**
+     * Maps a configured mode name to a {@link ClientAuth} constant, ignoring case and
+     * surrounding whitespace.
+     *
+     * @param authMode configured mode name; may be {@code null} or blank
+     * @return the matching constant, or {@link ClientAuth#NONE} when blank or unknown
+     */
+    private static ClientAuth parseClientAuthMode(String authMode) {
+        if (null == authMode || authMode.trim().isEmpty()) {
+            return ClientAuth.NONE;
+        }
+
+        String normalized = authMode.trim();
+        for (ClientAuth clientAuth : ClientAuth.values()) {
+            // equalsIgnoreCase does not depend on the default locale, unlike toUpperCase().
+            if (clientAuth.name().equalsIgnoreCase(normalized)) {
+                return clientAuth;
+            }
+        }
+
+        return ClientAuth.NONE;
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
index 58b257641..fdf1870a5 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
@@ -18,20 +18,161 @@
package org.apache.rocketmq.proxy.remoting;
import io.netty.channel.Channel;
+import java.lang.reflect.Field;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.latency.FutureTaskExt;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
+import org.apache.rocketmq.common.thread.ThreadPoolStatusMonitor;
+import org.apache.rocketmq.proxy.common.ReflectionCache;
import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.activity.AckMessageActivity;
+import org.apache.rocketmq.proxy.remoting.activity.ChangeInvisibleTimeActivity;
+import org.apache.rocketmq.proxy.remoting.activity.ClientManagerActivity;
+import org.apache.rocketmq.proxy.remoting.activity.ConsumerManagerActivity;
+import org.apache.rocketmq.proxy.remoting.activity.GetTopicRouteActivity;
+import org.apache.rocketmq.proxy.remoting.activity.PopMessageActivity;
+import org.apache.rocketmq.proxy.remoting.activity.PullMessageActivity;
+import org.apache.rocketmq.proxy.remoting.activity.SendMessageActivity;
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.RequestTask;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOutClient {
+ private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
- private final MessagingProcessor messagingProcessor;
- private RemotingServer defaultRemotingServer;
+ protected final MessagingProcessor messagingProcessor;
+ protected final RemotingChannelManager remotingChannelManager;
+ protected final ChannelEventListener clientHousekeepingService;
+ protected final RemotingServer defaultRemotingServer;
+ protected final GetTopicRouteActivity getTopicRouteActivity;
+ protected final ClientManagerActivity clientManagerActivity;
+ protected final ConsumerManagerActivity consumerManagerActivity;
+ protected final SendMessageActivity sendMessageActivity;
+ protected final PullMessageActivity pullMessageActivity;
+ protected final PopMessageActivity popMessageActivity;
+ protected final AckMessageActivity ackMessageActivity;
+ protected final ChangeInvisibleTimeActivity changeInvisibleTimeActivity;
+ protected final ThreadPoolExecutor sendMessageExecutor;
+ protected final ThreadPoolExecutor pullMessageExecutor;
+ protected final ThreadPoolExecutor heartbeatExecutor;
+ protected final ThreadPoolExecutor updateOffsetExecutor;
+ protected final ThreadPoolExecutor topicRouteExecutor;
+ protected final ThreadPoolExecutor defaultExecutor;
+
+ private final ReflectionCache reflectionCache = new ReflectionCache();
+    /**
+     * Creates the activities, the remoting server and the per-request-type executors, then
+     * registers the activities on the server.
+     * <p>
+     * Registration is performed last: {@code registerRemotingServer} reads the executor
+     * fields, and those are {@code null} until they have been assigned below.
+     *
+     * @param messagingProcessor processor that the activities delegate to
+     */
public RemotingProtocolServer(MessagingProcessor messagingProcessor) {
this.messagingProcessor = messagingProcessor;
+        this.remotingChannelManager = new RemotingChannelManager(this, messagingProcessor.getProxyRelayService());
+
+        RequestPipeline pipeline = createRequestPipeline();
+        this.getTopicRouteActivity = new GetTopicRouteActivity(pipeline, messagingProcessor);
+        this.clientManagerActivity = new ClientManagerActivity(pipeline, messagingProcessor, remotingChannelManager);
+        this.consumerManagerActivity = new ConsumerManagerActivity(pipeline, messagingProcessor);
+        this.sendMessageActivity = new SendMessageActivity(pipeline, messagingProcessor);
+        this.pullMessageActivity = new PullMessageActivity(pipeline, messagingProcessor);
+        this.popMessageActivity = new PopMessageActivity(pipeline, messagingProcessor);
+        this.ackMessageActivity = new AckMessageActivity(pipeline, messagingProcessor);
+        this.changeInvisibleTimeActivity = new ChangeInvisibleTimeActivity(pipeline, messagingProcessor);
+
+        ProxyConfig config = ConfigurationManager.getProxyConfig();
+        NettyServerConfig defaultServerConfig = new NettyServerConfig();
+        defaultServerConfig.setListenPort(config.getRemotingListenPort());
+        // The remoting server reuses the gRPC TLS certificate and key; both the static
+        // TlsSystemConfig fields and the matching system properties are updated.
+        TlsSystemConfig.tlsTestModeEnable = false;
+        System.setProperty(TlsSystemConfig.TLS_TEST_MODE_ENABLE, "false");
+        TlsSystemConfig.tlsServerCertPath = config.getGrpcTlsCertPath();
+        System.setProperty(TlsSystemConfig.TLS_SERVER_CERTPATH, config.getGrpcTlsCertPath());
+        TlsSystemConfig.tlsServerKeyPath = config.getGrpcTlsKeyPath();
+        System.setProperty(TlsSystemConfig.TLS_SERVER_KEYPATH, config.getGrpcTlsKeyPath());
+
+        this.clientHousekeepingService = new ClientHousekeepingService(this.clientManagerActivity);
+
+        if (config.isEnableRemotingLocalProxyGrpc()) {
+            this.defaultRemotingServer = new MultiProtocolRemotingServer(defaultServerConfig, this.clientHousekeepingService);
+        } else {
+            this.defaultRemotingServer = new NettyRemotingServer(defaultServerConfig, this.clientHousekeepingService);
+        }
+
+        this.sendMessageExecutor = ThreadPoolMonitor.createAndMonitor(
+            config.getRemotingSendMessageThreadPoolNums(),
+            config.getRemotingSendMessageThreadPoolNums(),
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            "RemotingSendMessageThread",
+            config.getRemotingSendThreadPoolQueueCapacity(),
+            new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInSendQueue())
+        );
+
+        this.pullMessageExecutor = ThreadPoolMonitor.createAndMonitor(
+            config.getRemotingPullMessageThreadPoolNums(),
+            config.getRemotingPullMessageThreadPoolNums(),
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            "RemotingPullMessageThread",
+            config.getRemotingPullThreadPoolQueueCapacity(),
+            new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInPullQueue())
+        );
+
+        this.updateOffsetExecutor = ThreadPoolMonitor.createAndMonitor(
+            config.getRemotingUpdateOffsetThreadPoolNums(),
+            config.getRemotingUpdateOffsetThreadPoolNums(),
+            1,
+            TimeUnit.MINUTES,
+            "RemotingUpdateOffsetThread",
+            config.getRemotingUpdateOffsetThreadPoolQueueCapacity(),
+            new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInUpdateOffsetQueue())
+        );
+
+        this.heartbeatExecutor = ThreadPoolMonitor.createAndMonitor(
+            config.getRemotingHeartbeatThreadPoolNums(),
+            config.getRemotingHeartbeatThreadPoolNums(),
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            "RemotingHeartbeatThread",
+            config.getRemotingHeartbeatThreadPoolQueueCapacity(),
+            new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInHeartbeatQueue())
+        );
+
+        this.topicRouteExecutor = ThreadPoolMonitor.createAndMonitor(
+            config.getRemotingTopicRouteThreadPoolNums(),
+            config.getRemotingTopicRouteThreadPoolNums(),
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            "RemotingTopicRouteThread",
+            config.getRemotingTopicRouteThreadPoolQueueCapacity(),
+            new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInTopicRouteQueue())
+        );
+
+        this.defaultExecutor = ThreadPoolMonitor.createAndMonitor(
+            config.getRemotingDefaultThreadPoolNums(),
+            config.getRemotingDefaultThreadPoolNums(),
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            "RemotingDefaultThread",
+            config.getRemotingDefaultThreadPoolQueueCapacity(),
+            new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInDefaultQueue())
+        );
+
+        // Must run after every executor above is assigned; registering earlier would hand
+        // null executors to the server and leave the dedicated pools unused.
+        this.registerRemotingServer(this.defaultRemotingServer);
}
protected void init() {
@@ -39,17 +180,50 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
}
protected void registerRemotingServer(RemotingServer remotingServer) {
+ remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageActivity, this.sendMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageActivity, this.sendMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageActivity, this.sendMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageActivity, sendMessageExecutor);
+
+ remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManagerActivity, this.heartbeatExecutor);
+ remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManagerActivity, this.defaultExecutor);
+ remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManagerActivity, this.defaultExecutor);
+
+ remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, pullMessageActivity, this.pullMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.LITE_PULL_MESSAGE, pullMessageActivity, this.pullMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.POP_MESSAGE, pullMessageActivity, this.pullMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManagerActivity, this.updateOffsetExecutor);
+ remotingServer.registerProcessor(RequestCode.ACK_MESSAGE, consumerManagerActivity, this.updateOffsetExecutor);
+ remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, consumerManagerActivity, this.updateOffsetExecutor);
+
+ remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManagerActivity, this.defaultExecutor);
+ remotingServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManagerActivity, this.defaultExecutor);
+ remotingServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManagerActivity, this.defaultExecutor);
+ remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManagerActivity, this.defaultExecutor);
+ remotingServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManagerActivity, this.defaultExecutor);
+ remotingServer.registerProcessor(RequestCode.LOCK_BATCH_MQ, consumerManagerActivity, this.defaultExecutor);
+ remotingServer.registerProcessor(RequestCode.UNLOCK_BATCH_MQ, consumerManagerActivity, this.defaultExecutor);
+
+ remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, getTopicRouteActivity, this.topicRouteExecutor);
}
@Override
public void shutdown() throws Exception {
-
+ this.defaultRemotingServer.shutdown();
+ this.remotingChannelManager.shutdown();
+ this.sendMessageExecutor.shutdown();
+ this.pullMessageExecutor.shutdown();
+ this.heartbeatExecutor.shutdown();
+ this.updateOffsetExecutor.shutdown();
+ this.topicRouteExecutor.shutdown();
+ this.defaultExecutor.shutdown();
}
@Override
public void start() throws Exception {
-
+ this.remotingChannelManager.start();
+ this.defaultRemotingServer.start();
}
@Override
@@ -69,4 +243,126 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
}
return future;
}
+
+ protected RequestPipeline createRequestPipeline() {
+ RequestPipeline pipeline = (ctx, request, context) -> {
+ };
+
+ // add pipes to the pipeline here
+ // the pipe added last will be executed first
+ return pipeline;
+ }
+
+ protected class ThreadPoolHeadSlowTimeMillsMonitor implements ThreadPoolStatusMonitor {
+
+ private final long maxWaitTimeMillsInQueue;
+
+ public ThreadPoolHeadSlowTimeMillsMonitor(long maxWaitTimeMillsInQueue) {
+ this.maxWaitTimeMillsInQueue = maxWaitTimeMillsInQueue;
+ }
+
+ @Override
+ public String describe() {
+ return "headSlow";
+ }
+
+ @Override
+ public double value(ThreadPoolExecutor executor) {
+ return headSlowTimeMills(executor.getQueue());
+ }
+
+ @Override
+ public boolean needPrintJstack(ThreadPoolExecutor executor, double value) {
+ return value > maxWaitTimeMillsInQueue;
+ }
+ }
+
+ protected long headSlowTimeMills(BlockingQueue<Runnable> q) {
+ try {
+ long slowTimeMills = 0;
+ final Runnable peek = q.peek();
+ if (peek != null) {
+ RequestTask rt = castRunnable(peek);
+ slowTimeMills = rt == null ? 0 : System.currentTimeMillis() - rt.getCreateTimestamp();
+ }
+
+ if (slowTimeMills < 0) {
+ slowTimeMills = 0;
+ }
+
+ return slowTimeMills;
+ } catch (Exception e) {
+ log.error("error when headSlowTimeMills.", e);
+ }
+ return -1;
+ }
+
+ protected void cleanExpireRequest() {
+ ProxyConfig config = ConfigurationManager.getProxyConfig();
+
+ cleanExpiredRequestInQueue(this.sendMessageExecutor, config.getRemotingWaitTimeMillsInSendQueue());
+ cleanExpiredRequestInQueue(this.pullMessageExecutor, config.getRemotingWaitTimeMillsInPullQueue());
+ cleanExpiredRequestInQueue(this.heartbeatExecutor, config.getRemotingWaitTimeMillsInHeartbeatQueue());
+ cleanExpiredRequestInQueue(this.updateOffsetExecutor, config.getRemotingWaitTimeMillsInUpdateOffsetQueue());
+ cleanExpiredRequestInQueue(this.topicRouteExecutor, config.getRemotingWaitTimeMillsInTopicRouteQueue());
+ cleanExpiredRequestInQueue(this.defaultExecutor, config.getRemotingWaitTimeMillsInDefaultQueue());
+ }
+
+ protected void cleanExpiredRequestInQueue(ThreadPoolExecutor threadPoolExecutor, long maxWaitTimeMillsInQueue) {
+ while (true) {
+ try {
+ BlockingQueue<Runnable> blockingQueue = threadPoolExecutor.getQueue();
+ if (!blockingQueue.isEmpty()) {
+ final Runnable runnable = blockingQueue.peek();
+ if (null == runnable) {
+ break;
+ }
+ final RequestTask rt = castRunnable(runnable);
+ if (rt == null || rt.isStopRun()) {
+ break;
+ }
+
+ final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
+ if (behind >= maxWaitTimeMillsInQueue) {
+ if (blockingQueue.remove(runnable)) {
+ rt.setStopRun(true);
+ rt.returnResponse(ResponseCode.SYSTEM_BUSY,
+ String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
+ }
+ } else {
+ break;
+ }
+ } else {
+ break;
+ }
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+
+ /** Unwraps the RequestTask behind a queued Runnable, or returns null when it cannot be recovered. */
+ private RequestTask castRunnable(final Runnable runnable) {
+     try {
+         // FutureTaskExt extends FutureTask, so it has to be tested first or this branch is never reached
+         if (runnable instanceof FutureTaskExt) {
+             FutureTaskExt futureTaskExt = (FutureTaskExt) runnable;
+             return (RequestTask) futureTaskExt.getRunnable();
+         } else if (runnable instanceof FutureTask) {
+             Field callableField = reflectionCache.getDeclaredField(FutureTask.class, "callable");
+             Callable callable = (Callable) callableField.get(runnable);
+             if (callable == null) {
+                 return null;
+             }
+             Field taskField = reflectionCache.getDeclaredField(callable.getClass(), "task");
+             if (taskField == null) {
+                 log.warn("get task from FutureTask failed. class:{}", runnable.getClass().getName());
+                 return null;
+             }
+             return (RequestTask) taskField.get(callable);
+         }
+     } catch (Throwable e) {
+         log.error("castRunnable exception. class:{}", runnable.getClass().getName(), e);
+     }
+     return null;
+ }
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/ProtocolHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/ProtocolHandler.java
new file mode 100644
index 000000000..4b1b03067
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/ProtocolHandler.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.protocol;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+
+public interface ProtocolHandler {
+ /** Reports whether this handler accepts the connection whose first received bytes are in msg. */
+ boolean match(ByteBuf msg);
+ /** Configures the channel behind ctx for this protocol; msg is the buffer that was inspected by match. */
+ void config(final ChannelHandlerContext ctx, final ByteBuf msg);
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/ProtocolNegotiationHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/ProtocolNegotiationHandler.java
new file mode 100644
index 000000000..da2dded5f
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/ProtocolNegotiationHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.proxy.remoting.protocol;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ProtocolNegotiationHandler extends ByteToMessageDecoder {
+
+ private final List<ProtocolHandler> protocolHandlerList = new ArrayList<ProtocolHandler>();
+ private final ProtocolHandler fallbackProtocolHandler;
+
+ public ProtocolNegotiationHandler(ProtocolHandler fallbackProtocolHandler) {
+ this.fallbackProtocolHandler = fallbackProtocolHandler;
+ }
+
+ public ProtocolNegotiationHandler addProtocolHandler(ProtocolHandler protocolHandler) {
+ protocolHandlerList.add(protocolHandler);
+ return this;
+ }
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+ // use 4 bytes to judge protocol
+ if (in.readableBytes() < 4) {
+ return;
+ }
+
+ ProtocolHandler protocolHandler = null;
+ for (ProtocolHandler curProtocolHandler : protocolHandlerList) {
+ if (curProtocolHandler.match(in)) {
+ protocolHandler = curProtocolHandler;
+ break;
+ }
+ }
+
+ if (protocolHandler == null) {
+ protocolHandler = fallbackProtocolHandler;
+ }
+
+ protocolHandler.config(ctx, in);
+ ctx.pipeline().remove(this);
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
new file mode 100644
index 000000000..c5050cda7
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.protocol.http2proxy;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.handler.ssl.ApplicationProtocolConfig;
+import io.netty.handler.ssl.ApplicationProtocolNames;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import javax.net.ssl.SSLException;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.remoting.protocol.ProtocolHandler;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+public class Http2ProtocolProxyHandler implements ProtocolHandler {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+ private static final String LOCAL_HOST = "127.0.0.1";
+ /**
+ * The int value of "PRI ". Only these 4 bytes are used to identify the protocol, which may be risky if a new
+ * protocol that also starts with "PRI " appears in the future.
+ * <p>
+ * The full HTTP/2 connection preface is "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
+ * <p>
+ * ref: https://datatracker.ietf.org/doc/html/rfc7540#section-3.5
+ */
+ private static final int PRI_INT = 0x50524920;
+
+ private final SslContext sslContext;
+
+ public Http2ProtocolProxyHandler() {
+ try {
+ sslContext = SslContextBuilder
+ .forClient()
+ .sslProvider(SslProvider.OPENSSL)
+ .trustManager(InsecureTrustManagerFactory.INSTANCE)
+ .applicationProtocolConfig(new ApplicationProtocolConfig(
+ ApplicationProtocolConfig.Protocol.ALPN,
+ ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
+ ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
+ ApplicationProtocolNames.HTTP_2))
+ .build();
+ } catch (SSLException e) {
+ log.error("Failed to create SSLContext for Http2ProtocolProxyHandler", e);
+ throw new RuntimeException("Failed to create SSLContext for Http2ProtocolProxyHandler", e);
+ }
+ }
+
+ @Override
+ public boolean match(ByteBuf in) {
+ if (!ConfigurationManager.getProxyConfig().isEnableRemotingLocalProxyGrpc()) {
+ return false;
+ }
+
+ // If starts with 'PRI '
+ return in.getInt(in.readerIndex()) == PRI_INT;
+ }
+
+ @Override
+ public void config(final ChannelHandlerContext ctx, final ByteBuf msg) {
+ // proxy channel to http2 server
+ final Channel inboundChannel = ctx.channel();
+
+ ProxyConfig config = ConfigurationManager.getProxyConfig();
+ // Start the connection attempt.
+ Bootstrap b = new Bootstrap();
+ b.group(inboundChannel.eventLoop())
+ .channel(ctx.channel().getClass())
+ .handler(new ChannelInitializer<Channel>() {
+ @Override
+ protected void initChannel(Channel ch) throws Exception {
+ if (sslContext != null) {
+ ch.pipeline()
+ .addLast(sslContext.newHandler(ch.alloc(), LOCAL_HOST, config.getGrpcServerPort()));
+ }
+ ch.pipeline().addLast(new Http2ProxyBackendHandler(inboundChannel));
+ }
+ })
+ .option(ChannelOption.AUTO_READ, false)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getLocalProxyConnectTimeoutMs());
+ ChannelFuture f;
+ try {
+ f = b.connect(LOCAL_HOST, config.getGrpcServerPort()).sync();
+ } catch (Exception e) {
+ log.error("connect http2 server failed. port:{}", config.getGrpcServerPort(), e);
+ inboundChannel.close();
+ return;
+ }
+
+ final Channel outboundChannel = f.channel();
+
+ ctx.pipeline().addLast(new Http2ProxyFrontendHandler(outboundChannel));
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
new file mode 100644
index 000000000..53bddfc31
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.protocol.http2proxy;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+public class Http2ProxyBackendHandler extends ChannelInboundHandlerAdapter {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+ private final Channel inboundChannel;
+
+ public Http2ProxyBackendHandler(Channel inboundChannel) {
+ this.inboundChannel = inboundChannel;
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) {
+ ctx.read();
+ }
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, Object msg) {
+ inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ if (future.isSuccess()) {
+ ctx.channel().read();
+ } else {
+ future.channel().close();
+ }
+ }
+ });
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) {
+ Http2ProxyFrontendHandler.closeOnFlush(inboundChannel);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ log.error("Http2ProxyBackendHandler#exceptionCaught", cause);
+ Http2ProxyFrontendHandler.closeOnFlush(ctx.channel());
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
new file mode 100644
index 000000000..8bffdc6d0
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.protocol.http2proxy;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+public class Http2ProxyFrontendHandler extends ChannelInboundHandlerAdapter {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+ // As we use inboundChannel.eventLoop() when building the Bootstrap this does not need to be volatile as
+ // the outboundChannel will use the same EventLoop (and therefore Thread) as the inboundChannel.
+ private final Channel outboundChannel;
+
+ public Http2ProxyFrontendHandler(final Channel outboundChannel) {
+ this.outboundChannel = outboundChannel;
+ }
+
+ /** Relays msg to the backend and reads on once it is flushed; releases msg if the backend is inactive. */
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, Object msg) {
+     if (!outboundChannel.isActive()) {
+         io.netty.util.ReferenceCountUtil.release(msg); // dropped here, so release it or the buffer leaks
+         return;
+     }
+     final ChannelFutureListener readNextOnSuccess = (ChannelFuture future) -> {
+         if (future.isSuccess()) {
+             ctx.channel().read(); // was able to flush out data, start to read the next chunk
+         } else {
+             future.channel().close();
+         }
+     };
+     outboundChannel.writeAndFlush(msg).addListener(readNextOnSuccess);
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) {
+ if (outboundChannel != null) {
+ closeOnFlush(outboundChannel);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ log.error("Http2ProxyFrontendHandler#exceptionCaught", cause);
+ closeOnFlush(ctx.channel());
+ }
+
+ /**
+ * Closes the specified channel after all queued write requests are flushed.
+ */
+ static void closeOnFlush(Channel ch) {
+ if (ch.isActive()) {
+ ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java
new file mode 100644
index 000000000..3e4cc7c04
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.protocol.remoting;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.rocketmq.proxy.remoting.protocol.ProtocolHandler;
+import org.apache.rocketmq.remoting.netty.NettyDecoder;
+import org.apache.rocketmq.remoting.netty.NettyEncoder;
+
+public class RemotingProtocolHandler implements ProtocolHandler {
+
+ private final NettyEncoder encoder;
+ private final ChannelDuplexHandler connectionManageHandler;
+ private final SimpleChannelInboundHandler serverHandler;
+
+ public RemotingProtocolHandler(NettyEncoder encoder, ChannelDuplexHandler connectionManageHandler,
+ SimpleChannelInboundHandler serverHandler) {
+ this.encoder = encoder;
+ this.connectionManageHandler = connectionManageHandler;
+ this.serverHandler = serverHandler;
+ }
+
+ @Override
+ public boolean match(ByteBuf in) {
+ return true;
+ }
+
+ @Override
+ public void config(ChannelHandlerContext ctx, ByteBuf msg) {
+ ctx.pipeline().addLast(
+ this.encoder,
+ new NettyDecoder(),
+ this.connectionManageHandler,
+ this.serverHandler
+ );
+ }
+}