You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/09/19 07:23:49 UTC

[5/8] incubator-rocketmq git commit: initialize RocketMQ5

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
new file mode 100644
index 0000000..d875f95
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http2.Http2SecurityUtil;
+import io.netty.handler.ssl.OpenSsl;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.SupportedCipherSuiteFilter;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.RemotingServer;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.external.ThreadUtils;
+import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
+import org.apache.rocketmq.remoting.impl.netty.handler.ChannelStatistics;
+import org.apache.rocketmq.remoting.impl.netty.handler.ProtocolSelector;
+import org.apache.rocketmq.remoting.internal.JvmUtils;
+
+public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
+    private final RemotingConfig serverConfig;
+
+    private final ServerBootstrap serverBootstrap;
+    private final EventLoopGroup bossGroup;
+    private final EventLoopGroup ioGroup;
+    private EventExecutorGroup workerGroup;
+    private Class<? extends ServerSocketChannel> socketChannelClass;
+
+    private int port;
+    private SslContext sslContext;
+
+    NettyRemotingServer(final RemotingConfig serverConfig) {
+        super(serverConfig);
+
+        this.serverBootstrap = new ServerBootstrap();
+        this.serverConfig = serverConfig;
+
+        if (JvmUtils.isLinux() && this.serverConfig.isServerNativeEpollEnable()) {
+            this.ioGroup = new EpollEventLoopGroup(serverConfig.getServerIoThreads(), ThreadUtils.newGenericThreadFactory("NettyEpollIoThreads",
+                serverConfig.getServerIoThreads()));
+
+            this.bossGroup = new EpollEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads",
+                serverConfig.getServerAcceptorThreads()));
+
+            this.socketChannelClass = EpollServerSocketChannel.class;
+        } else {
+            this.bossGroup = new NioEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads",
+                serverConfig.getServerAcceptorThreads()));
+
+            this.ioGroup = new NioEventLoopGroup(serverConfig.getServerIoThreads(), ThreadUtils.newGenericThreadFactory("NettyNioIoThreads",
+                serverConfig.getServerIoThreads()));
+
+            this.socketChannelClass = NioServerSocketChannel.class;
+        }
+
+        this.workerGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(),
+            ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
+
+        buildHttp2SslContext();
+    }
+
+    private void buildHttp2SslContext() {
+        try {
+            SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
+            SelfSignedCertificate ssc;
+            //NOTE: the cipher filter may not include all ciphers required by the HTTP/2 specification.
+            //Please refer to the HTTP/2 specification for cipher requirements.
+            ssc = new SelfSignedCertificate();
+            sslContext = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
+                .sslProvider(provider)
+                .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE).build();
+        } catch (Exception e) {
+            LOG.error("Can not build SSL context !", e);
+        }
+    }
+
+    /**
+     * Applies the TCP-level settings from {@code serverConfig} to the given bootstrap.
+     * <p>
+     * {@code option()} configures the listening server channel, {@code childOption()} configures
+     * the channels it accepts. Size/linger values are only applied when they are positive.
+     * Nothing is applied when no configuration is present.
+     *
+     * @param bootstrap the server bootstrap to configure
+     */
+    private void applyOptions(ServerBootstrap bootstrap) {
+        // Single guard: every setting below reads serverConfig, including the allocator switch.
+        if (null == serverConfig) {
+            return;
+        }
+
+        if (serverConfig.getTcpSoBacklogSize() > 0) {
+            bootstrap.option(ChannelOption.SO_BACKLOG, serverConfig.getTcpSoBacklogSize());
+        }
+
+        if (serverConfig.getTcpSoLinger() > 0) {
+            // SO_LINGER is a per-connection option; the listening channel does not support it,
+            // so it has to be set on the accepted (child) channels to take effect.
+            bootstrap.childOption(ChannelOption.SO_LINGER, serverConfig.getTcpSoLinger());
+        }
+
+        if (serverConfig.getTcpSoSndBufSize() > 0) {
+            bootstrap.childOption(ChannelOption.SO_SNDBUF, serverConfig.getTcpSoSndBufSize());
+        }
+        if (serverConfig.getTcpSoRcvBufSize() > 0) {
+            bootstrap.childOption(ChannelOption.SO_RCVBUF, serverConfig.getTcpSoRcvBufSize());
+        }
+
+        // NOTE(review): CONNECT_TIMEOUT_MILLIS is set on the listening channel here; it presumably
+        // has no effect for a server that only accepts connections - confirm before relying on it.
+        bootstrap.option(ChannelOption.SO_REUSEADDR, serverConfig.isTcpSoReuseAddress()).
+            childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isTcpSoKeepAlive()).
+            childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpSoNoDelay()).
+            option(ChannelOption.CONNECT_TIMEOUT_MILLIS, serverConfig.getTcpSoTimeout());
+
+        if (serverConfig.isServerPooledBytebufAllocatorEnable()) {
+            bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        }
+    }
+
+    /**
+     * Starts the server: configures the bootstrap, binds the configured listen port, records the
+     * port that was actually bound and starts the housekeeping service.
+     * Blocks until the bind operation has completed.
+     */
+    @Override
+    public void start() {
+        super.start();
+
+        // One group shared by every accepted channel and by each channel's ChannelStatistics handler.
+        final ChannelGroup acceptedChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+        final ChannelInitializer<SocketChannel> childInitializer = new ChannelInitializer<SocketChannel>() {
+            @Override
+            public void initChannel(SocketChannel ch) throws Exception {
+                acceptedChannels.add(ch);
+
+                final ChannelPipeline pipeline = ch.pipeline();
+
+                pipeline.addLast(ChannelStatistics.NAME, new ChannelStatistics(acceptedChannels));
+
+                // The protocol selector goes to the head of the pipeline, ahead of the statistics handler.
+                pipeline.addFirst(ProtocolSelector.NAME, new ProtocolSelector(sslContext));
+
+                final IdleStateHandler idleStateHandler = new IdleStateHandler(
+                    serverConfig.getConnectionChannelReaderIdleSeconds(),
+                    serverConfig.getConnectionChannelWriterIdleSeconds(),
+                    serverConfig.getConnectionChannelIdleSeconds());
+                // These three handlers are executed on the worker group.
+                pipeline.addLast(workerGroup, idleStateHandler, new ServerConnectionHandler(), new EventDispatcher());
+            }
+        };
+
+        this.serverBootstrap.group(this.bossGroup, this.ioGroup);
+        this.serverBootstrap.channel(socketChannelClass);
+        this.serverBootstrap.childHandler(childInitializer);
+
+        applyOptions(serverBootstrap);
+
+        final int configuredPort = this.serverConfig.getServerListenPort();
+        final ChannelFuture bindFuture = this.serverBootstrap.bind(configuredPort).syncUninterruptibly();
+        final InetSocketAddress boundAddress = (InetSocketAddress) bindFuture.channel().localAddress();
+        this.port = boundAddress.getPort();
+
+        startUpHouseKeepingService();
+    }
+
+    /**
+     * Stops the server: shuts down the housekeeping service and the channel event executor,
+     * then the acceptor, I/O and worker groups (waiting for each), and finally delegates to
+     * {@code super.stop()}.
+     * <p>
+     * Every step is isolated so that a failure in one of them is logged but does not prevent
+     * the remaining thread groups from being shut down.
+     */
+    @Override
+    public void stop() {
+        try {
+            ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            LOG.error("RemotingServer stopped error !", e);
+        }
+
+        try {
+            ThreadUtils.shutdownGracefully(channelEventExecutor);
+        } catch (Exception e) {
+            LOG.error("RemotingServer stopped error !", e);
+        }
+
+        // Same order as before: stop accepting first, then I/O, then the worker executors.
+        shutdownEventGroupQuietly(this.bossGroup);
+        shutdownEventGroupQuietly(this.ioGroup);
+        shutdownEventGroupQuietly(this.workerGroup);
+
+        super.stop();
+    }
+
+    /**
+     * Gracefully shuts down one executor group and waits for completion, logging any failure
+     * instead of propagating it.
+     *
+     * @param group the group to shut down
+     */
+    private void shutdownEventGroupQuietly(final EventExecutorGroup group) {
+        try {
+            group.shutdownGracefully().syncUninterruptibly();
+        } catch (Exception e) {
+            LOG.error("RemotingServer stopped error !", e);
+        }
+    }
+
+    /**
+     * Returns the port recorded by {@link #start()} from the bound channel's local address.
+     *
+     * @return the local listen port
+     */
+    @Override
+    public int localListenPort() {
+        return port;
+    }
+
+    /**
+     * Sends a request over the given channel and returns the result of the interceptor-aware
+     * invocation.
+     *
+     * @param remotingChannel target channel; must be a {@link NettyChannelImpl}
+     * @param request the command to send
+     * @param timeoutMillis timeout in milliseconds passed to the invocation
+     * @return the command returned by the invocation
+     */
+    @Override
+    public RemotingCommand invoke(final RemotingChannel remotingChannel, final RemotingCommand request,
+        final long timeoutMillis) {
+        final NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel;
+        return invokeWithInterceptor(nettyChannel.getChannel(), request, timeoutMillis);
+    }
+
+    /**
+     * Sends a request over the given channel through the interceptor-aware asynchronous
+     * invocation, handing the supplied handler to it.
+     *
+     * @param remotingChannel target channel; must be a {@link NettyChannelImpl}
+     * @param request the command to send
+     * @param asyncHandler handler passed on to the asynchronous invocation
+     * @param timeoutMillis timeout in milliseconds passed to the invocation
+     */
+    @Override
+    public void invokeAsync(final RemotingChannel remotingChannel, final RemotingCommand request,
+        final AsyncHandler asyncHandler,
+        final long timeoutMillis) {
+        final NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel;
+        invokeAsyncWithInterceptor(nettyChannel.getChannel(), request, asyncHandler, timeoutMillis);
+    }
+
+    /**
+     * Sends a request over the given channel through the interceptor-aware one-way invocation.
+     *
+     * @param remotingChannel target channel; must be a {@link NettyChannelImpl}
+     * @param request the command to send
+     * @param timeoutMillis timeout in milliseconds passed to the invocation
+     */
+    @Override
+    public void invokeOneWay(final RemotingChannel remotingChannel, final RemotingCommand request,
+        final long timeoutMillis) {
+        final NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel;
+        invokeOnewayWithInterceptor(nettyChannel.getChannel(), request, timeoutMillis);
+    }
+
+    /**
+     * Connection-level handler installed on every accepted channel. It publishes
+     * {@link NettyChannelEvent}s for channel activation, deactivation, idleness and errors,
+     * and closes the channel when it becomes fully idle or an exception reaches this handler.
+     */
+    private class ServerConnectionHandler extends ChannelDuplexHandler {
+        /**
+         * Logs the writability change and forwards the event to the next inbound handler.
+         */
+        @Override
+        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+            LOG.warn("Channel {} channelWritabilityChanged event triggered - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", ctx.channel(),
+                ctx.channel().bytesBeforeUnwritable(), ctx.channel().bytesBeforeWritable());
+            // Propagate like every other inbound callback in this handler; without this the
+            // handlers placed after this one never observe writability changes.
+            super.channelWritabilityChanged(ctx);
+        }
+
+        @Override
+        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+            super.channelRegistered(ctx);
+        }
+
+        @Override
+        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+            super.channelUnregistered(ctx);
+        }
+
+        /**
+         * Forwards the event, then publishes an ACTIVE channel event.
+         */
+        @Override
+        public void channelActive(ChannelHandlerContext ctx) throws Exception {
+            super.channelActive(ctx);
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.ACTIVE, ctx.channel()));
+        }
+
+        /**
+         * Forwards the event, then publishes an INACTIVE channel event.
+         */
+        @Override
+        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+            super.channelInactive(ctx);
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.INACTIVE, ctx.channel()));
+        }
+
+        /**
+         * On an ALL_IDLE {@link IdleStateEvent} closes the channel and publishes an IDLE channel
+         * event. The user event is always forwarded to the next handler.
+         */
+        @Override
+        public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
+            if (evt instanceof IdleStateEvent) {
+                final IdleStateEvent event = (IdleStateEvent) evt;
+                if (event.state().equals(IdleState.ALL_IDLE)) {
+                    ctx.channel().close().addListener(new ChannelFutureListener() {
+                        @Override
+                        public void operationComplete(ChannelFuture future) throws Exception {
+                            LOG.warn("Close channel {} because of event {},result is {}", ctx.channel(), event, future.isSuccess());
+                        }
+                    });
+
+                    putNettyEvent(new NettyChannelEvent(NettyChannelEventType.IDLE, ctx.channel()));
+                }
+            }
+            ctx.fireUserEventTriggered(evt);
+        }
+
+        /**
+         * Publishes an EXCEPTION channel event carrying the cause and closes the channel.
+         * The exception is not forwarded further down the pipeline.
+         */
+        @Override
+        public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel(), cause));
+
+            ctx.channel().close().addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    LOG.warn("Close channel {} because of error {},result is {}", ctx.channel(), cause, future.isSuccess());
+                }
+            });
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/RemotingBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/RemotingBootstrapFactory.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/RemotingBootstrapFactory.java
new file mode 100644
index 0000000..4dd502c
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/RemotingBootstrapFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import java.util.Properties;
+import org.apache.rocketmq.remoting.api.RemotingClient;
+import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.internal.BeanUtils;
+import org.apache.rocketmq.remoting.internal.PropertyUtils;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Remoting Bootstrap entrance.
+ */
+public final class RemotingBootstrapFactory {
+    /**
+     * Creates a client from a properties file.
+     *
+     * @param fileName name of the properties file handed to {@code PropertyUtils.loadProps}
+     * @return a new remoting client
+     */
+    public static RemotingClient createRemotingClient(@NotNull final String fileName) {
+        final Properties loaded = PropertyUtils.loadProps(fileName);
+        return createRemotingClient(loaded);
+    }
+
+    /**
+     * Creates a client from a ready-made configuration.
+     *
+     * @param config the remoting configuration
+     * @return a new remoting client
+     */
+    public static RemotingClient createRemotingClient(@NotNull final RemotingConfig config) {
+        return new NettyRemotingClient(config);
+    }
+
+    /**
+     * Creates a client from properties that are populated into a {@link RemotingConfig}.
+     *
+     * @param properties the source properties
+     * @return a new remoting client
+     */
+    public static RemotingClient createRemotingClient(@NotNull final Properties properties) {
+        final RemotingConfig populated = BeanUtils.populate(properties, RemotingConfig.class);
+        return createRemotingClient(populated);
+    }
+
+    /**
+     * Creates a server from a properties file.
+     *
+     * @param fileName name of the properties file handed to {@code PropertyUtils.loadProps}
+     * @return a new remoting server
+     */
+    public static NettyRemotingServer createRemotingServer(@NotNull final String fileName) {
+        final Properties loaded = PropertyUtils.loadProps(fileName);
+        return createRemotingServer(loaded);
+    }
+
+    /**
+     * Creates a server from properties that are populated into a {@link RemotingConfig}.
+     *
+     * @param properties the source properties
+     * @return a new remoting server
+     */
+    public static NettyRemotingServer createRemotingServer(@NotNull final Properties properties) {
+        final RemotingConfig populated = BeanUtils.populate(properties, RemotingConfig.class);
+        return createRemotingServer(populated);
+    }
+
+    /**
+     * Creates a server from a ready-made configuration.
+     *
+     * @param config the remoting configuration
+     * @return a new remoting server
+     */
+    public static NettyRemotingServer createRemotingServer(@NotNull final RemotingConfig config) {
+        return new NettyRemotingServer(config);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ChannelStatistics.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ChannelStatistics.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ChannelStatistics.java
new file mode 100755
index 0000000..ff0f9c9
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ChannelStatistics.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty.handler;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.group.ChannelGroup;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.remoting.common.metrics.ChannelMetrics;
+
+/**
+ * Pipeline handler that keeps a shared {@link ChannelGroup} in sync with the channel it is
+ * installed on and exposes that group through {@link ChannelMetrics}.
+ * <p>
+ * The handler is not sharable, so one instance exists per pipeline; all statistics are therefore
+ * derived from the group passed to the constructor rather than from per-instance state.
+ */
+public class ChannelStatistics extends ChannelDuplexHandler implements ChannelMetrics {
+    public static final String NAME = ChannelStatistics.class.getSimpleName();
+    private final ChannelGroup allChannels;
+
+    /**
+     * @param allChannels the group this handler adds its channel to and removes it from
+     */
+    public ChannelStatistics(ChannelGroup allChannels) {
+        this.allChannels = allChannels;
+    }
+
+    /**
+     * Registers the channel in the group when it becomes active, then forwards the event.
+     */
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        // connect
+        allChannels.add(ctx.channel());
+        super.channelActive(ctx);
+    }
+
+    /**
+     * Removes the channel from the group when it becomes inactive, then forwards the event.
+     */
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        // disconnect
+        allChannels.remove(ctx.channel());
+        super.channelInactive(ctx);
+    }
+
+    /**
+     * Returns the number of channels currently held by the shared group.
+     * A per-handler counter would only ever report this handler's own channel (0 or 1).
+     *
+     * @return the size of the channel group
+     */
+    @Override
+    public Integer getChannelCount() {
+        return allChannels.size();
+    }
+
+    /**
+     * @return the channel group supplied at construction time
+     */
+    @Override
+    public ChannelGroup getChannels() {
+        return allChannels;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java
new file mode 100644
index 0000000..87a0912
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.exception.RemoteCodecException;
+import org.apache.rocketmq.remoting.impl.buffer.NettyByteBufferWrapper;
+import org.apache.rocketmq.remoting.impl.command.CodecHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Inbound decoder that turns buffered bytes into {@link RemotingCommand}s. Each frame starts
+ * with one protocol-type byte followed by a 4-byte total length and that many payload bytes.
+ */
+public class Decoder extends ByteToMessageDecoder {
+    private static final Logger LOG = LoggerFactory.getLogger(Decoder.class);
+
+    public Decoder() {
+    }
+
+    /**
+     * Attempts to decode one command from the accumulated input and appends it to {@code out}
+     * when a complete frame was available.
+     */
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+        if (in.isReadable()) {
+            final NettyByteBufferWrapper input = new NettyByteBufferWrapper(in, ctx.channel());
+
+            final Object decoded = this.decode(ctx, input);
+            if (decoded != null) {
+                out.add(decoded);
+            }
+        }
+    }
+
+    /**
+     * Reads the protocol-type byte, decodes the rest of the frame and stamps the type onto the
+     * resulting command. A {@link RemoteCodecException} is logged and answered by closing the channel.
+     *
+     * @return the decoded command, or {@code null} when the frame is incomplete or decoding failed
+     */
+    private Object decode(final ChannelHandlerContext ctx, ByteBufferWrapper wrapper) throws Exception {
+        // Remember where the frame started so an incomplete frame can be rewound.
+        final int frameStart = wrapper.readerIndex();
+        final byte protocolType = wrapper.readByte();
+
+        try {
+            final RemotingCommand command = decode(wrapper, frameStart);
+            if (command == null) {
+                return null;
+            }
+            command.protocolType(protocolType);
+            return command;
+        } catch (final RemoteCodecException e) {
+            LOG.warn("Decode error {}, close the channel {}", e.getMessage(), ctx.channel());
+            ctx.channel().close().addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    LOG.warn("Close channel {} because of error {},result is {}", ctx.channel(), e, future.isSuccess());
+                }
+            });
+        }
+        return null;
+    }
+
+    /**
+     * Decodes the length-prefixed part of a frame. When not enough bytes are available the
+     * reader index is reset to {@code originReaderIndex} and {@code null} is returned.
+     *
+     * @param wrapper the input, positioned right after the protocol-type byte
+     * @param originReaderIndex reader index to restore when the frame is incomplete
+     * @return the decoded command, or {@code null} when more bytes are needed
+     * @throws IllegalArgumentException if the announced length is not positive or exceeds
+     * {@code CodecHelper.PACKET_MAX_LEN}
+     */
+    public RemotingCommand decode(final ByteBufferWrapper wrapper, final int originReaderIndex) {
+        // Full message isn't available yet, return nothing for now
+        final int minRemaining = CodecHelper.MIN_PROTOCOL_LEN - 1;
+        if (wrapper.readableBytes() < minRemaining) {
+            wrapper.setReaderIndex(originReaderIndex);
+            return null;
+        }
+
+        final int frameLength = wrapper.readInt();
+
+        if (frameLength <= 0) {
+            throw new IllegalArgumentException("Illegal total length " + frameLength);
+        }
+
+        if (frameLength > CodecHelper.PACKET_MAX_LEN) {
+            throw new IllegalArgumentException(String.format("Total length %d is more than limit %d", frameLength, CodecHelper.PACKET_MAX_LEN));
+        }
+
+        // The payload has not fully arrived yet; rewind and wait for more data.
+        if (wrapper.readableBytes() < frameLength) {
+            wrapper.setReaderIndex(originReaderIndex);
+            return null;
+        }
+
+        final ByteBuffer frame = ByteBuffer.allocate(frameLength);
+        wrapper.readBytes(frame);
+        frame.flip();
+
+        return CodecHelper.decode(frame);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
new file mode 100644
index 0000000..10aa504
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.serializable.Serializer;
+import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
+import org.apache.rocketmq.remoting.impl.buffer.NettyByteBufferWrapper;
+import org.apache.rocketmq.remoting.impl.command.CodecHelper;
+import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Outbound encoder that writes a {@link RemotingCommand} as header, optional serialized
+ * parameter and optional extra payload.
+ */
+public class Encoder extends MessageToByteEncoder<RemotingCommand> {
+    private static final Logger LOG = LoggerFactory.getLogger(Encoder.class);
+
+    private final SerializerFactory serializerFactory = new SerializerFactoryImpl();
+
+    public Encoder() {
+    }
+
+    /**
+     * Encodes the command into {@code out}. Any failure is logged together with the command
+     * and answered by closing the channel; it is not rethrown.
+     */
+    @Override
+    public void encode(final ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception {
+        try {
+            ByteBufferWrapper wrapper = new NettyByteBufferWrapper(out);
+
+            encode(serializerFactory, remotingCommand, wrapper);
+        } catch (final Exception e) {
+            // The remote address is resolved defensively: failing here would hide the original
+            // encoding error and skip the close below.
+            LOG.error("Error occurred when encoding response for channel " + remoteHostOf(ctx), e);
+            if (remotingCommand != null) {
+                LOG.error(remotingCommand.toString());
+            }
+            ctx.channel().close().addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    LOG.warn("Close channel {} because of error {},result is {}", ctx.channel(), e, future.isSuccess());
+                }
+            });
+        }
+    }
+
+    /**
+     * Describes the remote peer of the context's channel without throwing: the host address for
+     * a resolved {@link InetSocketAddress}, otherwise the string form of the address (which is
+     * {@code "null"} when the channel has no remote address).
+     */
+    private static String remoteHostOf(final ChannelHandlerContext ctx) {
+        final Object remote = ctx.channel().remoteAddress();
+        if (remote instanceof InetSocketAddress) {
+            final InetSocketAddress inetRemote = (InetSocketAddress) remote;
+            if (inetRemote.getAddress() != null) {
+                return inetRemote.getAddress().getHostAddress();
+            }
+        }
+        return String.valueOf(remote);
+    }
+
+    /**
+     * Writes the command to {@code out}: first the header produced by
+     * {@code CodecHelper.encodeHeader}, then the parameter bytes (pre-serialized bytes take
+     * precedence over serializing the parameter object), then the extra payload if present.
+     *
+     * @param serializerFactory factory used to look up the serializer for the command's serializer type
+     * @param remotingCommand the command to encode
+     * @param out the destination buffer
+     */
+    public void encode(final SerializerFactory serializerFactory, final RemotingCommand remotingCommand,
+        final ByteBufferWrapper out) {
+        ByteBuffer encodeParameter = null;
+        if (remotingCommand.parameterBytes() != null) {
+            encodeParameter = ByteBuffer.wrap(remotingCommand.parameterBytes());
+        } else if (remotingCommand.parameter() != null) {
+            final Serializer serialization = serializerFactory.get(remotingCommand.serializerType());
+            encodeParameter = serialization.encode(remotingCommand.parameter());
+        }
+
+        // NOTE(review): limit() equals the number of bytes written only if the buffer's position
+        // is 0; presumably serializers return such buffers - confirm, otherwise use remaining().
+        int parameterLength = encodeParameter != null ? encodeParameter.limit() : 0;
+        int extBodyLength = remotingCommand.extraPayload() != null ? remotingCommand.extraPayload().length : 0;
+
+        ByteBuffer header = CodecHelper.encodeHeader(remotingCommand, parameterLength, extBodyLength);
+        out.writeBytes(header);
+
+        if (encodeParameter != null) {
+            out.writeBytes(encodeParameter);
+        }
+
+        if (remotingCommand.extraPayload() != null) {
+            out.writeBytes(remotingCommand.extraPayload());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandler.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandler.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandler.java
new file mode 100644
index 0000000..52563f4
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty.handler;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ChannelHandler.Sharable
+public class ExceptionHandler extends ChannelDuplexHandler {
+    private final static Logger LOG = LoggerFactory.getLogger(ExceptionHandler.class);
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        // Uncaught exceptions from inbound handlers will propagate up to this handler
+        LOG.error(String.format("channel exception %s occurred ! ", ctx.channel()), cause);
+        ctx.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Http2Handler.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Http2Handler.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Http2Handler.java
new file mode 100644
index 0000000..7cdb976
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Http2Handler.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.DefaultHttp2Connection;
+import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
+import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
+import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
+import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder;
+import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2ConnectionDecoder;
+import io.netty.handler.codec.http2.Http2ConnectionEncoder;
+import io.netty.handler.codec.http2.Http2ConnectionHandler;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2FrameAdapter;
+import io.netty.handler.codec.http2.Http2FrameReader;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2HeadersDecoder;
+import io.netty.handler.codec.http2.Http2Settings;
+import io.netty.handler.codec.http2.StreamBufferingEncoder;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
+
+/**
+ * HTTP/2 connection handler that carries raw {@link ByteBuf} payloads over HTTP/2 streams.
+ * <p>
+ * Inbound DATA frames are forwarded to the next handler as plain buffers. Outbound buffers are
+ * written as HEADERS + DATA on the client side and as PUSH_PROMISE + HEADERS + DATA on the
+ * server side, where the id of the most recently read stream is used as the associated stream.
+ */
+public class Http2Handler extends Http2ConnectionHandler {
+
+    private final boolean isServer;
+    /** Id of the stream on which the last DATA frame was read; updated by {@link FrameListener}. */
+    private int lastStreamId;
+
+    private Http2Handler(final Http2ConnectionDecoder decoder, final Http2ConnectionEncoder encoder,
+        final Http2Settings initialSettings, final boolean isServer) {
+        super(decoder, encoder, initialSettings);
+        decoder.frameListener(new FrameListener());
+        this.isServer = isServer;
+    }
+
+    /**
+     * Builds a fully wired handler (connection, frame reader/writer, buffering encoder, decoder
+     * and local flow controller) for one side of an HTTP/2 connection.
+     *
+     * @param isServer {@code true} to create the server endpoint, {@code false} for the client
+     * @return a new handler instance
+     */
+    public static Http2Handler newHandler(final boolean isServer) {
+
+        Http2HeadersDecoder headersDecoder = new DefaultHttp2HeadersDecoder(true);
+        Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
+        Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
+
+        Http2Connection connection = new DefaultHttp2Connection(isServer);
+
+        Http2ConnectionEncoder encoder = new StreamBufferingEncoder(
+            new DefaultHttp2ConnectionEncoder(connection, frameWriter));
+
+        connection.local().flowController(new DefaultHttp2LocalFlowController(connection,
+            DEFAULT_WINDOW_UPDATE_RATIO, true));
+
+        Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
+            frameReader);
+
+        Http2Settings settings = new Http2Settings();
+
+        // Only the client advertises push support; the server side is the one that pushes.
+        if (!isServer) {
+            settings.pushEnabled(true);
+        }
+
+        settings.initialWindowSize(1048576 * 10); //10MiB
+        settings.maxConcurrentStreams(Integer.MAX_VALUE);
+
+        return newHandler(decoder, encoder, settings, isServer);
+    }
+
+    private static Http2Handler newHandler(final Http2ConnectionDecoder decoder, final Http2ConnectionEncoder encoder,
+        final Http2Settings settings, boolean isServer) {
+        return new Http2Handler(decoder, encoder, settings, isServer);
+    }
+
+    /**
+     * Writes an outbound {@link ByteBuf}.
+     * <p>
+     * On the server the payload is sent on a promised stream ({@code lastStreamId + 1}) and the
+     * given promise is completed by the DATA write. On the client the payload is sent as
+     * HEADERS + DATA on a stream id derived from the current thread id; the given promise is
+     * attached to the HEADERS write, and any exception raised while writing fails the promise.
+     *
+     * @param ctx the handler context
+     * @param msg the payload; expected to be a {@link ByteBuf}
+     * @param promise promise notified about the outcome of the write
+     */
+    @Override
+    public void write(final ChannelHandlerContext ctx, final Object msg,
+        final ChannelPromise promise) throws Exception {
+        if (isServer) {
+            assert msg instanceof ByteBuf;
+            sendAPushPromise(ctx, lastStreamId, lastStreamId + 1, (ByteBuf) msg, promise);
+        } else {
+
+            final Http2Headers headers = new DefaultHttp2Headers();
+
+            try {
+                long threadId = Thread.currentThread().getId();
+                // Always yields an odd number, as required for client-initiated streams.
+                long streamId = (threadId % 2 == 0) ? threadId + 1 : threadId + 2;
+                encoder().writeHeaders(ctx, (int) streamId, headers, 0, false, promise);
+                encoder().writeData(ctx, (int) streamId, (ByteBuf) msg, 0, false, ctx.newPromise());
+                ctx.flush();
+            } catch (Exception e) {
+                // Report the failure to the caller instead of swallowing it.
+                promise.tryFailure(e);
+            }
+
+        }
+    }
+
+    /**
+     * Sends {@code payload} as a server push: PUSH_PROMISE on {@code streamId}, followed by
+     * HEADERS and DATA on {@code pushPromiseStreamId}.
+     *
+     * @param ctx the handler context
+     * @param streamId the stream the push is associated with
+     * @param pushPromiseStreamId the promised stream that carries the payload
+     * @param payload the data to push
+     * @param promise completed by the final DATA write so the caller's listeners are notified
+     */
+    private void sendAPushPromise(ChannelHandlerContext ctx, int streamId, int pushPromiseStreamId,
+        ByteBuf payload, ChannelPromise promise) throws Http2Exception {
+
+        encoder().writePushPromise(ctx, streamId, pushPromiseStreamId,
+            new DefaultHttp2Headers().status(OK.codeAsText()), 0, ctx.newPromise());
+
+        Http2Headers headers = new DefaultHttp2Headers();
+        headers.status(OK.codeAsText());
+        encoder().writeHeaders(ctx, pushPromiseStreamId, headers, 0, false, ctx.newPromise());
+        encoder().writeData(ctx, pushPromiseStreamId, payload, 0, false, promise);
+    }
+
+    /**
+     * Forwards the content of inbound DATA frames to the next handler in the pipeline and
+     * remembers the stream they arrived on.
+     */
+    private class FrameListener extends Http2FrameAdapter {
+        @Override
+        public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
+            boolean endOfStream) throws Http2Exception {
+            // Capture the size before handing the buffer on: downstream handlers may advance the
+            // reader index, which would under-report the processed bytes to flow control.
+            final int processed = data.readableBytes() + padding;
+            // The buffer is passed on while the decoder still owns its own reference, so retain it.
+            data.retain();
+            Http2Handler.this.lastStreamId = streamId;
+            ctx.fireChannelRead(data);
+            return processed;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ProtocolSelector.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ProtocolSelector.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ProtocolSelector.java
new file mode 100644
index 0000000..e00a213
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ProtocolSelector.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.ssl.SslContext;
+import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
+import org.apache.rocketmq.remoting.api.protocol.Protocol;
+import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory;
+import org.apache.rocketmq.remoting.impl.channel.ChannelHandlerContextWrapperImpl;
+import org.apache.rocketmq.remoting.impl.protocol.ProtocolFactoryImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Inspects the first byte of the first inbound buffer, looks up the matching {@link Protocol},
+ * lets it install its handlers, and then removes itself from the pipeline. Channels whose
+ * leading byte maps to no registered protocol are closed.
+ */
+public class ProtocolSelector extends SimpleChannelInboundHandler<ByteBuf> {
+    public static final String NAME = ProtocolSelector.class.getSimpleName();
+    private static final Logger LOG = LoggerFactory.getLogger(ProtocolSelector.class);
+    private final ProtocolFactory protocolFactory;
+
+    /**
+     * @param sslContext SSL context handed to the protocol factory
+     */
+    public ProtocolSelector(final SslContext sslContext) {
+        this.protocolFactory = new ProtocolFactoryImpl(sslContext);
+    }
+
+    @Override
+    protected void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) throws Exception {
+        if (!msg.isReadable()) {
+            return;
+        }
+
+        // Peek at the magic byte; the reader index is restored before the buffer is passed on.
+        msg.markReaderIndex();
+        final byte magic = msg.readByte();
+        final Protocol selected = protocolFactory.get(magic);
+        if (selected == null) {
+            closeAndLog(ctx);
+            return;
+        }
+
+        final ChannelHandlerContextWrapper wrappedContext = new ChannelHandlerContextWrapperImpl(ctx);
+        selected.assembleHandler(wrappedContext);
+        msg.resetReaderIndex();
+        ctx.pipeline().remove(this);
+        // SimpleChannelInboundHandler releases msg after this method returns, so retain it
+        // for the handlers that receive it next.
+        ctx.fireChannelRead(msg.retain());
+    }
+
+    /**
+     * Closes the channel and logs the outcome of the close operation.
+     */
+    private void closeAndLog(final ChannelHandlerContext ctx) {
+        final ChannelFutureListener closeLogger = new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                LOG.warn("Close channel {},result is {}", ctx.channel(), future.isSuccess());
+            }
+        };
+        ctx.channel().close().addListener(closeLogger);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/Httpv2Protocol.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/Httpv2Protocol.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/Httpv2Protocol.java
new file mode 100644
index 0000000..0740cbb
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/Httpv2Protocol.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.ssl.SslContext;
+import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
+import org.apache.rocketmq.remoting.api.protocol.Protocol;
+import org.apache.rocketmq.remoting.impl.netty.handler.Http2Handler;
+import org.apache.rocketmq.remoting.impl.netty.handler.ProtocolSelector;
+
+/**
+ * HTTP/2 protocol: on top of the handlers installed by {@link RemotingCoreProtocol} it adds an
+ * SSL handler and a server-side {@link Http2Handler} directly after the protocol selector.
+ */
+public class Httpv2Protocol extends RemotingCoreProtocol {
+    private final SslContext sslContext;
+
+    /**
+     * @param sslContext context used to create the SSL handler for each channel
+     */
+    public Httpv2Protocol(final SslContext sslContext) {
+        this.sslContext = sslContext;
+    }
+
+    @Override
+    public String name() {
+        return Protocol.HTTP2;
+    }
+
+    @Override
+    public byte type() {
+        return Protocol.HTTP_2_MAGIC;
+    }
+
+    /**
+     * Installs the core handlers first, then inserts "sslHandler" after the protocol selector
+     * and "http2Handler" after "sslHandler".
+     *
+     * @param ctx wrapper whose underlying context is a Netty {@link ChannelHandlerContext}
+     */
+    @Override
+    public void assembleHandler(final ChannelHandlerContextWrapper ctx) {
+        super.assembleHandler(ctx);
+        final ChannelHandlerContext nettyContext = (ChannelHandlerContext) ctx.getContext();
+
+        nettyContext.pipeline()
+            .addAfter(ProtocolSelector.NAME, "sslHandler", sslContext.newHandler(nettyContext.alloc()))
+            .addAfter("sslHandler", "http2Handler", Http2Handler.newHandler(true));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/ProtocolFactoryImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/ProtocolFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/ProtocolFactoryImpl.java
new file mode 100644
index 0000000..15322be
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/ProtocolFactoryImpl.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol;
+
+import io.netty.handler.ssl.SslContext;
+import org.apache.rocketmq.remoting.api.protocol.Protocol;
+import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory;
+
+/**
+ * Registry of {@link Protocol} implementations, indexed by the unsigned value of their magic
+ * byte. The core, HTTP/2 and WebSocket protocols are registered on construction.
+ */
+public class ProtocolFactoryImpl implements ProtocolFactory {
+    /** Mask that maps a (signed) magic byte onto its unsigned value, 0..255. */
+    private static final int MAX_COUNT = 0x0FF;
+    /** One slot per possible unsigned byte value, so every masked index is within bounds. */
+    private final Protocol[] tables = new Protocol[MAX_COUNT + 1];
+
+    private final SslContext sslContext;
+
+    /**
+     * @param sslContext SSL context passed to the HTTP/2 protocol
+     */
+    public ProtocolFactoryImpl(final SslContext sslContext) {
+        this.sslContext = sslContext;
+        this.register(new RemotingCoreProtocol());
+        this.register(new Httpv2Protocol(sslContext));
+        this.register(new WebSocketProtocol());
+    }
+
+    /**
+     * Creates a factory without an SSL context; the HTTP/2 protocol is registered with
+     * {@code null} as its context.
+     */
+    public ProtocolFactoryImpl() {
+        this(null);
+    }
+
+    /**
+     * Registers a protocol under its magic byte.
+     *
+     * @param protocol the protocol to register
+     * @throws RuntimeException if another protocol already occupies the same magic byte
+     */
+    @Override
+    public void register(Protocol protocol) {
+        final int index = protocol.type() & MAX_COUNT;
+        if (tables[index] != null) {
+            throw new RuntimeException("protocol header's sign is overlapped");
+        }
+        tables[index] = protocol;
+    }
+
+    /**
+     * Maps every magic byte to the given protocol.
+     *
+     * @param protocol the protocol to store in every slot
+     */
+    @Override
+    public void resetAll(final Protocol protocol) {
+        for (int i = 0; i < this.tables.length; i++) {
+            tables[i] = protocol;
+        }
+    }
+
+    /**
+     * Looks up the magic byte of the protocol with the given name (case-insensitive).
+     *
+     * @param protocolName name of a registered protocol
+     * @return the magic byte of the first registered protocol with that name
+     * @throws IllegalArgumentException if no registered protocol has that name
+     */
+    @Override
+    public byte type(final String protocolName) {
+
+        for (int i = 0; i < this.tables.length; i++) {
+            if (this.tables[i] != null) {
+                if (this.tables[i].name().equalsIgnoreCase(protocolName)) {
+                    return this.tables[i].type();
+                }
+            }
+        }
+
+        throw new IllegalArgumentException(String.format("the protocol: %s not exist", protocolName));
+    }
+
+    /**
+     * @param type magic byte, e.g. the first byte read from a connection
+     * @return the protocol registered for that byte, or {@code null} if there is none
+     */
+    @Override
+    public Protocol get(byte type) {
+        return tables[type & MAX_COUNT];
+    }
+
+    /**
+     * Removes every registered protocol.
+     */
+    @Override
+    public void clearAll() {
+        for (int i = 0; i < this.tables.length; i++) {
+            this.tables[i] = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/RemotingCoreProtocol.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/RemotingCoreProtocol.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/RemotingCoreProtocol.java
new file mode 100644
index 0000000..317b24f
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/RemotingCoreProtocol.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
+import org.apache.rocketmq.remoting.api.protocol.Protocol;
+import org.apache.rocketmq.remoting.impl.netty.handler.Decoder;
+import org.apache.rocketmq.remoting.impl.netty.handler.Encoder;
+import org.apache.rocketmq.remoting.impl.netty.handler.ProtocolSelector;
+
+/**
+ * The core remoting protocol: installs the {@link Decoder} and {@link Encoder} directly after
+ * the protocol selector.
+ */
+public class RemotingCoreProtocol implements Protocol {
+    @Override
+    public String name() {
+        return Protocol.MVP;
+    }
+
+    @Override
+    public byte type() {
+        return Protocol.MVP_MAGIC;
+    }
+
+    /**
+     * Inserts "decoder" after the protocol selector and "encoder" after "decoder".
+     *
+     * @param ctx wrapper whose underlying context is a Netty {@link ChannelHandlerContext}
+     */
+    @Override
+    public void assembleHandler(final ChannelHandlerContextWrapper ctx) {
+        final ChannelHandlerContext nettyContext = (ChannelHandlerContext) ctx.getContext();
+
+        nettyContext.pipeline()
+            .addAfter(ProtocolSelector.NAME, "decoder", new Decoder())
+            .addAfter("decoder", "encoder", new Encoder());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/WebSocketProtocol.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/WebSocketProtocol.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/WebSocketProtocol.java
new file mode 100644
index 0000000..18a3a11
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/WebSocketProtocol.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol;
+
+import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
+import org.apache.rocketmq.remoting.api.protocol.Protocol;
+
+/**
+ * WebSocket protocol descriptor. It only reports its name and magic byte; no handlers are
+ * installed by {@link #assembleHandler(ChannelHandlerContextWrapper)}.
+ */
+public class WebSocketProtocol implements Protocol {
+    @Override
+    public String name() {
+        return Protocol.WEBSOCKET;
+    }
+
+    @Override
+    public byte type() {
+        return Protocol.WEBSOCKET_MAGIC;
+    }
+
+    /**
+     * Does nothing: this protocol adds no handlers to the pipeline.
+     *
+     * @param ctx unused
+     */
+    @Override
+    public void assembleHandler(final ChannelHandlerContextWrapper ctx) {
+        // Nothing is installed for this protocol.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/CompressorFactoryImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/CompressorFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/CompressorFactoryImpl.java
new file mode 100644
index 0000000..10e97ba
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/CompressorFactoryImpl.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol.compression;
+
+import org.apache.rocketmq.remoting.api.compressable.Compressor;
+import org.apache.rocketmq.remoting.api.compressable.CompressorFactory;
+
+/**
+ * Registry of {@link Compressor} implementations, indexed by the unsigned value of their type
+ * byte. A {@link GZipCompressor} is registered on construction.
+ */
+public class CompressorFactoryImpl implements CompressorFactory {
+    /** Mask that maps a (signed) type byte onto its unsigned value, 0..255. */
+    private static final int MAX_COUNT = 0x0FF;
+    /** One slot per possible unsigned byte value, so every masked index is within bounds. */
+    private final Compressor[] tables = new Compressor[MAX_COUNT + 1];
+
+    public CompressorFactoryImpl() {
+        this.register(new GZipCompressor());
+    }
+
+    /**
+     * Registers a compressor under its type byte.
+     *
+     * @param compressor the compressor to register
+     * @throws RuntimeException if another compressor already occupies the same type byte
+     */
+    @Override
+    public void register(Compressor compressor) {
+        final int index = compressor.type() & MAX_COUNT;
+        if (tables[index] != null) {
+            throw new RuntimeException("compressor header's sign is overlapped");
+        }
+        tables[index] = compressor;
+    }
+
+    /**
+     * Looks up the type byte of the compressor with the given name (case-insensitive).
+     *
+     * @param compressionName name of a registered compressor
+     * @return the type byte of the first registered compressor with that name
+     * @throws IllegalArgumentException if no registered compressor has that name
+     */
+    @Override
+    public byte type(String compressionName) {
+        for (Compressor table : this.tables) {
+            if (table != null) {
+                if (table.name().equalsIgnoreCase(compressionName)) {
+                    return table.type();
+                }
+            }
+        }
+
+        throw new IllegalArgumentException(String.format("the compressor: %s not exist", compressionName));
+
+    }
+
+    /**
+     * @param type compressor type byte
+     * @return the compressor registered for that byte, or {@code null} if there is none
+     */
+    @Override
+    public Compressor get(byte type) {
+        return tables[type & MAX_COUNT];
+    }
+
+    /**
+     * Removes every registered compressor.
+     */
+    @Override
+    public void clearAll() {
+        for (int i = 0; i < this.tables.length; i++) {
+            this.tables[i] = null;
+        }
+    }
+
+    /**
+     * @return the backing table itself (not a copy); changes to it affect this factory
+     */
+    public Compressor[] getTables() {
+        return tables;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/GZipCompressor.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/GZipCompressor.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/GZipCompressor.java
new file mode 100644
index 0000000..fc33f4c
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/GZipCompressor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import org.apache.rocketmq.remoting.api.compressable.Compressor;
+
+public class GZipCompressor implements Compressor {
+    public static final int BUFFER = 1024;
+    public static final String COMPRESSOR_NAME = GZipCompressor.class.getSimpleName();
+    public static final byte COMPRESSOR_TYPE = 'G';
+
+    @Override
+    public String name() {
+        return COMPRESSOR_NAME;
+    }
+
+    @Override
+    public byte type() {
+        return COMPRESSOR_TYPE;
+    }
+
+    @Override
+    public byte[] compress(byte[] content) throws Exception {
+        if (content == null)
+            return new byte[0];
+
+        ByteArrayInputStream bais = new ByteArrayInputStream(content);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        compress(bais, baos);
+        byte[] output = baos.toByteArray();
+        baos.flush();
+        baos.close();
+        bais.close();
+        return output;
+
+    }
+
+    private void compress(InputStream is, OutputStream os) throws Exception {
+        GZIPOutputStream gos = new GZIPOutputStream(os);
+
+        int count;
+        byte data[] = new byte[BUFFER];
+        while ((count = is.read(data, 0, BUFFER)) != -1) {
+            gos.write(data, 0, count);
+        }
+        gos.finish();
+        gos.flush();
+        gos.close();
+    }
+
+    @Override
+    public byte[] deCompress(byte[] content) throws Exception {
+        if (content == null)
+            return new byte[0];
+
+        ByteArrayInputStream bais = new ByteArrayInputStream(content);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        decompress(bais, baos);
+        content = baos.toByteArray();
+        baos.flush();
+        baos.close();
+        bais.close();
+        return content;
+    }
+
+    private void decompress(InputStream is, OutputStream os) throws Exception {
+        GZIPInputStream gis = new GZIPInputStream(is);
+
+        int count;
+        byte data[] = new byte[BUFFER];
+        while ((count = gis.read(data, 0, BUFFER)) != -1) {
+            os.write(data, 0, count);
+        }
+        gis.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/JsonSerializer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/JsonSerializer.java
new file mode 100644
index 0000000..c85d44b
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/JsonSerializer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol.serializer;
+
+import com.alibaba.fastjson.JSON;
+import java.lang.reflect.Type;
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.remoting.api.serializable.Serializer;
+import org.apache.rocketmq.remoting.common.TypePresentation;
+import org.apache.rocketmq.remoting.impl.command.CodecHelper;
+
+/**
+ * {@link Serializer} that converts objects to and from JSON text using fastjson, encoded with
+ * {@code CodecHelper.REMOTING_CHARSET}.
+ */
+public class JsonSerializer implements Serializer {
+    public static final String SERIALIZER_NAME = JsonSerializer.class.getSimpleName();
+    public static final byte SERIALIZER_TYPE = 'J';
+
+    public JsonSerializer() {
+    }
+
+    @Override
+    public String name() {
+        return SERIALIZER_NAME;
+    }
+
+    @Override
+    public byte type() {
+        return SERIALIZER_TYPE;
+    }
+
+    /**
+     * Parses JSON bytes into an instance of the given class.
+     *
+     * @param content JSON bytes; may be {@code null}
+     * @param c target class
+     * @return the parsed object, or {@code null} when {@code content} is {@code null}
+     * @throws RuntimeException wrapping any failure raised while decoding or parsing
+     */
+    @Override
+    public <T> T decode(final byte[] content, final Class<T> c) {
+        if (content == null) {
+            return null;
+        }
+        try {
+            final String json = new String(content, CodecHelper.REMOTING_CHARSET);
+            return JSON.parseObject(json, c);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Parses JSON bytes into the type described by {@code typePresentation}.
+     */
+    @Override
+    public <T> T decode(final byte[] content, final TypePresentation<T> typePresentation) {
+        final Type targetType = typePresentation.getType();
+        return decode(content, targetType);
+    }
+
+    /**
+     * Parses JSON bytes into an instance of the given (possibly generic) type.
+     *
+     * @param content JSON bytes; may be {@code null}
+     * @param type target type
+     * @return the parsed object, or {@code null} when {@code content} is {@code null}
+     * @throws RuntimeException wrapping any failure raised while decoding or parsing
+     */
+    @Override
+    public <T> T decode(byte[] content, Type type) {
+        if (content == null) {
+            return null;
+        }
+        try {
+            final String json = new String(content, CodecHelper.REMOTING_CHARSET);
+            return JSON.parseObject(json, type);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Serializes an object to JSON and wraps the encoded bytes in a buffer.
+     *
+     * @param object the object to serialize; may be {@code null}
+     * @return a buffer over the JSON bytes, or {@code null} when {@code object} is {@code null}
+     */
+    @Override
+    public ByteBuffer encode(final Object object) {
+        if (object == null) {
+            return null;
+        }
+        final String json = JSON.toJSONString(object);
+        final byte[] encoded = json.getBytes(CodecHelper.REMOTING_CHARSET);
+        try {
+            return ByteBuffer.wrap(encoded, 0, encoded.length);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/Kryo3Serializer.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/Kryo3Serializer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/Kryo3Serializer.java
new file mode 100644
index 0000000..06ea217
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/Kryo3Serializer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol.serializer;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.remoting.api.serializable.Serializer;
+import org.apache.rocketmq.remoting.common.TypePresentation;
+
+/**
+ * {@link Serializer} implementation that delegates to the Kryo instance supplied by
+ * {@link ThreadSafeKryo#getKryoInstance()}.
+ *
+ * <p>Objects are written with {@code writeClassAndObject} and read back with
+ * {@code readClassAndObject}, so the concrete class travels inside the payload.</p>
+ */
+public class Kryo3Serializer implements Serializer {
+    public static final String SERIALIZER_NAME = Kryo3Serializer.class.getSimpleName();
+    public static final byte SERIALIZER_TYPE = 'K';
+
+    /** Initial size, in bytes, of the buffer used while encoding. */
+    private static final int INITIAL_BUFFER_SIZE = 1024;
+    /** Maximum size, in bytes, the encoding buffer is allowed to reach (6 MiB). */
+    private static final int MAX_BUFFER_SIZE = 1024 * 1024 * 6;
+
+    public Kryo3Serializer() {
+    }
+
+    /**
+     * @return the simple class name of this serializer
+     */
+    @Override
+    public String name() {
+        return SERIALIZER_NAME;
+    }
+
+    /**
+     * @return the one-byte identifier of this serializer ({@code 'K'})
+     */
+    @Override
+    public byte type() {
+        return SERIALIZER_TYPE;
+    }
+
+    /**
+     * Reads an object from a Kryo payload.
+     *
+     * <p>The class of the result is taken from the payload itself; {@code c} is not consulted
+     * and only fixes the static return type at the call site.</p>
+     *
+     * @param content Kryo payload; may be {@code null}
+     * @param c expected class of the result (unused by the implementation)
+     * @param <T> static type of the result
+     * @return the decoded object, or {@code null} when {@code content} is {@code null}
+     * @throws RuntimeException wrapping any exception raised while reading
+     */
+    @Override
+    @SuppressWarnings("unchecked") // the payload carries its own class; the cast mirrors the caller's expectation
+    public <T> T decode(final byte[] content, final Class<T> c) {
+        if (content == null) {
+            return null;
+        }
+
+        // try-with-resources closes the input on every path and cannot hit a null reference,
+        // unlike a finally block that closes a variable which may never have been assigned.
+        try (Input input = new Input(content)) {
+            return (T) ThreadSafeKryo.getKryoInstance().readClassAndObject(input);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Decodes {@code content} using the type carried by {@code typePresentation}; delegates to
+     * another {@code decode} overload of this class.
+     */
+    @Override
+    public <T> T decode(final byte[] content, final TypePresentation<T> typePresentation) {
+        return decode(content, typePresentation.getType());
+    }
+
+    /**
+     * Decodes {@code content} for a reflective {@link Type}.
+     *
+     * <p>A {@link ParameterizedType} is reduced to its raw class and a {@link Class} is used
+     * directly. Any other kind of {@code Type} is not supported and yields {@code null}.</p>
+     *
+     * @param content Kryo payload; may be {@code null}
+     * @param type target type
+     * @param <T> static type of the result
+     * @return the decoded object, or {@code null} when {@code content} is {@code null} or the
+     * kind of {@code type} is not supported
+     */
+    @Override
+    @SuppressWarnings("unchecked") // raw class derived from the requested type
+    public <T> T decode(byte[] content, Type type) {
+        if (type instanceof ParameterizedType) {
+            return decode(content, (Class<? extends T>) ((ParameterizedType) type).getRawType());
+        } else if (type instanceof Class) {
+            return decode(content, (Class<? extends T>) type);
+        }
+        return null;
+    }
+
+    /**
+     * Writes an object, together with its class, into a Kryo payload.
+     *
+     * @param object value to serialize; may be {@code null}
+     * @return a buffer wrapping the bytes written so far (offset 0 up to the output position),
+     * or {@code null} when {@code object} is {@code null}
+     * @throws RuntimeException wrapping any exception raised while writing, for example when the
+     * payload does not fit into the maximum buffer size
+     */
+    @Override
+    public ByteBuffer encode(final Object object) {
+        if (object == null) {
+            return null;
+        }
+
+        try (Output output = new Output(INITIAL_BUFFER_SIZE, MAX_BUFFER_SIZE)) {
+            ThreadSafeKryo.getKryoInstance().writeClassAndObject(output, object);
+            return ByteBuffer.wrap(output.getBuffer(), 0, output.position());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/MsgPackSerializer.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/MsgPackSerializer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/MsgPackSerializer.java
new file mode 100644
index 0000000..1097f8f
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/MsgPackSerializer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol.serializer;
+
+import java.lang.reflect.Type;
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.remoting.api.serializable.Serializer;
+import org.apache.rocketmq.remoting.common.TypePresentation;
+import org.msgpack.MessagePack;
+import org.msgpack.template.Template;
+
+/**
+ * {@link Serializer} implementation that reads and writes payloads through a single
+ * {@link MessagePack} instance owned by this object.
+ */
+public class MsgPackSerializer implements Serializer {
+    public static final String SERIALIZER_NAME = MsgPackSerializer.class.getSimpleName();
+    public static final byte SERIALIZER_TYPE = 'M';
+    private final MessagePack messagePack = new MessagePack();
+
+    public MsgPackSerializer() {
+    }
+
+    /**
+     * @return the simple class name of this serializer
+     */
+    @Override
+    public String name() {
+        return SERIALIZER_NAME;
+    }
+
+    /**
+     * @return the one-byte identifier of this serializer ({@code 'M'})
+     */
+    @Override
+    public byte type() {
+        return SERIALIZER_TYPE;
+    }
+
+    /**
+     * Reads an instance of {@code c} from {@code content}.
+     *
+     * @throws RuntimeException wrapping any exception raised while reading
+     */
+    @Override
+    public <T> T decode(final byte[] content, final Class<T> c) {
+        final T decoded;
+        try {
+            decoded = messagePack.read(content, c);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return decoded;
+    }
+
+    /**
+     * Decodes {@code content} using the type carried by {@code typePresentation}; delegates to
+     * another {@code decode} overload of this class.
+     */
+    @Override
+    public <T> T decode(final byte[] content, final TypePresentation<T> typePresentation) {
+        return decode(content, typePresentation.getType());
+    }
+
+    /**
+     * Reads an object of the given reflective type from {@code content}.
+     *
+     * <p>The template lookup runs before the guarded section, so a failing lookup is
+     * propagated as-is rather than wrapped.</p>
+     *
+     * @throws RuntimeException wrapping any exception raised while reading
+     */
+    @Override
+    public <T> T decode(byte[] content, Type type) {
+        final Template<T> typeTemplate = (Template<T>) messagePack.lookup(type);
+        final T decoded;
+        try {
+            decoded = messagePack.read(content, typeTemplate);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return decoded;
+    }
+
+    /**
+     * Writes {@code object} and wraps the produced bytes in a buffer.
+     *
+     * @throws RuntimeException wrapping any exception raised while writing
+     */
+    @Override
+    public ByteBuffer encode(final Object object) {
+        final ByteBuffer wrapped;
+        try {
+            wrapped = ByteBuffer.wrap(messagePack.write(object));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return wrapped;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializerFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializerFactoryImpl.java
new file mode 100644
index 0000000..632b61f
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializerFactoryImpl.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol.serializer;
+
+import org.apache.rocketmq.remoting.api.serializable.Serializer;
+import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
+
+/**
+ * Registry of {@link Serializer} implementations, indexed by their one-byte type id.
+ *
+ * <p>A {@link JsonSerializer}, a {@link Kryo3Serializer} and a {@link MsgPackSerializer} are
+ * registered on construction.</p>
+ */
+public class SerializerFactoryImpl implements SerializerFactory {
+    /** Mask that maps a signed {@code byte} type id onto the unsigned range {@code 0..255}. */
+    private static final int MAX_COUNT = 0x0FF;
+    /**
+     * One slot per possible type id. The table needs {@code MAX_COUNT + 1} entries: with only
+     * {@code MAX_COUNT} slots the id {@code (byte) 0xFF} would index past the end of the array.
+     */
+    private final Serializer[] tables = new Serializer[MAX_COUNT + 1];
+
+    public SerializerFactoryImpl() {
+        this.register(new JsonSerializer());
+        this.register(new Kryo3Serializer());
+        this.register(new MsgPackSerializer());
+    }
+
+    /**
+     * Adds a serializer under its own type id.
+     *
+     * @param serialization serializer to register
+     * @throws RuntimeException if another serializer already occupies the same type id
+     */
+    @Override
+    public void register(Serializer serialization) {
+        final int slot = serialization.type() & MAX_COUNT;
+        if (tables[slot] != null) {
+            throw new RuntimeException("serialization header's sign is overlapped");
+        }
+        tables[slot] = serialization;
+    }
+
+    /**
+     * Looks up the type id of the serializer with the given name (case-insensitive).
+     *
+     * @param serializationName name as reported by {@link Serializer#name()}
+     * @return the type id of the matching serializer
+     * @throws IllegalArgumentException if no registered serializer has that name
+     */
+    @Override
+    public byte type(final String serializationName) {
+        for (Serializer table : this.tables) {
+            if (table != null && table.name().equalsIgnoreCase(serializationName)) {
+                return table.type();
+            }
+        }
+
+        throw new IllegalArgumentException(String.format("the serialization: %s not exist", serializationName));
+    }
+
+    /**
+     * @param type type id, interpreted as an unsigned byte
+     * @return the serializer registered under {@code type}, or {@code null} if there is none
+     */
+    @Override
+    public Serializer get(byte type) {
+        return tables[type & MAX_COUNT];
+    }
+
+    /**
+     * Removes every registered serializer, including the ones added by the constructor.
+     */
+    @Override
+    public void clearAll() {
+        for (int i = 0; i < this.tables.length; i++) {
+            this.tables[i] = null;
+        }
+    }
+
+    /**
+     * @return the live backing table (not a copy); unused slots are {@code null}
+     */
+    public Serializer[] getTables() {
+        return tables;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/ThreadSafeKryo.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/ThreadSafeKryo.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/ThreadSafeKryo.java
new file mode 100644
index 0000000..cadfc27
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/ThreadSafeKryo.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol.serializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Currency;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+/**
+ * Provides one pre-configured {@link Kryo} instance per thread.
+ *
+ * <p>Instances are held in a {@link ThreadLocal} and created lazily on a thread's first call to
+ * {@link #getKryoInstance()}, so an instance is never shared between threads.</p>
+ */
+public class ThreadSafeKryo {
+    private static final ThreadLocal<Kryo> KRYOS = new ThreadLocal<Kryo>() {
+        // Builds and configures the Kryo instance for the calling thread.
+        protected Kryo initialValue() {
+            Kryo kryo = new Kryo();
+
+            // NOTE(review): Kryo presumably derives class ids from registration order, so the
+            // order of the calls below is likely part of the wire format - confirm before
+            // reordering, inserting or removing registrations.
+            kryo.register(byte[].class);
+            kryo.register(char[].class);
+            kryo.register(short[].class);
+            kryo.register(int[].class);
+            kryo.register(long[].class);
+            kryo.register(float[].class);
+            kryo.register(double[].class);
+            kryo.register(boolean[].class);
+            kryo.register(String[].class);
+            kryo.register(Object[].class);
+            kryo.register(KryoSerializable.class);
+            kryo.register(BigInteger.class);
+            kryo.register(BigDecimal.class);
+            kryo.register(Class.class);
+            kryo.register(Date.class);
+            // kryo.register(Enum.class);
+            kryo.register(EnumSet.class);
+            kryo.register(Currency.class);
+            kryo.register(StringBuffer.class);
+            kryo.register(StringBuilder.class);
+            // The JDK's empty/singleton collection implementations are not public classes,
+            // so their runtime classes are obtained from sample instances.
+            kryo.register(Collections.EMPTY_LIST.getClass());
+            kryo.register(Collections.EMPTY_MAP.getClass());
+            kryo.register(Collections.EMPTY_SET.getClass());
+            kryo.register(Collections.singletonList(null).getClass());
+            kryo.register(Collections.singletonMap(null, null).getClass());
+            kryo.register(Collections.singleton(null).getClass());
+            kryo.register(TreeSet.class);
+            kryo.register(Collection.class);
+            kryo.register(TreeMap.class);
+            kryo.register(Map.class);
+            // sun.util.calendar.ZoneInfo is a JDK-internal class, so it is looked up by name
+            // and simply left unregistered on runtimes that do not provide it.
+            try {
+                kryo.register(Class.forName("sun.util.calendar.ZoneInfo"));
+            } catch (ClassNotFoundException e) {
+                // Noop
+            }
+            kryo.register(Calendar.class);
+            kryo.register(Locale.class);
+
+            kryo.register(BitSet.class);
+            kryo.register(HashMap.class);
+            kryo.register(Timestamp.class);
+            kryo.register(ArrayList.class);
+
+            // Registration is not enforced here (the call below is disabled).
+            // NOTE(review): presumably this lets unregistered classes be serialized by name - confirm.
+            // kryo.setRegistrationRequired(true);
+            // NOTE(review): presumably turns off object-reference tracking, in which case shared
+            // or cyclic references in an object graph are not preserved - confirm.
+            kryo.setReferences(false);
+            kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
+
+            return kryo;
+        }
+    };
+
+    /**
+     * @return the Kryo instance bound to the calling thread, created on first use
+     */
+    public static Kryo getKryoInstance() {
+        return KRYOS.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/BeanUtils.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/BeanUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/BeanUtils.java
new file mode 100644
index 0000000..0177990
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/BeanUtils.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.internal;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reflection helpers that fill the fields of configuration objects from properties and
+ * render such objects as text.
+ *
+ * <p>Fields are written directly through reflection; setter methods are not involved.</p>
+ */
+public final class BeanUtils {
+    private final static Logger LOG = LoggerFactory.getLogger(BeanUtils.class);
+
+    /**
+     * Creates an instance of {@code clazz} through its no-argument constructor and fills its
+     * fields as described in {@link #populate(Properties, Object)}.
+     *
+     * <p>Any failure is logged and not propagated.</p>
+     *
+     * @param properties explicit name/value pairs; they take precedence over system properties
+     * and environment variables
+     * @param clazz class to instantiate and populate
+     * @param <T> Class type
+     * @return the populated instance, or {@code null} if the instance could not be created
+     */
+    public static <T> T populate(final Properties properties, final Class<T> clazz) {
+        T obj = null;
+        try {
+            obj = clazz.newInstance();
+            return populate(properties, obj);
+        } catch (Throwable e) {
+            LOG.warn("Error occurs !", e);
+        }
+        return obj;
+    }
+
+    /**
+     * Assigns one field from the merged properties, if a value is configured for it.
+     *
+     * <p>The value is looked up under the environment-style name (for example
+     * {@code BIND_ADDRESS} for {@code bindAddress}) and under the config-style name
+     * ({@code bind.address}); the config-style name wins when both exist. The text is
+     * converted for {@code boolean}, {@code int}, {@code double}, {@code float} and
+     * {@code long} fields and assigned unchanged to every other field type.</p>
+     *
+     * @throws Exception if the value cannot be converted or assigned
+     */
+    private static <T> void setField(final Field field, final Properties properties, final T obj) throws Exception {
+        Type fieldType = field.getType();
+        String fieldName = field.getName();
+
+        String value = null;
+        String configName = convertToConfigName(fieldName);
+        String envName = convertToEnvName(fieldName);
+
+        if (properties.containsKey(envName)) {
+            value = properties.getProperty(envName);
+        }
+
+        if (properties.containsKey(configName)) {
+            value = properties.getProperty(configName);
+        }
+
+        if (value == null) {
+            return;
+        }
+
+        if (fieldType == Boolean.TYPE) {
+            field.set(obj, Boolean.valueOf(value));
+        } else if (fieldType == Integer.TYPE) {
+            field.set(obj, Integer.valueOf(value));
+        } else if (fieldType == Double.TYPE) {
+            field.set(obj, Double.valueOf(value));
+        } else if (fieldType == Float.TYPE) {
+            field.set(obj, Float.valueOf(value));
+        } else if (fieldType == Long.TYPE) {
+            field.set(obj, Long.valueOf(value));
+        } else {
+            field.set(obj, value);
+        }
+    }
+
+    /**
+     * Converts a camelCase field name to its dotted lower-case form,
+     * e.g. {@code bindAddress} becomes {@code bind.address}.
+     */
+    private static String convertToConfigName(String variableName) {
+        StringBuilder sb = new StringBuilder();
+        for (char c : variableName.toCharArray()) {
+            if (Character.isUpperCase(c)) {
+                sb.append('.');
+            }
+            sb.append(Character.toLowerCase(c));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Converts a camelCase field name to its underscore-separated upper-case form,
+     * e.g. {@code bindAddress} becomes {@code BIND_ADDRESS}.
+     */
+    private static String convertToEnvName(String variableName) {
+        StringBuilder sb = new StringBuilder();
+        for (char c : variableName.toCharArray()) {
+            if (Character.isUpperCase(c)) {
+                sb.append('_');
+            }
+            sb.append(Character.toUpperCase(c));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Fills the non-static fields of {@code obj}, including inherited ones, from the given
+     * properties merged with environment variables and system properties.
+     *
+     * <p>Each field is handled independently: a value that cannot be converted or assigned is
+     * logged and skipped, and the remaining fields are still populated. (The iteration order of
+     * reflected fields is unspecified, so aborting on the first failure would leave an
+     * unpredictable subset of fields set.)</p>
+     *
+     * @param properties explicit name/value pairs; they take precedence over system properties
+     * and environment variables
+     * @param obj object whose fields are populated
+     * @param <T> Class type
+     * @return {@code obj} itself
+     */
+    public static <T> T populate(final Properties properties, final T obj) {
+        List<Field> allFields = getAllFields(new ArrayList<Field>(), obj.getClass());
+        Properties fullProp = extractProperties(properties);
+
+        for (Field field : allFields) {
+            if (Modifier.isStatic(field.getModifiers())) {
+                continue;
+            }
+            try {
+                field.setAccessible(true);
+                setField(field, fullProp, obj);
+            } catch (Exception e) {
+                LOG.warn("Error occurs !", e);
+            }
+        }
+        return obj;
+    }
+
+    /**
+     * Renders the non-static fields of {@code object}, including inherited ones, as
+     * {@code name=value} pairs, each followed by the platform line separator.
+     *
+     * <p>Fields whose name starts with {@code this} are skipped. A {@code null} field value is
+     * printed as an empty string; a field that cannot be read is printed as {@code null}.</p>
+     *
+     * @param object object to describe
+     * @return the textual listing of the object's fields
+     */
+    public static String configObjectToString(final Object object) {
+        List<Field> allFields = new ArrayList<>();
+        getAllFields(allFields, object.getClass());
+        // A literal "%n" is only a line break inside a format string; appended to a
+        // StringBuilder it would show up verbatim, so the real separator is used.
+        final String lineSeparator = System.lineSeparator();
+        StringBuilder sb = new StringBuilder();
+        for (Field field : allFields) {
+            if (Modifier.isStatic(field.getModifiers())) {
+                continue;
+            }
+            String name = field.getName();
+            if (name.startsWith("this")) {
+                continue;
+            }
+            Object value = null;
+            try {
+                field.setAccessible(true);
+                value = field.get(object);
+                if (null == value) {
+                    value = "";
+                }
+            } catch (IllegalAccessException ignored) {
+                // Best effort: an unreadable field is still listed, with value "null".
+            }
+            sb.append(name).append("=").append(value).append(lineSeparator);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Appends the declared fields of {@code type} and of all its superclasses to {@code fields}.
+     *
+     * @return the same list that was passed in
+     */
+    private static List<Field> getAllFields(List<Field> fields, Class<?> type) {
+        fields.addAll(Arrays.asList(type.getDeclaredFields()));
+        if (type.getSuperclass() != null) {
+            getAllFields(fields, type.getSuperclass());
+        }
+        return fields;
+    }
+
+    /** Lower-cases a key independently of the JVM's default locale. */
+    private static String toLowerKey(final String key) {
+        return key.toLowerCase(java.util.Locale.ROOT);
+    }
+
+    /**
+     * Merges all configuration sources into one property set.
+     *
+     * <p>Sources are applied in increasing precedence: environment variables (stored under
+     * both their upper-case and lower-case name), system properties, defaults of
+     * {@code properties}, and finally the entries of {@code properties} itself. Apart from the
+     * upper-case environment names, all keys are stored in lower case.</p>
+     */
+    private static Properties extractProperties(final Properties properties) {
+        Properties newPro = new Properties();
+
+        Map<String, String> envMap = System.getenv();
+        for (final Map.Entry<String, String> entry : envMap.entrySet()) {
+            newPro.setProperty(entry.getKey().toUpperCase(java.util.Locale.ROOT), entry.getValue());
+            newPro.setProperty(toLowerKey(entry.getKey()), entry.getValue()); //EnvProp supports A_B_C and a.b.c
+        }
+
+        Properties systemProp = System.getProperties(); //SystemProp supports a.b.c
+        for (final Map.Entry<Object, Object> entry : systemProp.entrySet()) {
+            newPro.setProperty(toLowerKey(String.valueOf(entry.getKey())), String.valueOf(entry.getValue()));
+        }
+
+        // stringPropertyNames() also covers the defaults chain, which entrySet() does not expose.
+        // It replaces reflective access to the private Properties.defaults field, which fails
+        // silently on JDKs that encapsulate java.base internals.
+        for (final String key : properties.stringPropertyNames()) {
+            newPro.setProperty(toLowerKey(key), properties.getProperty(key));
+        }
+
+        // Applied last so that explicitly set entries (including non-String ones) win over
+        // defaults whose keys differ only in case.
+        for (final Map.Entry<Object, Object> entry : properties.entrySet()) {
+            newPro.setProperty(toLowerKey(String.valueOf(entry.getKey())), String.valueOf(entry.getValue()));
+        }
+
+        return newPro;
+    }
+}