You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2019/06/28 11:19:37 UTC
[rocketmq-remoting] 02/39: Remove multi-protocol support feature,
only support RemotingCommand protocol
This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git
commit 449dfa8649b802b753c4033f437ca4a89aef3569
Author: yukon <yu...@apache.org>
AuthorDate: Wed May 15 16:19:07 2019 +0800
Remove multi-protocol support feature, only support RemotingCommand protocol
---
.../rocketmq/remoting/api/RemotingMarshaller.java | 3 -
.../rocketmq/remoting/api/protocol/Protocol.java | 39 ------
.../remoting/api/protocol/ProtocolFactory.java | 30 -----
.../common/RemotingCommandFactoryMeta.java | 49 --------
.../rocketmq/remoting/config/RemotingConfig.java | 14 +--
.../impl/command/RemotingCommandFactoryImpl.java | 18 +--
.../remoting/impl/netty/NettyRemotingAbstract.java | 15 +--
.../remoting/impl/netty/NettyRemotingClient.java | 36 +-----
.../remoting/impl/netty/NettyRemotingServer.java | 35 +-----
.../remoting/impl/netty/handler/Http2Handler.java | 139 ---------------------
.../impl/netty/handler/ProtocolSelector.java | 65 ----------
.../remoting/impl/protocol/Httpv2Protocol.java | 52 --------
.../impl/protocol/ProtocolFactoryImpl.java | 83 ------------
.../impl/protocol/RemotingCoreProtocol.java | 46 -------
.../remoting/impl/protocol/WebSocketProtocol.java | 38 ------
15 files changed, 21 insertions(+), 641 deletions(-)
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java
index 0386a03..62c9dda 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java
@@ -17,11 +17,8 @@
package org.apache.rocketmq.remoting.api;
-import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory;
import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
public interface RemotingMarshaller {
- ProtocolFactory protocolFactory();
-
SerializerFactory serializerFactory();
}
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/Protocol.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/Protocol.java
deleted file mode 100644
index 5caf167..0000000
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/Protocol.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.api.protocol;
-
-import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
-
-public interface Protocol {
- /**
- * Minimum Viable Protocol
- */
- String MVP = "mvp";
- String HTTP2 = "http2";
- String WEBSOCKET = "websocket";
-
- byte MVP_MAGIC = 0x14;
- byte WEBSOCKET_MAGIC = 0x15;
- byte HTTP_2_MAGIC = 0x16;
-
- String name();
-
- byte type();
-
- void assembleHandler(ChannelHandlerContextWrapper ctx);
-}
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/ProtocolFactory.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/ProtocolFactory.java
deleted file mode 100644
index cf016f9..0000000
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/ProtocolFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.api.protocol;
-
-public interface ProtocolFactory {
- void register(Protocol protocol);
-
- void resetAll(Protocol protocol);
-
- byte type(String protocolName);
-
- Protocol get(byte type);
-
- void clearAll();
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/RemotingCommandFactoryMeta.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/RemotingCommandFactoryMeta.java
deleted file mode 100644
index d5c0aaa..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/RemotingCommandFactoryMeta.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.common;
-
-import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory;
-import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
-import org.apache.rocketmq.remoting.impl.protocol.Httpv2Protocol;
-import org.apache.rocketmq.remoting.impl.protocol.ProtocolFactoryImpl;
-import org.apache.rocketmq.remoting.impl.protocol.serializer.MsgPackSerializer;
-import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl;
-
-public class RemotingCommandFactoryMeta {
- private final ProtocolFactory protocolFactory = new ProtocolFactoryImpl();
- private final SerializerFactory serializerFactory = new SerializerFactoryImpl();
- private byte protocolType = Httpv2Protocol.MVP_MAGIC;
- private byte serializeType = MsgPackSerializer.SERIALIZER_TYPE;
-
- public RemotingCommandFactoryMeta() {
- }
-
- public RemotingCommandFactoryMeta(String protocolName, String serializeName) {
- this.protocolType = protocolFactory.type(protocolName);
- this.serializeType = serializerFactory.type(serializeName);
- }
-
- public byte getSerializeType() {
- return serializeType;
- }
-
- public byte getProtocolType() {
- return protocolType;
- }
-
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
index b330041..f7a4b67 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
@@ -19,9 +19,8 @@ package org.apache.rocketmq.remoting.config;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.rocketmq.remoting.api.protocol.Protocol;
import org.apache.rocketmq.remoting.impl.protocol.compression.GZipCompressor;
-import org.apache.rocketmq.remoting.impl.protocol.serializer.MsgPackSerializer;
+import org.apache.rocketmq.remoting.impl.protocol.serializer.JsonSerializer;
public class RemotingConfig extends TcpSocketConfig {
private int connectionMaxRetries = 3;
@@ -38,8 +37,7 @@ public class RemotingConfig extends TcpSocketConfig {
private int threadTaskLowWaterMark = 30000;
private int threadTaskHighWaterMark = 50000;
private int connectionRetryBackoffMillis = 3000;
- private String protocolName = Protocol.MVP;
- private String serializerName = MsgPackSerializer.SERIALIZER_NAME;
+ private String serializerName = JsonSerializer.SERIALIZER_NAME;
private String compressorName = GZipCompressor.COMPRESSOR_NAME;
private int serviceThreadBlockQueueSize = 50000;
private boolean clientNativeEpollEnable = false;
@@ -149,14 +147,6 @@ public class RemotingConfig extends TcpSocketConfig {
this.connectionRetryBackoffMillis = connectionRetryBackoffMillis;
}
- public String getProtocolName() {
- return protocolName;
- }
-
- public void setProtocolName(final String protocolName) {
- this.protocolName = protocolName;
- }
-
public String getSerializerName() {
return serializerName;
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java
index f5d2126..6e1efaa 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java
@@ -20,24 +20,28 @@ package org.apache.rocketmq.remoting.impl.command;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory;
import org.apache.rocketmq.remoting.api.command.TrafficType;
-import org.apache.rocketmq.remoting.common.RemotingCommandFactoryMeta;
+import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
+import org.apache.rocketmq.remoting.impl.protocol.serializer.JsonSerializer;
+import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl;
public class RemotingCommandFactoryImpl implements RemotingCommandFactory {
- private RemotingCommandFactoryMeta remotingCommandFactoryMeta;
+ private final SerializerFactory serializerFactory = new SerializerFactoryImpl();
+ private byte serializeType = JsonSerializer.SERIALIZER_TYPE;
+
+ private byte PROTOCOL_MAGIC = 0x14;
public RemotingCommandFactoryImpl() {
- this(new RemotingCommandFactoryMeta());
}
- public RemotingCommandFactoryImpl(final RemotingCommandFactoryMeta remotingCommandFactoryMeta) {
- this.remotingCommandFactoryMeta = remotingCommandFactoryMeta;
+ public RemotingCommandFactoryImpl(final String serializeName) {
+ this.serializeType = serializerFactory.type(serializeName);
}
@Override
public RemotingCommand createRequest() {
RemotingCommand request = new RemotingCommandImpl();
- request.protocolType(this.remotingCommandFactoryMeta.getProtocolType());
- request.serializerType(this.remotingCommandFactoryMeta.getSerializeType());
+ request.protocolType(this.PROTOCOL_MAGIC);
+ request.serializerType(this.serializeType);
return request;
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index 82b17f4..a4c33e1 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -48,19 +48,16 @@ import org.apache.rocketmq.remoting.api.interceptor.Interceptor;
import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.api.interceptor.RequestContext;
import org.apache.rocketmq.remoting.api.interceptor.ResponseContext;
-import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory;
import org.apache.rocketmq.remoting.api.serializable.Serializer;
import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
import org.apache.rocketmq.remoting.common.ChannelEventListenerGroup;
import org.apache.rocketmq.remoting.common.Pair;
-import org.apache.rocketmq.remoting.common.RemotingCommandFactoryMeta;
import org.apache.rocketmq.remoting.common.ResponseResult;
import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
import org.apache.rocketmq.remoting.config.RemotingConfig;
import org.apache.rocketmq.remoting.external.ThreadUtils;
import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl;
-import org.apache.rocketmq.remoting.impl.protocol.ProtocolFactoryImpl;
import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl;
import org.apache.rocketmq.remoting.internal.UIDGenerator;
import org.jetbrains.annotations.NotNull;
@@ -69,7 +66,6 @@ import org.slf4j.LoggerFactory;
public abstract class NettyRemotingAbstract implements RemotingService {
protected static final Logger LOG = LoggerFactory.getLogger(NettyRemotingAbstract.class);
- protected final ProtocolFactory protocolFactory = new ProtocolFactoryImpl();
protected final SerializerFactory serializerFactory = new SerializerFactoryImpl();
protected final ChannelEventExecutor channelEventExecutor = new ChannelEventExecutor("ChannelEventExecutor");
private final Semaphore semaphoreOneway;
@@ -86,16 +82,12 @@ public abstract class NettyRemotingAbstract implements RemotingService {
private ChannelEventListenerGroup channelEventListenerGroup = new ChannelEventListenerGroup();
NettyRemotingAbstract(RemotingConfig clientConfig) {
- this(clientConfig, new RemotingCommandFactoryMeta());
- }
-
- NettyRemotingAbstract(RemotingConfig clientConfig, RemotingCommandFactoryMeta remotingCommandFactoryMeta) {
this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true);
this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true);
this.publicExecutor = ThreadUtils.newFixedThreadPool(
clientConfig.getClientAsyncCallbackExecutorThreads(),
10000, "Remoting-PublicExecutor", true);
- this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta);
+ this.remotingCommandFactory = new RemotingCommandFactoryImpl(clientConfig.getSerializerName());
}
public SerializerFactory getSerializerFactory() {
@@ -516,11 +508,6 @@ public abstract class NettyRemotingAbstract implements RemotingService {
}
@Override
- public ProtocolFactory protocolFactory() {
- return this.protocolFactory;
- }
-
- @Override
public SerializerFactory serializerFactory() {
return this.serializerFactory;
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
index 7481574..faead7f 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
@@ -33,13 +33,6 @@ import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.http2.Http2SecurityUtil;
-import io.netty.handler.ssl.OpenSsl;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.SslProvider;
-import io.netty.handler.ssl.SupportedCipherSuiteFilter;
-import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
@@ -52,21 +45,17 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import javax.net.ssl.SSLException;
import org.apache.rocketmq.remoting.api.AsyncHandler;
import org.apache.rocketmq.remoting.api.RemotingClient;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
import org.apache.rocketmq.remoting.api.command.TrafficType;
import org.apache.rocketmq.remoting.api.exception.RemoteConnectFailureException;
import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
-import org.apache.rocketmq.remoting.api.protocol.Protocol;
-import org.apache.rocketmq.remoting.common.RemotingCommandFactoryMeta;
import org.apache.rocketmq.remoting.config.RemotingConfig;
import org.apache.rocketmq.remoting.external.ThreadUtils;
import org.apache.rocketmq.remoting.impl.netty.handler.Decoder;
import org.apache.rocketmq.remoting.impl.netty.handler.Encoder;
import org.apache.rocketmq.remoting.impl.netty.handler.ExceptionHandler;
-import org.apache.rocketmq.remoting.impl.netty.handler.Http2Handler;
import org.apache.rocketmq.remoting.internal.JvmUtils;
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
@@ -80,10 +69,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private final ConcurrentHashMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();
private final Lock lockChannelTables = new ReentrantLock();
private EventExecutorGroup workerGroup;
- private SslContext sslContext;
NettyRemotingClient(final RemotingConfig clientConfig) {
- super(clientConfig, new RemotingCommandFactoryMeta(clientConfig.getProtocolName(), clientConfig.getSerializerName()));
+ super(clientConfig);
this.clientConfig = clientConfig;
if (JvmUtils.isLinux() && this.clientConfig.isClientNativeEpollEnable()) {
@@ -98,10 +86,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
this.workerGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads()));
-
- if (Protocol.HTTP2.equals(clientConfig.getProtocolName())) {
- buildSslContext();
- }
}
private void applyOptions(Bootstrap bootstrap) {
@@ -134,9 +118,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
- if (Protocol.HTTP2.equals(clientConfig.getProtocolName())) {
- ch.pipeline().addFirst(sslContext.newHandler(ch.alloc()), Http2Handler.newHandler(false));
- }
ch.pipeline().addLast(workerGroup, new Decoder(), new Encoder(), new IdleStateHandler(clientConfig.getConnectionChannelReaderIdleSeconds(),
clientConfig.getConnectionChannelWriterIdleSeconds(), clientConfig.getConnectionChannelIdleSeconds()),
new ClientConnectionHandler(), new EventDispatcher(), new ExceptionHandler());
@@ -148,21 +129,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
startUpHouseKeepingService();
}
- private void buildSslContext() {
- SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
- try {
- sslContext = SslContextBuilder.forClient()
- .sslProvider(provider)
- /* NOTE: the cipher filter may not include all ciphers required by the HTTP/2 specification.
- * Please refer to the HTTP/2 specification for cipher requirements. */
- .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
- .trustManager(InsecureTrustManagerFactory.INSTANCE)
- .build();
- } catch (SSLException e) {
- e.printStackTrace();
- }
- }
-
@Override
public void stop() {
// try {
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
index d875f95..0d6a2cc 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
@@ -35,13 +35,6 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.http2.Http2SecurityUtil;
-import io.netty.handler.ssl.OpenSsl;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.SslProvider;
-import io.netty.handler.ssl.SupportedCipherSuiteFilter;
-import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
@@ -58,7 +51,8 @@ import org.apache.rocketmq.remoting.config.RemotingConfig;
import org.apache.rocketmq.remoting.external.ThreadUtils;
import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
import org.apache.rocketmq.remoting.impl.netty.handler.ChannelStatistics;
-import org.apache.rocketmq.remoting.impl.netty.handler.ProtocolSelector;
+import org.apache.rocketmq.remoting.impl.netty.handler.Decoder;
+import org.apache.rocketmq.remoting.impl.netty.handler.Encoder;
import org.apache.rocketmq.remoting.internal.JvmUtils;
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
@@ -71,7 +65,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
private Class<? extends ServerSocketChannel> socketChannelClass;
private int port;
- private SslContext sslContext;
NettyRemotingServer(final RemotingConfig serverConfig) {
super(serverConfig);
@@ -99,23 +92,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
this.workerGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
-
- buildHttp2SslContext();
- }
-
- private void buildHttp2SslContext() {
- try {
- SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
- SelfSignedCertificate ssc;
- //NOTE: the cipher filter may not include all ciphers required by the HTTP/2 specification.
- //Please refer to the HTTP/2 specification for cipher requirements.
- ssc = new SelfSignedCertificate();
- sslContext = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
- .sslProvider(provider)
- .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE).build();
- } catch (Exception e) {
- LOG.error("Can not build SSL context !", e);
- }
}
private void applyOptions(ServerBootstrap bootstrap) {
@@ -162,9 +138,10 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
ChannelPipeline cp = ch.pipeline();
cp.addLast(ChannelStatistics.NAME, new ChannelStatistics(channels));
-
- cp.addFirst(ProtocolSelector.NAME, new ProtocolSelector(sslContext));
- cp.addLast(workerGroup, new IdleStateHandler(serverConfig.getConnectionChannelReaderIdleSeconds(),
+ cp.addLast(workerGroup,
+ new Encoder(),
+ new Decoder(),
+ new IdleStateHandler(serverConfig.getConnectionChannelReaderIdleSeconds(),
serverConfig.getConnectionChannelWriterIdleSeconds(),
serverConfig.getConnectionChannelIdleSeconds()),
new ServerConnectionHandler(),
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Http2Handler.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Http2Handler.java
deleted file mode 100644
index 7cdb976..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Http2Handler.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.netty.handler;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-import io.netty.handler.codec.http2.DefaultHttp2Connection;
-import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
-import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
-import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
-import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
-import io.netty.handler.codec.http2.DefaultHttp2Headers;
-import io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder;
-import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
-import io.netty.handler.codec.http2.Http2Connection;
-import io.netty.handler.codec.http2.Http2ConnectionDecoder;
-import io.netty.handler.codec.http2.Http2ConnectionEncoder;
-import io.netty.handler.codec.http2.Http2ConnectionHandler;
-import io.netty.handler.codec.http2.Http2Exception;
-import io.netty.handler.codec.http2.Http2FrameAdapter;
-import io.netty.handler.codec.http2.Http2FrameReader;
-import io.netty.handler.codec.http2.Http2FrameWriter;
-import io.netty.handler.codec.http2.Http2Headers;
-import io.netty.handler.codec.http2.Http2HeadersDecoder;
-import io.netty.handler.codec.http2.Http2Settings;
-import io.netty.handler.codec.http2.StreamBufferingEncoder;
-
-import static io.netty.handler.codec.http.HttpResponseStatus.OK;
-import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
-
-public class Http2Handler extends Http2ConnectionHandler {
-
- private boolean isServer;
- private int lastStreamId;
-
- private Http2Handler(final Http2ConnectionDecoder decoder, final Http2ConnectionEncoder encoder,
- final Http2Settings initialSettings, final boolean isServer) {
- super(decoder, encoder, initialSettings);
- decoder.frameListener(new FrameListener());
- this.isServer = isServer;
- }
-
- public static Http2Handler newHandler(final boolean isServer) {
-
- Http2HeadersDecoder headersDecoder = new DefaultHttp2HeadersDecoder(true);
- Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
- Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
-
- Http2Connection connection = new DefaultHttp2Connection(isServer);
-
- Http2ConnectionEncoder encoder = new StreamBufferingEncoder(
- new DefaultHttp2ConnectionEncoder(connection, frameWriter));
-
- connection.local().flowController(new DefaultHttp2LocalFlowController(connection,
- DEFAULT_WINDOW_UPDATE_RATIO, true));
-
- Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
- frameReader);
-
- Http2Settings settings = new Http2Settings();
-
- if (!isServer)
- settings.pushEnabled(true);
-
- settings.initialWindowSize(1048576 * 10); //10MiB
- settings.maxConcurrentStreams(Integer.MAX_VALUE);
-
- return newHandler(decoder, encoder, settings, isServer);
- }
-
- private static Http2Handler newHandler(final Http2ConnectionDecoder decoder, final Http2ConnectionEncoder encoder,
- final Http2Settings settings, boolean isServer) {
- return new Http2Handler(decoder, encoder, settings, isServer);
- }
-
- @Override
- public void write(final ChannelHandlerContext ctx, final Object msg,
- final ChannelPromise promise) throws Exception {
- if (isServer) {
- assert msg instanceof ByteBuf;
- sendAPushPromise(ctx, lastStreamId, lastStreamId + 1, (ByteBuf) msg);
- } else {
-
- final Http2Headers headers = new DefaultHttp2Headers();
-
- try {
- long threadId = Thread.currentThread().getId();
- long streamId = (threadId % 2 == 0) ? threadId + 1 : threadId + 2;
- encoder().writeHeaders(ctx, (int) streamId, headers, 0, false, promise);
- encoder().writeData(ctx, (int) streamId, (ByteBuf) msg, 0, false, ctx.newPromise());
- ctx.flush();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
- }
-
- private void sendAPushPromise(ChannelHandlerContext ctx, int streamId, int pushPromiseStreamId,
- ByteBuf payload) throws Http2Exception {
-
- encoder().writePushPromise(ctx, streamId, pushPromiseStreamId,
- new DefaultHttp2Headers().status(OK.codeAsText()), 0, ctx.newPromise());
-
- //Http2Stream stream = connection.local().reservePushStream(pushPromiseStreamId, connection.connectionStream());
- Http2Headers headers = new DefaultHttp2Headers();
- headers.status(OK.codeAsText());
- encoder().writeHeaders(ctx, pushPromiseStreamId, headers, 0, false, ctx.newPromise());
- encoder().writeData(ctx, pushPromiseStreamId, payload, 0, false, ctx.newPromise());
- }
-
- private class FrameListener extends Http2FrameAdapter {
- @Override
- public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
- boolean endOfStream) throws Http2Exception {
- //Http2Handler.this.onDataRead(ctx, streamId, data, endOfStream);
- data.retain();
- Http2Handler.this.lastStreamId = streamId;
- ctx.fireChannelRead(data);
- return data.readableBytes() + padding;
- }
- }
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ProtocolSelector.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ProtocolSelector.java
deleted file mode 100644
index e00a213..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ProtocolSelector.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.netty.handler;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.ssl.SslContext;
-import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
-import org.apache.rocketmq.remoting.api.protocol.Protocol;
-import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory;
-import org.apache.rocketmq.remoting.impl.channel.ChannelHandlerContextWrapperImpl;
-import org.apache.rocketmq.remoting.impl.protocol.ProtocolFactoryImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ProtocolSelector extends SimpleChannelInboundHandler<ByteBuf> {
- public static final String NAME = ProtocolSelector.class.getSimpleName();
- private static final Logger LOG = LoggerFactory.getLogger(ProtocolSelector.class);
- private ProtocolFactory protocolFactory;
-
- public ProtocolSelector(final SslContext sslContext) {
- this.protocolFactory = new ProtocolFactoryImpl(sslContext);
- }
-
- @Override
- protected void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) throws Exception {
- if (msg.readableBytes() < 1) {
- return;
- }
- msg.markReaderIndex();
- Protocol protocol = protocolFactory.get(msg.readByte());
- if (protocol == null) {
- ctx.channel().close().addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- LOG.warn("Close channel {},result is {}", ctx.channel(), future.isSuccess());
- }
- });
- return;
- }
- ChannelHandlerContextWrapper chcw = new ChannelHandlerContextWrapperImpl(ctx);
- protocol.assembleHandler(chcw);
- msg.resetReaderIndex();
- ctx.pipeline().remove(this);
- ctx.fireChannelRead(msg.retain());
- }
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/Httpv2Protocol.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/Httpv2Protocol.java
deleted file mode 100644
index 1dc371d..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/Httpv2Protocol.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.protocol;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.ssl.SslContext;
-import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
-import org.apache.rocketmq.remoting.impl.netty.handler.Http2Handler;
-import org.apache.rocketmq.remoting.impl.netty.handler.ProtocolSelector;
-
-public class Httpv2Protocol extends RemotingCoreProtocol {
- private SslContext sslContext;
-
- public Httpv2Protocol(final SslContext sslContext) {
- this.sslContext = sslContext;
- }
-
- @Override
- public String name() {
- return HTTP2;
- }
-
- @Override
- public byte type() {
- return HTTP_2_MAGIC;
- }
-
- @Override
- public void assembleHandler(final ChannelHandlerContextWrapper ctx) {
- super.assembleHandler(ctx);
- ChannelHandlerContext chx = (ChannelHandlerContext) ctx.getContext();
-
- chx.pipeline().addAfter(ProtocolSelector.NAME, "sslHandler", sslContext.newHandler(chx.alloc()));
- chx.pipeline().addAfter("sslHandler", "http2Handler", Http2Handler.newHandler(true));
- }
-
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/ProtocolFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/ProtocolFactoryImpl.java
deleted file mode 100644
index 15322be..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/ProtocolFactoryImpl.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.protocol;
-
-import io.netty.handler.ssl.SslContext;
-import org.apache.rocketmq.remoting.api.protocol.Protocol;
-import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory;
-
-public class ProtocolFactoryImpl implements ProtocolFactory {
- private static final int MAX_COUNT = 0x0FF;
- private final Protocol[] tables = new Protocol[MAX_COUNT];
-
- private SslContext sslContext;
-
- public ProtocolFactoryImpl(final SslContext sslContext) {
- this.sslContext = sslContext;
- this.register(new RemotingCoreProtocol());
- this.register(new Httpv2Protocol(sslContext));
- this.register(new WebSocketProtocol());
- }
-
- public ProtocolFactoryImpl() {
- this.register(new RemotingCoreProtocol());
- this.register(new Httpv2Protocol(sslContext));
- this.register(new WebSocketProtocol());
- }
-
- @Override
- public void register(Protocol protocol) {
- if (tables[protocol.type() & MAX_COUNT] != null) {
- throw new RuntimeException("protocol header's sign is overlapped");
- }
- tables[protocol.type() & MAX_COUNT] = protocol;
- }
-
- @Override
- public void resetAll(final Protocol protocol) {
- for (int i = 0; i < MAX_COUNT; i++) {
- tables[i] = protocol;
- }
- }
-
- @Override
- public byte type(final String protocolName) {
-
- for (int i = 0; i < this.tables.length; i++) {
- if (this.tables[i] != null) {
- if (this.tables[i].name().equalsIgnoreCase(protocolName)) {
- return this.tables[i].type();
- }
- }
- }
-
- throw new IllegalArgumentException(String.format("the protocol: %s not exist", protocolName));
- }
-
- @Override
- public Protocol get(byte type) {
- return tables[type & MAX_COUNT];
- }
-
- @Override
- public void clearAll() {
- for (int i = 0; i < this.tables.length; i++) {
- this.tables[i] = null;
- }
- }
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/RemotingCoreProtocol.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/RemotingCoreProtocol.java
deleted file mode 100644
index 317b24f..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/RemotingCoreProtocol.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.protocol;
-
-import io.netty.channel.ChannelHandlerContext;
-import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
-import org.apache.rocketmq.remoting.api.protocol.Protocol;
-import org.apache.rocketmq.remoting.impl.netty.handler.Decoder;
-import org.apache.rocketmq.remoting.impl.netty.handler.Encoder;
-import org.apache.rocketmq.remoting.impl.netty.handler.ProtocolSelector;
-
-public class RemotingCoreProtocol implements Protocol {
- @Override
- public String name() {
- return MVP;
- }
-
- @Override
- public byte type() {
- return MVP_MAGIC;
- }
-
- @Override
- public void assembleHandler(final ChannelHandlerContextWrapper ctx) {
-
- ChannelHandlerContext chx = (ChannelHandlerContext) ctx.getContext();
-
- chx.pipeline().addAfter(ProtocolSelector.NAME, "decoder", new Decoder());
- chx.pipeline().addAfter("decoder", "encoder", new Encoder());
- }
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/WebSocketProtocol.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/WebSocketProtocol.java
deleted file mode 100644
index 18a3a11..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/WebSocketProtocol.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.protocol;
-
-import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
-import org.apache.rocketmq.remoting.api.protocol.Protocol;
-
-public class WebSocketProtocol implements Protocol {
- @Override
- public String name() {
- return WEBSOCKET;
- }
-
- @Override
- public byte type() {
- return WEBSOCKET_MAGIC;
- }
-
- @Override
- public void assembleHandler(final ChannelHandlerContextWrapper ctx) {
-
- }
-}