You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2019/06/28 11:19:37 UTC

[rocketmq-remoting] 02/39: Remove multi-protocol support feature, only support RemotingCommand protocol

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git

commit 449dfa8649b802b753c4033f437ca4a89aef3569
Author: yukon <yu...@apache.org>
AuthorDate: Wed May 15 16:19:07 2019 +0800

    Remove multi-protocol support feature, only support RemotingCommand protocol
---
 .../rocketmq/remoting/api/RemotingMarshaller.java  |   3 -
 .../rocketmq/remoting/api/protocol/Protocol.java   |  39 ------
 .../remoting/api/protocol/ProtocolFactory.java     |  30 -----
 .../common/RemotingCommandFactoryMeta.java         |  49 --------
 .../rocketmq/remoting/config/RemotingConfig.java   |  14 +--
 .../impl/command/RemotingCommandFactoryImpl.java   |  18 +--
 .../remoting/impl/netty/NettyRemotingAbstract.java |  15 +--
 .../remoting/impl/netty/NettyRemotingClient.java   |  36 +-----
 .../remoting/impl/netty/NettyRemotingServer.java   |  35 +-----
 .../remoting/impl/netty/handler/Http2Handler.java  | 139 ---------------------
 .../impl/netty/handler/ProtocolSelector.java       |  65 ----------
 .../remoting/impl/protocol/Httpv2Protocol.java     |  52 --------
 .../impl/protocol/ProtocolFactoryImpl.java         |  83 ------------
 .../impl/protocol/RemotingCoreProtocol.java        |  46 -------
 .../remoting/impl/protocol/WebSocketProtocol.java  |  38 ------
 15 files changed, 21 insertions(+), 641 deletions(-)

diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java
index 0386a03..62c9dda 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java
@@ -17,11 +17,8 @@
 
 package org.apache.rocketmq.remoting.api;
 
-import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory;
 import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
 
 public interface RemotingMarshaller {
-    ProtocolFactory protocolFactory();
-
     SerializerFactory serializerFactory();
 }
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/Protocol.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/Protocol.java
deleted file mode 100644
index 5caf167..0000000
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/Protocol.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.api.protocol;
-
-import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
-
-public interface Protocol {
-    /**
-     * Minimum Viable Protocol
-     */
-    String MVP = "mvp";
-    String HTTP2 = "http2";
-    String WEBSOCKET = "websocket";
-
-    byte MVP_MAGIC = 0x14;
-    byte WEBSOCKET_MAGIC = 0x15;
-    byte HTTP_2_MAGIC = 0x16;
-
-    String name();
-
-    byte type();
-
-    void assembleHandler(ChannelHandlerContextWrapper ctx);
-}
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/ProtocolFactory.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/ProtocolFactory.java
deleted file mode 100644
index cf016f9..0000000
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/ProtocolFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.api.protocol;
-
-public interface ProtocolFactory {
-    void register(Protocol protocol);
-
-    void resetAll(Protocol protocol);
-
-    byte type(String protocolName);
-
-    Protocol get(byte type);
-
-    void clearAll();
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/RemotingCommandFactoryMeta.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/RemotingCommandFactoryMeta.java
deleted file mode 100644
index d5c0aaa..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/RemotingCommandFactoryMeta.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.common;
-
-import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory;
-import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
-import org.apache.rocketmq.remoting.impl.protocol.Httpv2Protocol;
-import org.apache.rocketmq.remoting.impl.protocol.ProtocolFactoryImpl;
-import org.apache.rocketmq.remoting.impl.protocol.serializer.MsgPackSerializer;
-import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl;
-
-public class RemotingCommandFactoryMeta {
-    private final ProtocolFactory protocolFactory = new ProtocolFactoryImpl();
-    private final SerializerFactory serializerFactory = new SerializerFactoryImpl();
-    private byte protocolType = Httpv2Protocol.MVP_MAGIC;
-    private byte serializeType = MsgPackSerializer.SERIALIZER_TYPE;
-
-    public RemotingCommandFactoryMeta() {
-    }
-
-    public RemotingCommandFactoryMeta(String protocolName, String serializeName) {
-        this.protocolType = protocolFactory.type(protocolName);
-        this.serializeType = serializerFactory.type(serializeName);
-    }
-
-    public byte getSerializeType() {
-        return serializeType;
-    }
-
-    public byte getProtocolType() {
-        return protocolType;
-    }
-
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
index b330041..f7a4b67 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
@@ -19,9 +19,8 @@ package org.apache.rocketmq.remoting.config;
 
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.rocketmq.remoting.api.protocol.Protocol;
 import org.apache.rocketmq.remoting.impl.protocol.compression.GZipCompressor;
-import org.apache.rocketmq.remoting.impl.protocol.serializer.MsgPackSerializer;
+import org.apache.rocketmq.remoting.impl.protocol.serializer.JsonSerializer;
 
 public class RemotingConfig extends TcpSocketConfig {
     private int connectionMaxRetries = 3;
@@ -38,8 +37,7 @@ public class RemotingConfig extends TcpSocketConfig {
     private int threadTaskLowWaterMark = 30000;
     private int threadTaskHighWaterMark = 50000;
     private int connectionRetryBackoffMillis = 3000;
-    private String protocolName = Protocol.MVP;
-    private String serializerName = MsgPackSerializer.SERIALIZER_NAME;
+    private String serializerName = JsonSerializer.SERIALIZER_NAME;
     private String compressorName = GZipCompressor.COMPRESSOR_NAME;
     private int serviceThreadBlockQueueSize = 50000;
     private boolean clientNativeEpollEnable = false;
@@ -149,14 +147,6 @@ public class RemotingConfig extends TcpSocketConfig {
         this.connectionRetryBackoffMillis = connectionRetryBackoffMillis;
     }
 
-    public String getProtocolName() {
-        return protocolName;
-    }
-
-    public void setProtocolName(final String protocolName) {
-        this.protocolName = protocolName;
-    }
-
     public String getSerializerName() {
         return serializerName;
     }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java
index f5d2126..6e1efaa 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java
@@ -20,24 +20,28 @@ package org.apache.rocketmq.remoting.impl.command;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
 import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory;
 import org.apache.rocketmq.remoting.api.command.TrafficType;
-import org.apache.rocketmq.remoting.common.RemotingCommandFactoryMeta;
+import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
+import org.apache.rocketmq.remoting.impl.protocol.serializer.JsonSerializer;
+import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl;
 
 public class RemotingCommandFactoryImpl implements RemotingCommandFactory {
-    private RemotingCommandFactoryMeta remotingCommandFactoryMeta;
+    private final SerializerFactory serializerFactory = new SerializerFactoryImpl();
+    private byte serializeType = JsonSerializer.SERIALIZER_TYPE;
+
+    private byte PROTOCOL_MAGIC = 0x14;
 
     public RemotingCommandFactoryImpl() {
-        this(new RemotingCommandFactoryMeta());
     }
 
-    public RemotingCommandFactoryImpl(final RemotingCommandFactoryMeta remotingCommandFactoryMeta) {
-        this.remotingCommandFactoryMeta = remotingCommandFactoryMeta;
+    public RemotingCommandFactoryImpl(final String serializeName) {
+        this.serializeType = serializerFactory.type(serializeName);
     }
 
     @Override
     public RemotingCommand createRequest() {
         RemotingCommand request = new RemotingCommandImpl();
-        request.protocolType(this.remotingCommandFactoryMeta.getProtocolType());
-        request.serializerType(this.remotingCommandFactoryMeta.getSerializeType());
+        request.protocolType(this.PROTOCOL_MAGIC);
+        request.serializerType(this.serializeType);
         return request;
     }
 
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index 82b17f4..a4c33e1 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -48,19 +48,16 @@ import org.apache.rocketmq.remoting.api.interceptor.Interceptor;
 import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup;
 import org.apache.rocketmq.remoting.api.interceptor.RequestContext;
 import org.apache.rocketmq.remoting.api.interceptor.ResponseContext;
-import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory;
 import org.apache.rocketmq.remoting.api.serializable.Serializer;
 import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
 import org.apache.rocketmq.remoting.common.ChannelEventListenerGroup;
 import org.apache.rocketmq.remoting.common.Pair;
-import org.apache.rocketmq.remoting.common.RemotingCommandFactoryMeta;
 import org.apache.rocketmq.remoting.common.ResponseResult;
 import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
 import org.apache.rocketmq.remoting.config.RemotingConfig;
 import org.apache.rocketmq.remoting.external.ThreadUtils;
 import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
 import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl;
-import org.apache.rocketmq.remoting.impl.protocol.ProtocolFactoryImpl;
 import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl;
 import org.apache.rocketmq.remoting.internal.UIDGenerator;
 import org.jetbrains.annotations.NotNull;
@@ -69,7 +66,6 @@ import org.slf4j.LoggerFactory;
 
 public abstract class NettyRemotingAbstract implements RemotingService {
     protected static final Logger LOG = LoggerFactory.getLogger(NettyRemotingAbstract.class);
-    protected final ProtocolFactory protocolFactory = new ProtocolFactoryImpl();
     protected final SerializerFactory serializerFactory = new SerializerFactoryImpl();
     protected final ChannelEventExecutor channelEventExecutor = new ChannelEventExecutor("ChannelEventExecutor");
     private final Semaphore semaphoreOneway;
@@ -86,16 +82,12 @@ public abstract class NettyRemotingAbstract implements RemotingService {
     private ChannelEventListenerGroup channelEventListenerGroup = new ChannelEventListenerGroup();
 
     NettyRemotingAbstract(RemotingConfig clientConfig) {
-        this(clientConfig, new RemotingCommandFactoryMeta());
-    }
-
-    NettyRemotingAbstract(RemotingConfig clientConfig, RemotingCommandFactoryMeta remotingCommandFactoryMeta) {
         this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true);
         this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true);
         this.publicExecutor = ThreadUtils.newFixedThreadPool(
             clientConfig.getClientAsyncCallbackExecutorThreads(),
             10000, "Remoting-PublicExecutor", true);
-        this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta);
+        this.remotingCommandFactory = new RemotingCommandFactoryImpl(clientConfig.getSerializerName());
     }
 
     public SerializerFactory getSerializerFactory() {
@@ -516,11 +508,6 @@ public abstract class NettyRemotingAbstract implements RemotingService {
     }
 
     @Override
-    public ProtocolFactory protocolFactory() {
-        return this.protocolFactory;
-    }
-
-    @Override
     public SerializerFactory serializerFactory() {
         return this.serializerFactory;
     }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
index 7481574..faead7f 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
@@ -33,13 +33,6 @@ import io.netty.channel.epoll.EpollSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.http2.Http2SecurityUtil;
-import io.netty.handler.ssl.OpenSsl;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.SslProvider;
-import io.netty.handler.ssl.SupportedCipherSuiteFilter;
-import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
@@ -52,21 +45,17 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import javax.net.ssl.SSLException;
 import org.apache.rocketmq.remoting.api.AsyncHandler;
 import org.apache.rocketmq.remoting.api.RemotingClient;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
 import org.apache.rocketmq.remoting.api.command.TrafficType;
 import org.apache.rocketmq.remoting.api.exception.RemoteConnectFailureException;
 import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
-import org.apache.rocketmq.remoting.api.protocol.Protocol;
-import org.apache.rocketmq.remoting.common.RemotingCommandFactoryMeta;
 import org.apache.rocketmq.remoting.config.RemotingConfig;
 import org.apache.rocketmq.remoting.external.ThreadUtils;
 import org.apache.rocketmq.remoting.impl.netty.handler.Decoder;
 import org.apache.rocketmq.remoting.impl.netty.handler.Encoder;
 import org.apache.rocketmq.remoting.impl.netty.handler.ExceptionHandler;
-import org.apache.rocketmq.remoting.impl.netty.handler.Http2Handler;
 import org.apache.rocketmq.remoting.internal.JvmUtils;
 
 public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
@@ -80,10 +69,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
     private final ConcurrentHashMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();
     private final Lock lockChannelTables = new ReentrantLock();
     private EventExecutorGroup workerGroup;
-    private SslContext sslContext;
 
     NettyRemotingClient(final RemotingConfig clientConfig) {
-        super(clientConfig, new RemotingCommandFactoryMeta(clientConfig.getProtocolName(), clientConfig.getSerializerName()));
+        super(clientConfig);
         this.clientConfig = clientConfig;
 
         if (JvmUtils.isLinux() && this.clientConfig.isClientNativeEpollEnable()) {
@@ -98,10 +86,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
 
         this.workerGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(),
             ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads()));
-
-        if (Protocol.HTTP2.equals(clientConfig.getProtocolName())) {
-            buildSslContext();
-        }
     }
 
     private void applyOptions(Bootstrap bootstrap) {
@@ -134,9 +118,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
-                    if (Protocol.HTTP2.equals(clientConfig.getProtocolName())) {
-                        ch.pipeline().addFirst(sslContext.newHandler(ch.alloc()), Http2Handler.newHandler(false));
-                    }
                     ch.pipeline().addLast(workerGroup, new Decoder(), new Encoder(), new IdleStateHandler(clientConfig.getConnectionChannelReaderIdleSeconds(),
                             clientConfig.getConnectionChannelWriterIdleSeconds(), clientConfig.getConnectionChannelIdleSeconds()),
                         new ClientConnectionHandler(), new EventDispatcher(), new ExceptionHandler());
@@ -148,21 +129,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         startUpHouseKeepingService();
     }
 
-    private void buildSslContext() {
-        SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
-        try {
-            sslContext = SslContextBuilder.forClient()
-                .sslProvider(provider)
-                    /* NOTE: the cipher filter may not include all ciphers required by the HTTP/2 specification.
-                     * Please refer to the HTTP/2 specification for cipher requirements. */
-                .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
-                .trustManager(InsecureTrustManagerFactory.INSTANCE)
-                .build();
-        } catch (SSLException e) {
-            e.printStackTrace();
-        }
-    }
-
     @Override
     public void stop() {
         // try {
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
index d875f95..0d6a2cc 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
@@ -35,13 +35,6 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.ServerSocketChannel;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.http2.Http2SecurityUtil;
-import io.netty.handler.ssl.OpenSsl;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.SslProvider;
-import io.netty.handler.ssl.SupportedCipherSuiteFilter;
-import io.netty.handler.ssl.util.SelfSignedCertificate;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
@@ -58,7 +51,8 @@ import org.apache.rocketmq.remoting.config.RemotingConfig;
 import org.apache.rocketmq.remoting.external.ThreadUtils;
 import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
 import org.apache.rocketmq.remoting.impl.netty.handler.ChannelStatistics;
-import org.apache.rocketmq.remoting.impl.netty.handler.ProtocolSelector;
+import org.apache.rocketmq.remoting.impl.netty.handler.Decoder;
+import org.apache.rocketmq.remoting.impl.netty.handler.Encoder;
 import org.apache.rocketmq.remoting.internal.JvmUtils;
 
 public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
@@ -71,7 +65,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
     private Class<? extends ServerSocketChannel> socketChannelClass;
 
     private int port;
-    private SslContext sslContext;
 
     NettyRemotingServer(final RemotingConfig serverConfig) {
         super(serverConfig);
@@ -99,23 +92,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
 
         this.workerGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(),
             ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
-
-        buildHttp2SslContext();
-    }
-
-    private void buildHttp2SslContext() {
-        try {
-            SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
-            SelfSignedCertificate ssc;
-            //NOTE: the cipher filter may not include all ciphers required by the HTTP/2 specification.
-            //Please refer to the HTTP/2 specification for cipher requirements.
-            ssc = new SelfSignedCertificate();
-            sslContext = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
-                .sslProvider(provider)
-                .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE).build();
-        } catch (Exception e) {
-            LOG.error("Can not build SSL context !", e);
-        }
     }
 
     private void applyOptions(ServerBootstrap bootstrap) {
@@ -162,9 +138,10 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
                 ChannelPipeline cp = ch.pipeline();
 
                 cp.addLast(ChannelStatistics.NAME, new ChannelStatistics(channels));
-
-                cp.addFirst(ProtocolSelector.NAME, new ProtocolSelector(sslContext));
-                cp.addLast(workerGroup, new IdleStateHandler(serverConfig.getConnectionChannelReaderIdleSeconds(),
+                cp.addLast(workerGroup,
+                    new Encoder(),
+                    new Decoder(),
+                    new IdleStateHandler(serverConfig.getConnectionChannelReaderIdleSeconds(),
                         serverConfig.getConnectionChannelWriterIdleSeconds(),
                         serverConfig.getConnectionChannelIdleSeconds()),
                     new ServerConnectionHandler(),
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Http2Handler.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Http2Handler.java
deleted file mode 100644
index 7cdb976..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Http2Handler.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.netty.handler;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-import io.netty.handler.codec.http2.DefaultHttp2Connection;
-import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
-import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
-import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
-import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
-import io.netty.handler.codec.http2.DefaultHttp2Headers;
-import io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder;
-import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
-import io.netty.handler.codec.http2.Http2Connection;
-import io.netty.handler.codec.http2.Http2ConnectionDecoder;
-import io.netty.handler.codec.http2.Http2ConnectionEncoder;
-import io.netty.handler.codec.http2.Http2ConnectionHandler;
-import io.netty.handler.codec.http2.Http2Exception;
-import io.netty.handler.codec.http2.Http2FrameAdapter;
-import io.netty.handler.codec.http2.Http2FrameReader;
-import io.netty.handler.codec.http2.Http2FrameWriter;
-import io.netty.handler.codec.http2.Http2Headers;
-import io.netty.handler.codec.http2.Http2HeadersDecoder;
-import io.netty.handler.codec.http2.Http2Settings;
-import io.netty.handler.codec.http2.StreamBufferingEncoder;
-
-import static io.netty.handler.codec.http.HttpResponseStatus.OK;
-import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
-
-public class Http2Handler extends Http2ConnectionHandler {
-
-    private boolean isServer;
-    private int lastStreamId;
-
-    private Http2Handler(final Http2ConnectionDecoder decoder, final Http2ConnectionEncoder encoder,
-        final Http2Settings initialSettings, final boolean isServer) {
-        super(decoder, encoder, initialSettings);
-        decoder.frameListener(new FrameListener());
-        this.isServer = isServer;
-    }
-
-    public static Http2Handler newHandler(final boolean isServer) {
-
-        Http2HeadersDecoder headersDecoder = new DefaultHttp2HeadersDecoder(true);
-        Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
-        Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
-
-        Http2Connection connection = new DefaultHttp2Connection(isServer);
-
-        Http2ConnectionEncoder encoder = new StreamBufferingEncoder(
-            new DefaultHttp2ConnectionEncoder(connection, frameWriter));
-
-        connection.local().flowController(new DefaultHttp2LocalFlowController(connection,
-            DEFAULT_WINDOW_UPDATE_RATIO, true));
-
-        Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
-            frameReader);
-
-        Http2Settings settings = new Http2Settings();
-
-        if (!isServer)
-            settings.pushEnabled(true);
-
-        settings.initialWindowSize(1048576 * 10); //10MiB
-        settings.maxConcurrentStreams(Integer.MAX_VALUE);
-
-        return newHandler(decoder, encoder, settings, isServer);
-    }
-
-    private static Http2Handler newHandler(final Http2ConnectionDecoder decoder, final Http2ConnectionEncoder encoder,
-        final Http2Settings settings, boolean isServer) {
-        return new Http2Handler(decoder, encoder, settings, isServer);
-    }
-
-    @Override
-    public void write(final ChannelHandlerContext ctx, final Object msg,
-        final ChannelPromise promise) throws Exception {
-        if (isServer) {
-            assert msg instanceof ByteBuf;
-            sendAPushPromise(ctx, lastStreamId, lastStreamId + 1, (ByteBuf) msg);
-        } else {
-
-            final Http2Headers headers = new DefaultHttp2Headers();
-
-            try {
-                long threadId = Thread.currentThread().getId();
-                long streamId = (threadId % 2 == 0) ? threadId + 1 : threadId + 2;
-                encoder().writeHeaders(ctx, (int) streamId, headers, 0, false, promise);
-                encoder().writeData(ctx, (int) streamId, (ByteBuf) msg, 0, false, ctx.newPromise());
-                ctx.flush();
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-
-        }
-    }
-
-    private void sendAPushPromise(ChannelHandlerContext ctx, int streamId, int pushPromiseStreamId,
-        ByteBuf payload) throws Http2Exception {
-
-        encoder().writePushPromise(ctx, streamId, pushPromiseStreamId,
-            new DefaultHttp2Headers().status(OK.codeAsText()), 0, ctx.newPromise());
-
-        //Http2Stream stream = connection.local().reservePushStream(pushPromiseStreamId, connection.connectionStream());
-        Http2Headers headers = new DefaultHttp2Headers();
-        headers.status(OK.codeAsText());
-        encoder().writeHeaders(ctx, pushPromiseStreamId, headers, 0, false, ctx.newPromise());
-        encoder().writeData(ctx, pushPromiseStreamId, payload, 0, false, ctx.newPromise());
-    }
-
-    private class FrameListener extends Http2FrameAdapter {
-        @Override
-        public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
-            boolean endOfStream) throws Http2Exception {
-            //Http2Handler.this.onDataRead(ctx, streamId, data, endOfStream);
-            data.retain();
-            Http2Handler.this.lastStreamId = streamId;
-            ctx.fireChannelRead(data);
-            return data.readableBytes() + padding;
-        }
-    }
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ProtocolSelector.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ProtocolSelector.java
deleted file mode 100644
index e00a213..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ProtocolSelector.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.netty.handler;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.ssl.SslContext;
-import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
-import org.apache.rocketmq.remoting.api.protocol.Protocol;
-import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory;
-import org.apache.rocketmq.remoting.impl.channel.ChannelHandlerContextWrapperImpl;
-import org.apache.rocketmq.remoting.impl.protocol.ProtocolFactoryImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ProtocolSelector extends SimpleChannelInboundHandler<ByteBuf> {
-    public static final String NAME = ProtocolSelector.class.getSimpleName();
-    private static final Logger LOG = LoggerFactory.getLogger(ProtocolSelector.class);
-    private ProtocolFactory protocolFactory;
-
-    public ProtocolSelector(final SslContext sslContext) {
-        this.protocolFactory = new ProtocolFactoryImpl(sslContext);
-    }
-
-    @Override
-    protected void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) throws Exception {
-        if (msg.readableBytes() < 1) {
-            return;
-        }
-        msg.markReaderIndex();
-        Protocol protocol = protocolFactory.get(msg.readByte());
-        if (protocol == null) {
-            ctx.channel().close().addListener(new ChannelFutureListener() {
-                @Override
-                public void operationComplete(ChannelFuture future) throws Exception {
-                    LOG.warn("Close channel {},result is {}", ctx.channel(), future.isSuccess());
-                }
-            });
-            return;
-        }
-        ChannelHandlerContextWrapper chcw = new ChannelHandlerContextWrapperImpl(ctx);
-        protocol.assembleHandler(chcw);
-        msg.resetReaderIndex();
-        ctx.pipeline().remove(this);
-        ctx.fireChannelRead(msg.retain());
-    }
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/Httpv2Protocol.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/Httpv2Protocol.java
deleted file mode 100644
index 1dc371d..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/Httpv2Protocol.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.protocol;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.ssl.SslContext;
-import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
-import org.apache.rocketmq.remoting.impl.netty.handler.Http2Handler;
-import org.apache.rocketmq.remoting.impl.netty.handler.ProtocolSelector;
-
-public class Httpv2Protocol extends RemotingCoreProtocol {
-    private SslContext sslContext;
-
-    public Httpv2Protocol(final SslContext sslContext) {
-        this.sslContext = sslContext;
-    }
-
-    @Override
-    public String name() {
-        return HTTP2;
-    }
-
-    @Override
-    public byte type() {
-        return HTTP_2_MAGIC;
-    }
-
-    @Override
-    public void assembleHandler(final ChannelHandlerContextWrapper ctx) {
-        super.assembleHandler(ctx);
-        ChannelHandlerContext chx = (ChannelHandlerContext) ctx.getContext();
-
-        chx.pipeline().addAfter(ProtocolSelector.NAME, "sslHandler", sslContext.newHandler(chx.alloc()));
-        chx.pipeline().addAfter("sslHandler", "http2Handler", Http2Handler.newHandler(true));
-    }
-
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/ProtocolFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/ProtocolFactoryImpl.java
deleted file mode 100644
index 15322be..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/ProtocolFactoryImpl.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.protocol;
-
-import io.netty.handler.ssl.SslContext;
-import org.apache.rocketmq.remoting.api.protocol.Protocol;
-import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory;
-
-public class ProtocolFactoryImpl implements ProtocolFactory {
-    private static final int MAX_COUNT = 0x0FF;
-    private final Protocol[] tables = new Protocol[MAX_COUNT];
-
-    private SslContext sslContext;
-
-    public ProtocolFactoryImpl(final SslContext sslContext) {
-        this.sslContext = sslContext;
-        this.register(new RemotingCoreProtocol());
-        this.register(new Httpv2Protocol(sslContext));
-        this.register(new WebSocketProtocol());
-    }
-
-    public ProtocolFactoryImpl() {
-        this.register(new RemotingCoreProtocol());
-        this.register(new Httpv2Protocol(sslContext));
-        this.register(new WebSocketProtocol());
-    }
-
-    @Override
-    public void register(Protocol protocol) {
-        if (tables[protocol.type() & MAX_COUNT] != null) {
-            throw new RuntimeException("protocol header's sign is overlapped");
-        }
-        tables[protocol.type() & MAX_COUNT] = protocol;
-    }
-
-    @Override
-    public void resetAll(final Protocol protocol) {
-        for (int i = 0; i < MAX_COUNT; i++) {
-            tables[i] = protocol;
-        }
-    }
-
-    @Override
-    public byte type(final String protocolName) {
-
-        for (int i = 0; i < this.tables.length; i++) {
-            if (this.tables[i] != null) {
-                if (this.tables[i].name().equalsIgnoreCase(protocolName)) {
-                    return this.tables[i].type();
-                }
-            }
-        }
-
-        throw new IllegalArgumentException(String.format("the protocol: %s not exist", protocolName));
-    }
-
-    @Override
-    public Protocol get(byte type) {
-        return tables[type & MAX_COUNT];
-    }
-
-    @Override
-    public void clearAll() {
-        for (int i = 0; i < this.tables.length; i++) {
-            this.tables[i] = null;
-        }
-    }
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/RemotingCoreProtocol.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/RemotingCoreProtocol.java
deleted file mode 100644
index 317b24f..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/RemotingCoreProtocol.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.protocol;
-
-import io.netty.channel.ChannelHandlerContext;
-import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
-import org.apache.rocketmq.remoting.api.protocol.Protocol;
-import org.apache.rocketmq.remoting.impl.netty.handler.Decoder;
-import org.apache.rocketmq.remoting.impl.netty.handler.Encoder;
-import org.apache.rocketmq.remoting.impl.netty.handler.ProtocolSelector;
-
-public class RemotingCoreProtocol implements Protocol {
-    @Override
-    public String name() {
-        return MVP;
-    }
-
-    @Override
-    public byte type() {
-        return MVP_MAGIC;
-    }
-
-    @Override
-    public void assembleHandler(final ChannelHandlerContextWrapper ctx) {
-
-        ChannelHandlerContext chx = (ChannelHandlerContext) ctx.getContext();
-
-        chx.pipeline().addAfter(ProtocolSelector.NAME, "decoder", new Decoder());
-        chx.pipeline().addAfter("decoder", "encoder", new Encoder());
-    }
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/WebSocketProtocol.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/WebSocketProtocol.java
deleted file mode 100644
index 18a3a11..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/WebSocketProtocol.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.protocol;
-
-import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
-import org.apache.rocketmq.remoting.api.protocol.Protocol;
-
-public class WebSocketProtocol implements Protocol {
-    @Override
-    public String name() {
-        return WEBSOCKET;
-    }
-
-    @Override
-    public byte type() {
-        return WEBSOCKET_MAGIC;
-    }
-
-    @Override
-    public void assembleHandler(final ChannelHandlerContextWrapper ctx) {
-
-    }
-}