You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/02 16:45:25 UTC

[rocketmq] 01/14: Refactor remoting module

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit fa2f611470e9d1ca168270bb0d18217c96001897
Author: duhengforever <du...@gmail.com>
AuthorDate: Wed Dec 12 11:30:56 2018 +0800

    Refactor remoting module
---
 .../apache/rocketmq/broker/BrokerController.java   |  10 +-
 .../rocketmq/broker/client/ClientChannelInfo.java  |   2 +-
 .../broker/filter/ConsumerFilterManager.java       |   2 +-
 .../broker/offset/ConsumerOffsetManager.java       |   2 +-
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |  12 +-
 .../broker/pagecache/ManyMessageTransfer.java      |  30 +
 .../broker/pagecache/OneMessageTransfer.java       |  29 +
 .../broker/pagecache/QueryMessageTransfer.java     |  29 +
 .../broker/processor/AdminBrokerProcessor.java     |   4 +-
 .../subscription/SubscriptionGroupManager.java     |   2 +-
 .../apache/rocketmq/broker/BrokerOuterAPITest.java |   2 +-
 .../processor/ClientManageProcessorTest.java       |   6 +-
 .../processor/EndTransactionProcessorTest.java     |   2 +-
 .../broker/processor/PullMessageProcessorTest.java |   2 +-
 .../broker/processor/SendMessageProcessorTest.java |   6 +-
 .../org/apache/rocketmq/client/ClientConfig.java   |   2 +-
 .../consumer/store/OffsetSerializeWrapper.java     |   2 +-
 .../rocketmq/client/impl/MQClientAPIImpl.java      |   7 +-
 .../client/producer/DefaultMQProducer.java         |   1 -
 .../client/producer/DefaultMQProducerTest.java     |   2 +-
 .../org/apache/rocketmq/common/DataVersion.java    |   2 +-
 .../apache/rocketmq/common/admin/ConsumeStats.java |   2 +-
 .../rocketmq/common/admin/TopicStatsTable.java     |   2 +-
 .../rocketmq/common/protocol/MQProtosHelper.java   |  49 --
 .../common/protocol/body/BrokerStatsData.java      |   2 +-
 .../protocol/body/CheckClientRequestBody.java      |   2 +-
 .../rocketmq/common/protocol/body/ClusterInfo.java |   2 +-
 .../rocketmq/common/protocol/body/Connection.java  |   2 +-
 .../common/protocol/body/ConsumeByWho.java         |   2 +-
 .../body/ConsumeMessageDirectlyResult.java         |   2 +-
 .../common/protocol/body/ConsumeStatsList.java     |   2 +-
 .../common/protocol/body/ConsumerConnection.java   |   2 +-
 .../body/ConsumerOffsetSerializeWrapper.java       |   2 +-
 .../common/protocol/body/ConsumerRunningInfo.java  |   2 +-
 .../protocol/body/GetConsumerStatusBody.java       |   2 +-
 .../rocketmq/common/protocol/body/GroupList.java   |   2 +-
 .../rocketmq/common/protocol/body/KVTable.java     |   2 +-
 .../common/protocol/body/LockBatchRequestBody.java |   2 +-
 .../protocol/body/LockBatchResponseBody.java       |   2 +-
 .../common/protocol/body/ProducerConnection.java   |   2 +-
 .../body/QueryConsumeQueueResponseBody.java        |   2 +-
 .../protocol/body/QueryConsumeTimeSpanBody.java    |   2 +-
 .../protocol/body/QueryCorrectionOffsetBody.java   |   2 +-
 .../common/protocol/body/RegisterBrokerBody.java   |   2 +-
 .../common/protocol/body/ResetOffsetBody.java      |   2 +-
 .../common/protocol/body/ResetOffsetBodyForC.java  |   2 +-
 .../protocol/body/SubscriptionGroupWrapper.java    |   2 +-
 .../protocol/body/TopicConfigSerializeWrapper.java |   2 +-
 .../rocketmq/common/protocol/body/TopicList.java   |   2 +-
 .../protocol/body/UnlockBatchRequestBody.java      |   2 +-
 .../header/GetConsumerListByGroupResponseBody.java |   2 +-
 .../common/protocol/heartbeat/HeartbeatData.java   |   2 +-
 .../common/protocol/route/TopicRouteData.java      |   2 +-
 .../common/protocol/topic/OffsetMovedEvent.java    |   2 +-
 .../common/protocol/ConsumeStatusTest.java         |   2 +-
 .../apache/rocketmq/namesrv/NamesrvController.java |  10 +-
 .../namesrv/kvconfig/KVConfigSerializeWrapper.java |   2 +-
 .../namesrv/processor/DefaultRequestProcessor.java |   4 +-
 .../rocketmq/consumer/PullConsumerImpl.java        |   2 +-
 .../rocketmq/consumer/PushConsumerImpl.java        |   2 +-
 .../rocketmq/producer/AbstractOMSProducer.java     |   2 +-
 pom.xml                                            |   7 +-
 remoting/pom.xml                                   |   4 +
 .../apache/rocketmq/remoting/RemotingClient.java   |   3 +
 .../rocketmq/remoting/RemotingClientFactory.java   |  33 +
 .../apache/rocketmq/remoting/RemotingServer.java   |   5 +
 .../rocketmq/remoting/RemotingServerFactory.java   |  40 ++
 .../rocketmq/remoting/common/RemotingHelper.java   |  92 ---
 .../ChannelMetrics.java}                           |  13 +-
 .../remoting/netty/ChannelStatisticsHandler.java   |  60 ++
 .../rocketmq/remoting/netty/CodecHelper.java       | 283 ++++++++
 .../rocketmq/remoting/netty/FileRegionEncoder.java |   2 +-
 .../rocketmq/remoting/netty/NettyLogger.java       |  28 +-
 .../remoting/netty/NettyRemotingAbstract.java      |  67 +-
 .../remoting/netty/NettyRemotingClient.java        | 710 ---------------------
 .../rocketmq/remoting/netty/NettyServerConfig.java |  29 +-
 .../remoting/protocol/RemotingCommand.java         | 346 ++--------
 .../remoting/protocol/RemotingCommandType.java     |   2 +-
 .../{protocol => serialize}/LanguageCode.java      |   5 +-
 .../remoting/serialize/MsgPackSerializable.java    |  40 ++
 .../RemotingSerializable.java                      |  25 +-
 .../RocketMQSerializable.java                      |  26 +-
 .../{protocol => serialize}/SerializeType.java     |   8 +-
 .../Serializer.java}                               |  16 +-
 .../remoting/serialize/SerializerFactory.java      |  26 +
 .../transport/NettyRemotingClientAbstract.java     | 411 ++++++++++++
 .../transport/NettyRemotingServerAbstract.java     |  93 +++
 .../remoting/transport/http2/Http2ClientImpl.java  | 293 +++++++++
 .../remoting/transport/http2/Http2Handler.java     | 143 +++++
 .../remoting/transport/http2/Http2ServerImpl.java  | 242 +++++++
 .../rocketmq}/NettyDecoder.java                    |   6 +-
 .../rocketmq}/NettyEncoder.java                    |  10 +-
 .../transport/rocketmq/NettyRemotingClient.java    | 315 +++++++++
 .../rocketmq}/NettyRemotingServer.java             | 249 ++------
 .../apache/rocketmq/remoting/util/JvmUtils.java    |  94 +++
 .../rocketmq/remoting/util/RemotingUtil.java       |   6 +
 .../rocketmq/remoting/util/ServiceProvider.java    | 200 ++++++
 .../apache/rocketmq/remoting/util/ThreadUtils.java | 181 ++++++
 .../org.apache.rocketmq.remoting.RemotingClient    |   2 +
 .../org.apache.rocketmq.remoting.RemotingServer    |   2 +
 .../rocketmq/remoting/RemotingServerTest.java      |   6 +-
 .../java/org/apache/rocketmq/remoting/TlsTest.java |   4 +-
 .../remoting/netty/NettyRemotingAbstractTest.java  |   1 +
 .../remoting/netty/NettyRemotingClientTest.java    |   1 +
 .../remoting/protocol/RemotingCommandTest.java     |  95 +--
 .../protocol/RemotingSerializableTest.java         |   1 +
 .../protocol/RocketMQSerializableTest.java         |   3 +
 .../schedule/DelayOffsetSerializeWrapper.java      |   2 +-
 .../tools/command/topic/AllocateMQSubCommand.java  |   2 +-
 .../rocketmq/tools/monitor/MonitorServiceTest.java |   2 +-
 110 files changed, 2993 insertions(+), 1534 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index e7ef46d..3e9762a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -84,13 +84,14 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.RemotingServerFactory;
 import org.apache.rocketmq.remoting.common.TlsMode;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.netty.RequestTask;
 import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer;
 import org.apache.rocketmq.srvutil.FileWatchService;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.MessageArrivingListener;
@@ -245,10 +246,15 @@ public class BrokerController {
         result = result && this.messageStore.load();
 
         if (result) {
-            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
+            this.remotingServer = RemotingServerFactory.getRemotingServer();
+            this.remotingServer.init(this.nettyServerConfig, this.clientHousekeepingService);
+//            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
             NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
             fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
             this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
+//            this.fastRemotingServer = RemotingServerFactory.getRemotingServer();
+//            this.fastRemotingServer.init(this.nettyServerConfig, this.clientHousekeepingService);
+
             this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                 this.brokerConfig.getSendMessageThreadPoolNums(),
                 this.brokerConfig.getSendMessageThreadPoolNums(),
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java
index edcba96..7c5e25b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java
@@ -17,7 +17,7 @@
 package org.apache.rocketmq.broker.client;
 
 import io.netty.channel.Channel;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
 
 public class ClientChannelInfo {
     private final Channel channel;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java
index e9c5286..fbaf337 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java
@@ -29,7 +29,7 @@ import org.apache.rocketmq.filter.FilterFactory;
 import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.filter.util.BloomFilter;
 import org.apache.rocketmq.filter.util.BloomFilterData;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 import java.util.Collection;
 import java.util.HashSet;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index ebc9dd8..7c415f3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -31,7 +31,7 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class ConsumerOffsetManager extends ConfigManager {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 4dee01c..d157021 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -31,8 +31,6 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
 import org.apache.rocketmq.common.namesrv.TopAddressing;
 import org.apache.rocketmq.common.protocol.RequestCode;
@@ -47,15 +45,17 @@ import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRespon
 import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.RemotingClientFactory;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public class BrokerOuterAPI {
@@ -71,7 +71,9 @@ public class BrokerOuterAPI {
     }
 
     public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {
-        this.remotingClient = new NettyRemotingClient(nettyClientConfig);
+        this.remotingClient = RemotingClientFactory.getClient();
+        this.remotingClient.init(nettyClientConfig, null);
+//        this.remotingClient = new NettyRemotingClient(nettyClientConfig);
         this.remotingClient.registerRPCHook(rpcHook);
     }
 
@@ -147,7 +149,7 @@ public class BrokerOuterAPI {
                     @Override
                     public void run() {
                         try {
-                            RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
+                            RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                             if (result != null) {
                                 registerBrokerResultList.add(result);
                             }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
index 968bcfb..849d205 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
@@ -84,4 +84,34 @@ public class ManyMessageTransfer extends AbstractReferenceCounted implements Fil
     protected void deallocate() {
         this.getMessageResult.release();
     }
+
+
+
+    @Override
+    public long transferred() {
+        return transferred;
+    }
+
+
+    @Override
+    public FileRegion retain() {
+        super.retain();
+        return this;
+    }
+
+    @Override
+    public FileRegion retain(int increment) {
+        super.retain(increment);
+        return this;
+    }
+
+    @Override
+    public FileRegion touch() {
+        return this;
+    }
+
+    @Override
+    public FileRegion touch(Object hint) {
+        return this;
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
index b795d2d..18b57ad 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
@@ -73,4 +73,33 @@ public class OneMessageTransfer extends AbstractReferenceCounted implements File
     protected void deallocate() {
         this.selectMappedBufferResult.release();
     }
+
+
+    @Override
+    public long transferred() {
+        return transferred;
+    }
+
+
+    @Override
+    public FileRegion retain() {
+        super.retain();
+        return this;
+    }
+
+    @Override
+    public FileRegion retain(int increment) {
+        super.retain(increment);
+        return this;
+    }
+
+    @Override
+    public FileRegion touch() {
+        return this;
+    }
+
+    @Override
+    public FileRegion touch(Object hint) {
+        return this;
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
index e8f3099..b02fb07 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
@@ -84,4 +84,33 @@ public class QueryMessageTransfer extends AbstractReferenceCounted implements Fi
     protected void deallocate() {
         this.queryMessageResult.release();
     }
+
+
+    @Override
+    public long transferred() {
+        return transferred;
+    }
+
+
+    @Override
+    public FileRegion retain() {
+        super.retain();
+        return this;
+    }
+
+    @Override
+    public FileRegion retain(int increment) {
+        super.retain(increment);
+        return this;
+    }
+
+    @Override
+    public FileRegion touch() {
+        return this;
+    }
+
+    @Override
+    public FileRegion touch(Object hint) {
+        return this;
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 73fe439..341907a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -107,9 +107,9 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 import org.apache.rocketmq.store.ConsumeQueue;
 import org.apache.rocketmq.store.ConsumeQueueExt;
 import org.apache.rocketmq.store.DefaultMessageStore;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index 41f7a8a..3df07f8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -29,7 +29,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class SubscriptionGroupManager extends ConfigManager {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
index 68d58ef..30fe3a2 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
@@ -32,9 +32,9 @@ import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.junit.Test;
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
index 147c732..9ee9035 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
@@ -31,7 +31,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.junit.Before;
@@ -115,7 +115,7 @@ public class ClientManageProcessorTest {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
         request.setLanguage(LanguageCode.JAVA);
         request.setVersion(100);
-        request.makeCustomHeaderToNet();
+//        request.makeCustomHeaderToNet();
         return request;
     }
 
@@ -126,7 +126,7 @@ public class ClientManageProcessorTest {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
         request.setLanguage(LanguageCode.JAVA);
         request.setVersion(100);
-        request.makeCustomHeaderToNet();
+//        request.makeCustomHeaderToNet();
         return request;
     }
 }
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
index 019cc6a..7d8aa13 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
@@ -146,7 +146,7 @@ public class EndTransactionProcessorTest {
     private RemotingCommand createEndTransactionMsgCommand(int status, boolean isCheckMsg) {
         EndTransactionRequestHeader header = createEndTransactionRequestHeader(status, isCheckMsg);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, header);
-        request.makeCustomHeaderToNet();
+//        request.makeCustomHeaderToNet();
         return request;
     }
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
index c96f708..dc7b567 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
@@ -204,7 +204,7 @@ public class PullMessageProcessorTest {
         requestHeader.setSysFlag(0);
         requestHeader.setSubVersion(100L);
         RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
-        request.makeCustomHeaderToNet();
+//        request.makeCustomHeaderToNet();
         return request;
     }
 
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
index 792fd0f..2f56422 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
@@ -217,7 +217,7 @@ public class SendMessageProcessorTest {
         header.setSysFlag(sysFlag);
         RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, header);
         request.setBody(new byte[] {'a'});
-        request.makeCustomHeaderToNet();
+//        request.makeCustomHeaderToNet();
         return request;
     }
 
@@ -240,7 +240,7 @@ public class SendMessageProcessorTest {
 
         RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
         request.setBody(new byte[] {'a'});
-        request.makeCustomHeaderToNet();
+//        request.makeCustomHeaderToNet();
         return request;
     }
 
@@ -253,7 +253,7 @@ public class SendMessageProcessorTest {
         requestHeader.setOffset(123L);
 
         RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
-        request.makeCustomHeaderToNet();
+//        request.makeCustomHeaderToNet();
         return request;
     }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index d798164..562810f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -20,7 +20,7 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
 
 /**
  * Client Common configuration
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
index 7dfd97a..2c8d973 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
@@ -20,7 +20,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 /**
  * Wrapper class for offset serialization
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 1837204..0fa1ae7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -146,11 +146,11 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.apache.rocketmq.remoting.netty.ResponseFuture;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
 
 public class MQClientAPIImpl {
 
@@ -1210,6 +1210,7 @@ public class MQClientAPIImpl {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
+        log.info("getTopicRouteInfoFromNameServer response: " + response);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.TOPIC_NOT_EXIST: {
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 9732d0e..2e6078f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -35,7 +35,6 @@ import org.apache.rocketmq.common.message.MessageId;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 
 /**
  * This class is the entry point for applications intending to send messages.
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index c225afd..49223f0 100644
--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -45,7 +45,7 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/common/src/main/java/org/apache/rocketmq/common/DataVersion.java b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
index e54000d..a2756e8 100644
--- a/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
+++ b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
@@ -17,7 +17,7 @@
 package org.apache.rocketmq.common;
 
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class DataVersion extends RemotingSerializable {
     private long timestamp = System.currentTimeMillis();
diff --git a/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java b/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java
index 6b1c492..a4d14a6 100644
--- a/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java
+++ b/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java
@@ -20,7 +20,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class ConsumeStats extends RemotingSerializable {
     private HashMap<MessageQueue, OffsetWrapper> offsetTable = new HashMap<MessageQueue, OffsetWrapper>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/admin/TopicStatsTable.java b/common/src/main/java/org/apache/rocketmq/common/admin/TopicStatsTable.java
index 729075c..e30fd78 100644
--- a/common/src/main/java/org/apache/rocketmq/common/admin/TopicStatsTable.java
+++ b/common/src/main/java/org/apache/rocketmq/common/admin/TopicStatsTable.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.common.admin;
 
 import java.util.HashMap;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class TopicStatsTable extends RemotingSerializable {
     private HashMap<MessageQueue, TopicOffset> offsetTable = new HashMap<MessageQueue, TopicOffset>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java
deleted file mode 100644
index d8c1ced..0000000
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.common.protocol;
-
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-
-public class MQProtosHelper {
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
-
-    public static boolean registerBrokerToNameServer(final String nsaddr, final String brokerAddr,
-        final long timeoutMillis) {
-        RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
-        requestHeader.setBrokerAddr(brokerAddr);
-
-        RemotingCommand request =
-            RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
-
-        try {
-            RemotingCommand response = RemotingHelper.invokeSync(nsaddr, request, timeoutMillis);
-            if (response != null) {
-                return ResponseCode.SUCCESS == response.getCode();
-            }
-        } catch (Exception e) {
-            log.error("Failed to register broker", e);
-        }
-
-        return false;
-    }
-}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsData.java
index c4ff63d..4a6e0ae 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsData.java
@@ -17,7 +17,7 @@
 
 package org.apache.rocketmq.common.protocol.body;
 
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class BrokerStatsData extends RemotingSerializable {
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/CheckClientRequestBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/CheckClientRequestBody.java
index a78ce55..b98ce95 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/CheckClientRequestBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/CheckClientRequestBody.java
@@ -18,7 +18,7 @@
 package org.apache.rocketmq.common.protocol.body;
 
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class CheckClientRequestBody extends RemotingSerializable {
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
index 76c64a8..566bf93 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class ClusterInfo extends RemotingSerializable {
     private HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/Connection.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/Connection.java
index b42737f..dd0bf81 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/Connection.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/Connection.java
@@ -17,7 +17,7 @@
 
 package org.apache.rocketmq.common.protocol.body;
 
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
 
 public class Connection {
     private String clientId;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeByWho.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeByWho.java
index 7b20d76..f600991 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeByWho.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeByWho.java
@@ -17,7 +17,7 @@
 package org.apache.rocketmq.common.protocol.body;
 
 import java.util.HashSet;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class ConsumeByWho extends RemotingSerializable {
     private HashSet<String> consumedGroup = new HashSet<String>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java
index 674df60..02e37af 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java
@@ -17,7 +17,7 @@
 
 package org.apache.rocketmq.common.protocol.body;
 
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class ConsumeMessageDirectlyResult extends RemotingSerializable {
     private boolean order = false;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatsList.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatsList.java
index 7b35a80..28c5a7d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatsList.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatsList.java
@@ -20,7 +20,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import org.apache.rocketmq.common.admin.ConsumeStats;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class ConsumeStatsList extends RemotingSerializable {
     private List<Map<String/*subscriptionGroupName*/, List<ConsumeStats>>> consumeStatsList = new ArrayList<Map<String/*subscriptionGroupName*/, List<ConsumeStats>>>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java
index 3a0356c..d3ac81e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java
@@ -24,7 +24,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class ConsumerConnection extends RemotingSerializable {
     private HashSet<Connection> connectionSet = new HashSet<Connection>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java
index 5b08d78..9fe7382 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.common.protocol.body;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class ConsumerOffsetSerializeWrapper extends RemotingSerializable {
     private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
index d7942eb..c0be419 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
@@ -25,7 +25,7 @@ import java.util.TreeSet;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class ConsumerRunningInfo extends RemotingSerializable {
     public static final String PROP_NAMESERVER_ADDR = "PROP_NAMESERVER_ADDR";
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/GetConsumerStatusBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/GetConsumerStatusBody.java
index 6234a77..f654843 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/GetConsumerStatusBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/GetConsumerStatusBody.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.common.protocol.body;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 @Deprecated
 public class GetConsumerStatusBody extends RemotingSerializable {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/GroupList.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/GroupList.java
index 862a739..97ffb74 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/GroupList.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/GroupList.java
@@ -17,7 +17,7 @@
 package org.apache.rocketmq.common.protocol.body;
 
 import java.util.HashSet;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class GroupList extends RemotingSerializable {
     private HashSet<String> groupList = new HashSet<String>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/KVTable.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/KVTable.java
index 28aafc4..e1290fd 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/KVTable.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/KVTable.java
@@ -17,7 +17,7 @@
 package org.apache.rocketmq.common.protocol.body;
 
 import java.util.HashMap;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class KVTable extends RemotingSerializable {
     private HashMap<String, String> table = new HashMap<String, String>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchRequestBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchRequestBody.java
index 480862b..0851687 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchRequestBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchRequestBody.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.common.protocol.body;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class LockBatchRequestBody extends RemotingSerializable {
     private String consumerGroup;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchResponseBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchResponseBody.java
index 5018f20..56bd1a4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchResponseBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchResponseBody.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.common.protocol.body;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class LockBatchResponseBody extends RemotingSerializable {
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerConnection.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerConnection.java
index 14d7110..b5d3a2e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerConnection.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerConnection.java
@@ -18,7 +18,7 @@
 package org.apache.rocketmq.common.protocol.body;
 
 import java.util.HashSet;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class ProducerConnection extends RemotingSerializable {
     private HashSet<Connection> connectionSet = new HashSet<Connection>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeQueueResponseBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeQueueResponseBody.java
index be93da9..7f53e05 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeQueueResponseBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeQueueResponseBody.java
@@ -18,7 +18,7 @@
 package org.apache.rocketmq.common.protocol.body;
 
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 import java.util.List;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java
index cae2109..82b4e85 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.common.protocol.body;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class QueryConsumeTimeSpanBody extends RemotingSerializable {
     List<QueueTimeSpan> consumeTimeSpanSet = new ArrayList<QueueTimeSpan>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java
index f7c866a..c7ccd12 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.common.protocol.body;
 
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class QueryCorrectionOffsetBody extends RemotingSerializable {
     private Map<Integer, Long> correctionOffsets = new HashMap<Integer, Long>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
index 4065c08..d3f8833 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
@@ -36,7 +36,7 @@ import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class RegisterBrokerBody extends RemotingSerializable {
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBody.java
index b28e74b..df72758 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBody.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.common.protocol.body;
 
 import java.util.Map;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class ResetOffsetBody extends RemotingSerializable {
     private Map<MessageQueue, Long> offsetTable;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBodyForC.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBodyForC.java
index fa812ed..e15322f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBodyForC.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBodyForC.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.common.protocol.body;
 
 import java.util.List;
 import org.apache.rocketmq.common.message.MessageQueueForC;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class ResetOffsetBodyForC extends RemotingSerializable {
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java
index e05f759..d966980 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java
@@ -21,7 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class SubscriptionGroupWrapper extends RemotingSerializable {
     private ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
index ce12302..d944b27 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
@@ -21,7 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class TopicConfigSerializeWrapper extends RemotingSerializable {
     private ConcurrentMap<String, TopicConfig> topicConfigTable =
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicList.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicList.java
index baf8312..3a3e13e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicList.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicList.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.common.protocol.body;
 
 import java.util.HashSet;
 import java.util.Set;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class TopicList extends RemotingSerializable {
     private Set<String> topicList = new HashSet<String>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/UnlockBatchRequestBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/UnlockBatchRequestBody.java
index baf4071..d8c73a2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/UnlockBatchRequestBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/UnlockBatchRequestBody.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.common.protocol.body;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class UnlockBatchRequestBody extends RemotingSerializable {
     private String consumerGroup;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerListByGroupResponseBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerListByGroupResponseBody.java
index da39e77..2ab1578 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerListByGroupResponseBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerListByGroupResponseBody.java
@@ -18,7 +18,7 @@
 package org.apache.rocketmq.common.protocol.header;
 
 import java.util.List;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class GetConsumerListByGroupResponseBody extends RemotingSerializable {
     private List<String> consumerIdList;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java
index 47ae542..03151f5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java
@@ -22,7 +22,7 @@ package org.apache.rocketmq.common.protocol.heartbeat;
 
 import java.util.HashSet;
 import java.util.Set;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class HeartbeatData extends RemotingSerializable {
     private String clientID;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
index e8f54b8..ed9d644 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
@@ -23,7 +23,7 @@ package org.apache.rocketmq.common.protocol.route;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class TopicRouteData extends RemotingSerializable {
     private String orderTopicConf;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/topic/OffsetMovedEvent.java b/common/src/main/java/org/apache/rocketmq/common/protocol/topic/OffsetMovedEvent.java
index 1c03c07..a769f95 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/topic/OffsetMovedEvent.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/topic/OffsetMovedEvent.java
@@ -18,7 +18,7 @@
 package org.apache.rocketmq.common.protocol.topic;
 
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class OffsetMovedEvent extends RemotingSerializable {
     private String consumerGroup;
diff --git a/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java
index 4a2e790..5f22d82 100644
--- a/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java
@@ -18,7 +18,7 @@
 package org.apache.rocketmq.common.protocol;
 
 import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index a6654f2..6faccf7 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -32,13 +32,13 @@ import org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor;
 import org.apache.rocketmq.namesrv.routeinfo.BrokerHousekeepingService;
 import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
 import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.RemotingServerFactory;
 import org.apache.rocketmq.remoting.common.TlsMode;
-import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer;
 import org.apache.rocketmq.srvutil.FileWatchService;
 
-
 public class NamesrvController {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
 
@@ -77,7 +77,9 @@ public class NamesrvController {
 
         this.kvConfigManager.load();
 
-        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
+        this.remotingServer = RemotingServerFactory.getRemotingServer();
+        this.remotingServer.init(this.nettyServerConfig, this.brokerHousekeepingService);
+//        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
 
         this.remotingExecutor =
             Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
@@ -111,6 +113,7 @@ public class NamesrvController {
                     },
                     new FileWatchService.Listener() {
                         boolean certChanged, keyChanged = false;
+
                         @Override
                         public void onChanged(String path) {
                             if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
@@ -129,6 +132,7 @@ public class NamesrvController {
                                 reloadServerSslContext();
                             }
                         }
+
                         private void reloadServerSslContext() {
                             ((NettyRemotingServer) remotingServer).loadSslContext();
                         }
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java
index 3b53e19..59b2161 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java
@@ -17,7 +17,7 @@
 package org.apache.rocketmq.namesrv.kvconfig;
 
 import java.util.HashMap;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class KVConfigSerializeWrapper extends RemotingSerializable {
     private HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable;
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 467078c..dc32445 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -69,7 +69,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
     @Override
     public RemotingCommand processRequest(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
-
+        log.info("receive remoting command: " + request);
         if (ctx != null) {
             log.debug("receive request, {} {} {}",
                 request.getCode(),
@@ -280,7 +280,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
         final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
         final RegisterBrokerRequestHeader requestHeader =
             (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
-
+        log.info("requestHeader:  " + requestHeader );
         if (!checksum(ctx, request, requestHeader)) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark("crc32 not match");
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
index d673510..1ce127f 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
@@ -37,7 +37,7 @@ import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
 
 public class PullConsumerImpl implements PullConsumer {
     private final DefaultMQPullConsumer rocketmqPullConsumer;
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
index d5d394a..46f9a45 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
@@ -39,7 +39,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
 
 public class PushConsumerImpl implements PushConsumer {
     private final DefaultMQPushConsumer rocketmqPushConsumer;
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
index 3db8590..95daef3 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
@@ -37,7 +37,7 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
 
 import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName;
 
diff --git a/pom.xml b/pom.xml
index 0a8fef8..0a8a41e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -544,7 +544,7 @@
             <dependency>
                 <groupId>io.netty</groupId>
                 <artifactId>netty-all</artifactId>
-                <version>4.0.42.Final</version>
+                <version>4.1.32.Final</version>
             </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>
@@ -552,6 +552,11 @@
                 <version>1.2.51</version>
             </dependency>
             <dependency>
+                <groupId>org.msgpack</groupId>
+                <artifactId>msgpack</artifactId>
+                <version>0.6.12</version>
+            </dependency>
+            <dependency>
                 <groupId>org.javassist</groupId>
                 <artifactId>javassist</artifactId>
                 <version>3.20.0-GA</version>
diff --git a/remoting/pom.xml b/remoting/pom.xml
index 55d92f3..26e5bfc 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -38,6 +38,10 @@
             <artifactId>fastjson</artifactId>
         </dependency>
         <dependency>
+        <groupId>org.msgpack</groupId>
+        <artifactId>msgpack</artifactId>
+        </dependency>
+        <dependency>
             <groupId>io.netty</groupId>
             <artifactId>netty-all</artifactId>
         </dependency>
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index c0754db..ab4e914 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -22,6 +22,7 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
@@ -51,4 +52,6 @@ public interface RemotingClient extends RemotingService {
     ExecutorService getCallbackExecutor();
 
     boolean isChannelWritable(final String addr);
+
+    void init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener);
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
new file mode 100644
index 0000000..5e87ec9
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
@@ -0,0 +1,33 @@
+package org.apache.rocketmq.remoting;
+
+import java.util.Map;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.util.RemotingUtil;
+import org.apache.rocketmq.remoting.util.ServiceProvider;
+
+public class RemotingClientFactory {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+    private RemotingClientFactory() {
+    }
+
+    private static Map<String, RemotingClient> clients;
+
+    private static final String CLIENT_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingClient";
+
+    static {
+        log.info("begin load client");
+        clients = ServiceProvider.load(CLIENT_LOCATION, RemotingClient.class);
+        log.info("end load client, size:{}", clients.size());
+    }
+
+    public static RemotingClient getClient(String protocolType) {
+        return clients.get(protocolType);
+    }
+
+    public static RemotingClient getClient() {
+        return clients.get(RemotingUtil.DEFAULT_PROTOCOL);
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
index a12c089..6a5fb91 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
@@ -23,6 +23,7 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public interface RemotingServer extends RemotingService {
@@ -36,6 +37,8 @@ public interface RemotingServer extends RemotingService {
 
     Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
 
+    void push(final String addr, final String sessionId, RemotingCommand remotingCommand);
+
     RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
         final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
         RemotingTimeoutException;
@@ -48,4 +51,6 @@ public interface RemotingServer extends RemotingService {
         throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
         RemotingSendRequestException;
 
+    void init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener);
+
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
new file mode 100644
index 0000000..e7a7700
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
@@ -0,0 +1,40 @@
+package org.apache.rocketmq.remoting;
+
+import java.util.Map;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.util.RemotingUtil;
+import org.apache.rocketmq.remoting.util.ServiceProvider;
+
+public class RemotingServerFactory {
+
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+    private RemotingServerFactory() {
+    }
+
+    private static Map<String, RemotingServer> servers;
+
+//    private static Map<String/*protocolType*/, String/*path*/ >
+
+    private static final String SERVER_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingServer";
+
+    static {
+        log.info("begin load server");
+        servers = ServiceProvider.load(SERVER_LOCATION, RemotingClient.class);
+        log.info("end load server, size:{}", servers.size());
+    }
+
+    public static RemotingServer getRemotingServer() {
+        return getRemotingServer(RemotingUtil.DEFAULT_PROTOCOL);
+    }
+
+    public static RemotingServer getRemotingServer(String protocolType) {
+        return servers.get(protocolType);
+    }
+
+//    public static RemotingServer createNewInstance(String protocolType){
+//        return ServiceProvider.load()
+//    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
index 585b60b..0414996 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
@@ -58,98 +58,6 @@ public class RemotingHelper {
         return isa;
     }
 
-    public static RemotingCommand invokeSync(final String addr, final RemotingCommand request,
-        final long timeoutMillis) throws InterruptedException, RemotingConnectException,
-        RemotingSendRequestException, RemotingTimeoutException {
-        long beginTime = System.currentTimeMillis();
-        SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
-        SocketChannel socketChannel = RemotingUtil.connect(socketAddress);
-        if (socketChannel != null) {
-            boolean sendRequestOK = false;
-
-            try {
-
-                socketChannel.configureBlocking(true);
-
-                //bugfix  http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802
-                socketChannel.socket().setSoTimeout((int) timeoutMillis);
-
-                ByteBuffer byteBufferRequest = request.encode();
-                while (byteBufferRequest.hasRemaining()) {
-                    int length = socketChannel.write(byteBufferRequest);
-                    if (length > 0) {
-                        if (byteBufferRequest.hasRemaining()) {
-                            if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
-
-                                throw new RemotingSendRequestException(addr);
-                            }
-                        }
-                    } else {
-                        throw new RemotingSendRequestException(addr);
-                    }
-
-                    Thread.sleep(1);
-                }
-
-                sendRequestOK = true;
-
-                ByteBuffer byteBufferSize = ByteBuffer.allocate(4);
-                while (byteBufferSize.hasRemaining()) {
-                    int length = socketChannel.read(byteBufferSize);
-                    if (length > 0) {
-                        if (byteBufferSize.hasRemaining()) {
-                            if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
-
-                                throw new RemotingTimeoutException(addr, timeoutMillis);
-                            }
-                        }
-                    } else {
-                        throw new RemotingTimeoutException(addr, timeoutMillis);
-                    }
-
-                    Thread.sleep(1);
-                }
-
-                int size = byteBufferSize.getInt(0);
-                ByteBuffer byteBufferBody = ByteBuffer.allocate(size);
-                while (byteBufferBody.hasRemaining()) {
-                    int length = socketChannel.read(byteBufferBody);
-                    if (length > 0) {
-                        if (byteBufferBody.hasRemaining()) {
-                            if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
-
-                                throw new RemotingTimeoutException(addr, timeoutMillis);
-                            }
-                        }
-                    } else {
-                        throw new RemotingTimeoutException(addr, timeoutMillis);
-                    }
-
-                    Thread.sleep(1);
-                }
-
-                byteBufferBody.flip();
-                return RemotingCommand.decode(byteBufferBody);
-            } catch (IOException e) {
-                log.error("invokeSync failure", e);
-
-                if (sendRequestOK) {
-                    throw new RemotingTimeoutException(addr, timeoutMillis);
-                } else {
-                    throw new RemotingSendRequestException(addr);
-                }
-            } finally {
-                try {
-                    socketChannel.close();
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-        } else {
-            throw new RemotingConnectException(addr);
-        }
-    }
-
     public static String parseChannelRemoteAddr(final Channel channel) {
         if (null == channel) {
             return "";
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ChannelMetrics.java
old mode 100644
new mode 100755
similarity index 80%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java
copy to remoting/src/main/java/org/apache/rocketmq/remoting/netty/ChannelMetrics.java
index 01c853b..c56b93d
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ChannelMetrics.java
@@ -14,9 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.remoting.protocol;
 
-public enum RemotingCommandType {
-    REQUEST_COMMAND,
-    RESPONSE_COMMAND;
+package org.apache.rocketmq.remoting.netty;
+
+import io.netty.channel.group.ChannelGroup;
+
+public interface ChannelMetrics {
+
+    Integer getChannelCount();
+
+    ChannelGroup getChannels();
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ChannelStatisticsHandler.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ChannelStatisticsHandler.java
new file mode 100755
index 0000000..6aa5fd9
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ChannelStatisticsHandler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.netty;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.group.ChannelGroup;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ChannelStatisticsHandler extends ChannelDuplexHandler implements ChannelMetrics {
+    public static final String NAME = ChannelStatisticsHandler.class.getSimpleName();
+    private final AtomicInteger channelCount = new AtomicInteger(0);
+    private final ChannelGroup allChannels;
+
+    public ChannelStatisticsHandler(ChannelGroup allChannels) {
+        this.allChannels = allChannels;
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        // connect
+        channelCount.incrementAndGet();
+        allChannels.add(ctx.channel());
+        super.channelActive(ctx);
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        // disconnect
+        channelCount.decrementAndGet();
+        allChannels.remove(ctx.channel());
+        super.channelInactive(ctx);
+    }
+
+    @Override
+    public Integer getChannelCount() {
+        return channelCount.get();
+    }
+
+    @Override
+    public ChannelGroup getChannels() {
+        return allChannels;
+    }
+
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/CodecHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/CodecHelper.java
new file mode 100644
index 0000000..d0e3632
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/CodecHelper.java
@@ -0,0 +1,283 @@
+package org.apache.rocketmq.remoting.netty;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RocketMQSerializable;
+import org.apache.rocketmq.remoting.serialize.SerializeType;
+import org.apache.rocketmq.remoting.serialize.Serializer;
+import org.apache.rocketmq.remoting.serialize.SerializerFactory;
+
+public class CodecHelper {
+    public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version";
+    protected static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP =
+        new HashMap<Class<? extends CommandCustomHeader>, Field[]>();
+    protected static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<Class, String>();
+    private static final Map<Field, Boolean> NULLABLE_FIELD_CACHE = new HashMap<Field, Boolean>();
+    private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName();
+    private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName();
+    private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName();
+    private static final String INTEGER_CANONICAL_NAME_1 = Integer.class.getCanonicalName();
+    private static final String INTEGER_CANONICAL_NAME_2 = int.class.getCanonicalName();
+    private static final String LONG_CANONICAL_NAME_1 = Long.class.getCanonicalName();
+    private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName();
+    private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName();
+    private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName();
+
+    public static RemotingCommand decode(final byte[] array) {
+        ByteBuffer byteBuffer = ByteBuffer.wrap(array);
+        return decode(byteBuffer);
+    }
+
+    public static RemotingCommand decode(final ByteBuffer byteBuffer) {
+        int length = byteBuffer.limit();
+        int oriHeaderLen = byteBuffer.getInt();
+        int headerLength = getHeaderLength(oriHeaderLen);
+
+        byte[] headerData = new byte[headerLength];
+        byteBuffer.get(headerData);
+        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
+        System.out.println("cmd: " + cmd);
+        int bodyLength = length - 4 - headerLength;
+        byte[] bodyData = null;
+        if (bodyLength > 0) {
+            bodyData = new byte[bodyLength];
+            byteBuffer.get(bodyData);
+        }
+        cmd.setBody(bodyData);
+        return cmd;
+    }
+
+    public static int getHeaderLength(int length) {
+        return length & 0xFFFFFF;
+    }
+
+    private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
+        Serializer serializer = SerializerFactory.get(type);
+        if (serializer != null) {
+            RemotingCommand remotingCommand = serializer.deserializer(headerData);
+            return remotingCommand;
+        }
+        return null;
+    }
+
+    public static SerializeType getProtocolType(int source) {
+        return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
+    }
+
+    public static byte[] markProtocolType(int source, SerializeType type) {
+        byte[] result = new byte[4];
+
+        result[0] = type.getCode();
+        result[1] = (byte) ((source >> 16) & 0xFF);
+        result[2] = (byte) ((source >> 8) & 0xFF);
+        result[3] = (byte) (source & 0xFF);
+        return result;
+    }
+
+    public static CommandCustomHeader decodeCommandCustomHeader(RemotingCommand remotingCommand,
+        Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
+        CommandCustomHeader objectHeader;
+        try {
+            objectHeader = classHeader.newInstance();
+        } catch (InstantiationException e) {
+            return null;
+        } catch (IllegalAccessException e) {
+            return null;
+        }
+
+        if (remotingCommand.getExtFields() != null) {
+            Field[] fields = getClazzFields(classHeader);
+            for (Field field : fields) {
+                if (!Modifier.isStatic(field.getModifiers())) {
+                    String fieldName = field.getName();
+                    if (!fieldName.startsWith("this")) {
+                        try {
+                            String value = remotingCommand.getExtFields().get(fieldName);
+                            if (null == value) {
+                                if (!isFieldNullable(field)) {
+                                    throw new RemotingCommandException("the custom field <" + fieldName + "> is null");
+                                }
+                                continue;
+                            }
+
+                            field.setAccessible(true);
+                            String type = getCanonicalName(field.getType());
+                            Object valueParsed;
+
+                            if (type.equals(STRING_CANONICAL_NAME)) {
+                                valueParsed = value;
+                            } else if (type.equals(INTEGER_CANONICAL_NAME_1) || type.equals(INTEGER_CANONICAL_NAME_2)) {
+                                valueParsed = Integer.parseInt(value);
+                            } else if (type.equals(LONG_CANONICAL_NAME_1) || type.equals(LONG_CANONICAL_NAME_2)) {
+                                valueParsed = Long.parseLong(value);
+                            } else if (type.equals(BOOLEAN_CANONICAL_NAME_1) || type.equals(BOOLEAN_CANONICAL_NAME_2)) {
+                                valueParsed = Boolean.parseBoolean(value);
+                            } else if (type.equals(DOUBLE_CANONICAL_NAME_1) || type.equals(DOUBLE_CANONICAL_NAME_2)) {
+                                valueParsed = Double.parseDouble(value);
+                            } else {
+                                throw new RemotingCommandException("the custom field <" + fieldName + "> type is not supported");
+                            }
+
+                            field.set(objectHeader, valueParsed);
+
+                        } catch (Throwable e) {
+                        }
+                    }
+                }
+            }
+
+            objectHeader.checkFields();
+        }
+
+        return objectHeader;
+    }
+
+    private static Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) {
+        Field[] field = CLASS_HASH_MAP.get(classHeader);
+
+        if (field == null) {
+            field = classHeader.getDeclaredFields();
+            synchronized (CLASS_HASH_MAP) {
+                CLASS_HASH_MAP.put(classHeader, field);
+            }
+        }
+        return field;
+    }
+
+    private static boolean isFieldNullable(Field field) {
+        if (!NULLABLE_FIELD_CACHE.containsKey(field)) {
+            Annotation annotation = field.getAnnotation(CFNotNull.class);
+            synchronized (NULLABLE_FIELD_CACHE) {
+                NULLABLE_FIELD_CACHE.put(field, annotation == null);
+            }
+        }
+        return NULLABLE_FIELD_CACHE.get(field);
+    }
+
+    private static String getCanonicalName(Class clazz) {
+        String name = CANONICAL_NAME_CACHE.get(clazz);
+
+        if (name == null) {
+            name = clazz.getCanonicalName();
+            synchronized (CANONICAL_NAME_CACHE) {
+                CANONICAL_NAME_CACHE.put(clazz, name);
+            }
+        }
+        return name;
+    }
+
+    public static ByteBuffer encode(RemotingCommand remotingCommand) {
+        // 1> header length size
+        int length = 4;
+
+        // 2> header data length
+        byte[] headerData = headerEncode(remotingCommand);
+        length += headerData.length;
+
+        // 3> body data length
+        if (remotingCommand.getBody() != null) {
+            length += remotingCommand.getBody().length;
+        }
+
+        ByteBuffer result = ByteBuffer.allocate(4 + length);
+
+        // length
+        result.putInt(length);
+
+        // header length
+        result.put(markProtocolType(headerData.length, remotingCommand.getSerializeTypeCurrentRPC()));
+
+        // header data
+        result.put(headerData);
+
+        // body data;
+        if (remotingCommand.getBody() != null) {
+            result.put(remotingCommand.getBody());
+        }
+
+        result.flip();
+
+        return result;
+    }
+
+    private static byte[] headerEncode(RemotingCommand remotingCommand) {
+        makeCustomHeaderToNet(remotingCommand);
+        Serializer serializer = SerializerFactory.get(remotingCommand.getSerializeTypeCurrentRPC());
+        if (serializer == null) {
+            serializer = SerializerFactory.get(SerializeType.JSON);
+        }
+        return serializer.serializer(remotingCommand);
+    }
+
+    public static void makeCustomHeaderToNet(RemotingCommand remotingCommand) {
+        if (remotingCommand.getCustomHeader() != null) {
+            Field[] fields = getClazzFields(remotingCommand.getCustomHeader().getClass());
+            HashMap extFields = remotingCommand.getExtFields();
+
+            if (null == extFields) {
+                remotingCommand.setExtFields(new HashMap<String, String>());
+            }
+            for (Field field : fields) {
+                if (!Modifier.isStatic(field.getModifiers())) {
+                    String name = field.getName();
+                    if (!name.startsWith("this")) {
+                        Object value = null;
+                        try {
+                            field.setAccessible(true);
+                            value = field.get(remotingCommand.getCustomHeader());
+                        } catch (Exception e) {
+                        }
+
+                        if (value != null) {
+                            remotingCommand.getExtFields().put(name, value.toString());
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    public static ByteBuffer encodeHeader(RemotingCommand remotingCommand) {
+        int bodyLength = remotingCommand.getBody() != null ? remotingCommand.getBody().length : 0;
+        return encodeHeader(bodyLength, remotingCommand);
+    }
+
+    public static ByteBuffer encodeHeader(final int bodyLength, RemotingCommand remotingCommand) {
+        // 1> header length size
+        int length = 4;
+
+        // 2> header data length
+        byte[] headerData;
+        headerData = headerEncode(remotingCommand);
+
+        length += headerData.length;
+
+        // 3> body data length
+        length += bodyLength;
+
+        ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
+
+        // length
+        result.putInt(length);
+
+        // header length
+        result.put(markProtocolType(headerData.length, remotingCommand.getSerializeTypeCurrentRPC()));
+
+        // header data
+        result.put(headerData);
+
+        result.flip();
+
+        return result;
+    }
+
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
index 2bd15ae..0c5e52d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
@@ -68,7 +68,7 @@ public class FileRegionEncoder extends MessageToByteEncoder<FileRegion> {
         long toTransfer = msg.count();
 
         while (true) {
-            long transferred = msg.transfered();
+            long transferred = msg.transferred();
             if (toTransfer - transferred <= 0) {
                 break;
             }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyLogger.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyLogger.java
index 4b4e86e..ccd1b14 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyLogger.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyLogger.java
@@ -17,13 +17,11 @@
 
 package org.apache.rocketmq.remoting.netty;
 
-
 import io.netty.util.internal.logging.InternalLogLevel;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 
-import java.util.concurrent.atomic.AtomicBoolean;
-
 public class NettyLogger {
 
     private static AtomicBoolean nettyLoggerSeted = new AtomicBoolean(false);
@@ -52,6 +50,30 @@ public class NettyLogger {
 
         private InternalLogger logger = null;
 
+        @Override public void trace(Throwable throwable) {
+
+        }
+
+        @Override public void debug(Throwable throwable) {
+
+        }
+
+        @Override public void info(Throwable throwable) {
+
+        }
+
+        @Override public void warn(Throwable throwable) {
+
+        }
+
+        @Override public void error(Throwable throwable) {
+
+        }
+
+        @Override public void log(InternalLogLevel level, Throwable throwable) {
+
+        }
+
         public NettyBridgeLogger(String name) {
             logger = InternalLoggerFactory.getLogger(name);
         }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 8dccebc..17053ff 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -17,11 +17,15 @@
 package org.apache.rocketmq.remoting.netty;
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
 import java.net.SocketAddress;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -33,13 +37,16 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingServer;
 import org.apache.rocketmq.remoting.common.Pair;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
 import org.apache.rocketmq.remoting.common.ServiceThread;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -49,6 +56,7 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
+import org.apache.rocketmq.remoting.util.ThreadUtils;
 
 public abstract class NettyRemotingAbstract {
 
@@ -60,12 +68,12 @@ public abstract class NettyRemotingAbstract {
     /**
      * Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint.
      */
-    protected final Semaphore semaphoreOneway;
+    protected Semaphore semaphoreOneway;
 
     /**
      * Semaphore to limit maximum number of on-going asynchronous requests, which protects system memory footprint.
      */
-    protected final Semaphore semaphoreAsync;
+    protected Semaphore semaphoreAsync;
 
     /**
      * This map caches all on-going requests.
@@ -83,10 +91,11 @@ public abstract class NettyRemotingAbstract {
     /**
      * Executor to feed netty events to user defined {@link ChannelEventListener}.
      */
-    protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
+    protected NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
 
     /**
-     * The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
+     * The default request processor to use in case there is no exact match in {@link #processorTable} per request
+     * code.
      */
     protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
 
@@ -99,6 +108,13 @@ public abstract class NettyRemotingAbstract {
         NettyLogger.initNettyLogger();
     }
 
+    protected ScheduledExecutorService houseKeepingService = ThreadUtils.newSingleThreadScheduledExecutor("HouseKeepingService", true);
+
+    public NettyRemotingAbstract() {
+        this.semaphoreOneway = new Semaphore(65535, true);
+        this.semaphoreAsync = new Semaphore(65535, true);
+    }
+
     /**
      * Constructor, specifying capacity of one-way and asynchronous semaphores.
      *
@@ -110,6 +126,11 @@ public abstract class NettyRemotingAbstract {
         this.semaphoreAsync = new Semaphore(permitsAsync, true);
     }
 
+    public void init(final int permitsOneway, final int permitsAsync) {
+        this.semaphoreOneway = new Semaphore(permitsOneway, true);
+        this.semaphoreAsync = new Semaphore(permitsAsync, true);
+    }
+
     /**
      * Custom channel event listener.
      *
@@ -329,6 +350,15 @@ public abstract class NettyRemotingAbstract {
      */
     public abstract ExecutorService getCallbackExecutor();
 
+    protected void startUpHouseKeepingService() {
+        this.houseKeepingService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                scanResponseTable();
+            }
+        }, 3000, 1000, TimeUnit.MICROSECONDS);
+    }
+
     /**
      * <p>
      * This method is periodically invoked to scan and expire deprecated request.
@@ -358,6 +388,21 @@ public abstract class NettyRemotingAbstract {
         }
     }
 
+    public void start() {
+        if (getChannelEventListener() != null) {
+            nettyEventExecutor.start();
+        }
+    }
+
+    public void shutdown() {
+        if (this.nettyEventExecutor != null) {
+            this.nettyEventExecutor.shutdown();
+        }
+        if (this.houseKeepingService != null) {
+            this.houseKeepingService.shutdown();
+        }
+    }
+
     public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
         final long timeoutMillis)
         throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
@@ -410,7 +455,7 @@ public abstract class NettyRemotingAbstract {
             final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
             long costTime = System.currentTimeMillis() - beginStartTime;
             if (timeoutMillis < costTime) {
-                throw new RemotingTooMuchRequestException("invokeAsyncImpl call timeout");
+                throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
             }
 
             final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
@@ -465,6 +510,7 @@ public abstract class NettyRemotingAbstract {
 
     /**
      * mark the request of the specified channel as fail and to invoke fail callback immediately
+     *
      * @param channel the channel which is close already
      */
     protected void failFast(final Channel channel) {
@@ -570,4 +616,15 @@ public abstract class NettyRemotingAbstract {
             return NettyEventExecutor.class.getSimpleName();
         }
     }
+
+    public class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
+
+        @Override
+        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
+            processMessageReceived(ctx, msg);
+        }
+    }
+
+
+
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
deleted file mode 100644
index 33c2eed..0000000
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ /dev/null
@@ -1,710 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.remoting.netty;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.timeout.IdleState;
-import io.netty.handler.timeout.IdleStateEvent;
-import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.concurrent.DefaultEventExecutorGroup;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.security.cert.CertificateException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.rocketmq.remoting.ChannelEventListener;
-import org.apache.rocketmq.remoting.InvokeCallback;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.remoting.common.Pair;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-
-public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
-
-    private static final long LOCK_TIMEOUT_MILLIS = 3000;
-
-    private final NettyClientConfig nettyClientConfig;
-    private final Bootstrap bootstrap = new Bootstrap();
-    private final EventLoopGroup eventLoopGroupWorker;
-    private final Lock lockChannelTables = new ReentrantLock();
-    private final ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();
-
-    private final Timer timer = new Timer("ClientHouseKeepingService", true);
-
-    private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
-    private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
-    private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
-    private final Lock lockNamesrvChannel = new ReentrantLock();
-
-    private final ExecutorService publicExecutor;
-
-    /**
-     * Invoke the callback methods in this executor when process response.
-     */
-    private ExecutorService callbackExecutor;
-    private final ChannelEventListener channelEventListener;
-    private DefaultEventExecutorGroup defaultEventExecutorGroup;
-    private RPCHook rpcHook;
-
-    public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
-        this(nettyClientConfig, null);
-    }
-
-    public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
-        final ChannelEventListener channelEventListener) {
-        super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
-        this.nettyClientConfig = nettyClientConfig;
-        this.channelEventListener = channelEventListener;
-
-        int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
-        if (publicThreadNums <= 0) {
-            publicThreadNums = 4;
-        }
-
-        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
-            private AtomicInteger threadIndex = new AtomicInteger(0);
-
-            @Override
-            public Thread newThread(Runnable r) {
-                return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
-            }
-        });
-
-        this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
-            private AtomicInteger threadIndex = new AtomicInteger(0);
-
-            @Override
-            public Thread newThread(Runnable r) {
-                return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
-            }
-        });
-
-        if (nettyClientConfig.isUseTLS()) {
-            try {
-                sslContext = TlsHelper.buildSslContext(true);
-                log.info("SSL enabled for client");
-            } catch (IOException e) {
-                log.error("Failed to create SSLContext", e);
-            } catch (CertificateException e) {
-                log.error("Failed to create SSLContext", e);
-                throw new RuntimeException("Failed to create SSLContext", e);
-            }
-        }
-    }
-
-    private static int initValueIndex() {
-        Random r = new Random();
-
-        return Math.abs(r.nextInt() % 999) % 999;
-    }
-
-    @Override
-    public void start() {
-        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
-            nettyClientConfig.getClientWorkerThreads(),
-            new ThreadFactory() {
-
-                private AtomicInteger threadIndex = new AtomicInteger(0);
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
-                }
-            });
-
-        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
-            .option(ChannelOption.TCP_NODELAY, true)
-            .option(ChannelOption.SO_KEEPALIVE, false)
-            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
-            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
-            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
-            .handler(new ChannelInitializer<SocketChannel>() {
-                @Override
-                public void initChannel(SocketChannel ch) throws Exception {
-                    ChannelPipeline pipeline = ch.pipeline();
-                    if (nettyClientConfig.isUseTLS()) {
-                        if (null != sslContext) {
-                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
-                            log.info("Prepend SSL handler");
-                        } else {
-                            log.warn("Connections are insecure as SSLContext is null!");
-                        }
-                    }
-                    pipeline.addLast(
-                        defaultEventExecutorGroup,
-                        new NettyEncoder(),
-                        new NettyDecoder(),
-                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
-                        new NettyConnectManageHandler(),
-                        new NettyClientHandler());
-                }
-            });
-
-        this.timer.scheduleAtFixedRate(new TimerTask() {
-            @Override
-            public void run() {
-                try {
-                    NettyRemotingClient.this.scanResponseTable();
-                } catch (Throwable e) {
-                    log.error("scanResponseTable exception", e);
-                }
-            }
-        }, 1000 * 3, 1000);
-
-        if (this.channelEventListener != null) {
-            this.nettyEventExecutor.start();
-        }
-    }
-
-    @Override
-    public void shutdown() {
-        try {
-            this.timer.cancel();
-
-            for (ChannelWrapper cw : this.channelTables.values()) {
-                this.closeChannel(null, cw.getChannel());
-            }
-
-            this.channelTables.clear();
-
-            this.eventLoopGroupWorker.shutdownGracefully();
-
-            if (this.nettyEventExecutor != null) {
-                this.nettyEventExecutor.shutdown();
-            }
-
-            if (this.defaultEventExecutorGroup != null) {
-                this.defaultEventExecutorGroup.shutdownGracefully();
-            }
-        } catch (Exception e) {
-            log.error("NettyRemotingClient shutdown exception, ", e);
-        }
-
-        if (this.publicExecutor != null) {
-            try {
-                this.publicExecutor.shutdown();
-            } catch (Exception e) {
-                log.error("NettyRemotingServer shutdown exception, ", e);
-            }
-        }
-    }
-
-    public void closeChannel(final String addr, final Channel channel) {
-        if (null == channel)
-            return;
-
-        final String addrRemote = null == addr ? RemotingHelper.parseChannelRemoteAddr(channel) : addr;
-
-        try {
-            if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-                try {
-                    boolean removeItemFromTable = true;
-                    final ChannelWrapper prevCW = this.channelTables.get(addrRemote);
-
-                    log.info("closeChannel: begin close the channel[{}] Found: {}", addrRemote, prevCW != null);
-
-                    if (null == prevCW) {
-                        log.info("closeChannel: the channel[{}] has been removed from the channel table before", addrRemote);
-                        removeItemFromTable = false;
-                    } else if (prevCW.getChannel() != channel) {
-                        log.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.",
-                            addrRemote);
-                        removeItemFromTable = false;
-                    }
-
-                    if (removeItemFromTable) {
-                        this.channelTables.remove(addrRemote);
-                        log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
-                    }
-
-                    RemotingUtil.closeChannel(channel);
-                } catch (Exception e) {
-                    log.error("closeChannel: close the channel exception", e);
-                } finally {
-                    this.lockChannelTables.unlock();
-                }
-            } else {
-                log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
-            }
-        } catch (InterruptedException e) {
-            log.error("closeChannel exception", e);
-        }
-    }
-
-    @Override
-    public void registerRPCHook(RPCHook rpcHook) {
-        this.rpcHook = rpcHook;
-    }
-
-    public void closeChannel(final Channel channel) {
-        if (null == channel)
-            return;
-
-        try {
-            if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-                try {
-                    boolean removeItemFromTable = true;
-                    ChannelWrapper prevCW = null;
-                    String addrRemote = null;
-                    for (Map.Entry<String, ChannelWrapper> entry : channelTables.entrySet()) {
-                        String key = entry.getKey();
-                        ChannelWrapper prev = entry.getValue();
-                        if (prev.getChannel() != null) {
-                            if (prev.getChannel() == channel) {
-                                prevCW = prev;
-                                addrRemote = key;
-                                break;
-                            }
-                        }
-                    }
-
-                    if (null == prevCW) {
-                        log.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", addrRemote);
-                        removeItemFromTable = false;
-                    }
-
-                    if (removeItemFromTable) {
-                        this.channelTables.remove(addrRemote);
-                        log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
-                        RemotingUtil.closeChannel(channel);
-                    }
-                } catch (Exception e) {
-                    log.error("closeChannel: close the channel exception", e);
-                } finally {
-                    this.lockChannelTables.unlock();
-                }
-            } else {
-                log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
-            }
-        } catch (InterruptedException e) {
-            log.error("closeChannel exception", e);
-        }
-    }
-
-    @Override
-    public void updateNameServerAddressList(List<String> addrs) {
-        List<String> old = this.namesrvAddrList.get();
-        boolean update = false;
-
-        if (!addrs.isEmpty()) {
-            if (null == old) {
-                update = true;
-            } else if (addrs.size() != old.size()) {
-                update = true;
-            } else {
-                for (int i = 0; i < addrs.size() && !update; i++) {
-                    if (!old.contains(addrs.get(i))) {
-                        update = true;
-                    }
-                }
-            }
-
-            if (update) {
-                Collections.shuffle(addrs);
-                log.info("name server address updated. NEW : {} , OLD: {}", addrs, old);
-                this.namesrvAddrList.set(addrs);
-            }
-        }
-    }
-
-    @Override
-    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
-        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
-        long beginStartTime = System.currentTimeMillis();
-        final Channel channel = this.getAndCreateChannel(addr);
-        if (channel != null && channel.isActive()) {
-            try {
-                if (this.rpcHook != null) {
-                    this.rpcHook.doBeforeRequest(addr, request);
-                }
-                long costTime = System.currentTimeMillis() - beginStartTime;
-                if (timeoutMillis < costTime) {
-                    throw new RemotingTimeoutException("invokeSync call timeout");
-                }
-                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
-                if (this.rpcHook != null) {
-                    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
-                }
-                return response;
-            } catch (RemotingSendRequestException e) {
-                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
-                this.closeChannel(addr, channel);
-                throw e;
-            } catch (RemotingTimeoutException e) {
-                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
-                    this.closeChannel(addr, channel);
-                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
-                }
-                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
-                throw e;
-            }
-        } else {
-            this.closeChannel(addr, channel);
-            throw new RemotingConnectException(addr);
-        }
-    }
-
-    private Channel getAndCreateChannel(final String addr) throws InterruptedException {
-        if (null == addr) {
-            return getAndCreateNameserverChannel();
-        }
-
-        ChannelWrapper cw = this.channelTables.get(addr);
-        if (cw != null && cw.isOK()) {
-            return cw.getChannel();
-        }
-
-        return this.createChannel(addr);
-    }
-
-    private Channel getAndCreateNameserverChannel() throws InterruptedException {
-        String addr = this.namesrvAddrChoosed.get();
-        if (addr != null) {
-            ChannelWrapper cw = this.channelTables.get(addr);
-            if (cw != null && cw.isOK()) {
-                return cw.getChannel();
-            }
-        }
-
-        final List<String> addrList = this.namesrvAddrList.get();
-        if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-            try {
-                addr = this.namesrvAddrChoosed.get();
-                if (addr != null) {
-                    ChannelWrapper cw = this.channelTables.get(addr);
-                    if (cw != null && cw.isOK()) {
-                        return cw.getChannel();
-                    }
-                }
-
-                if (addrList != null && !addrList.isEmpty()) {
-                    for (int i = 0; i < addrList.size(); i++) {
-                        int index = this.namesrvIndex.incrementAndGet();
-                        index = Math.abs(index);
-                        index = index % addrList.size();
-                        String newAddr = addrList.get(index);
-
-                        this.namesrvAddrChoosed.set(newAddr);
-                        log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
-                        Channel channelNew = this.createChannel(newAddr);
-                        if (channelNew != null) {
-                            return channelNew;
-                        }
-                    }
-                }
-            } catch (Exception e) {
-                log.error("getAndCreateNameserverChannel: create name server channel exception", e);
-            } finally {
-                this.lockNamesrvChannel.unlock();
-            }
-        } else {
-            log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
-        }
-
-        return null;
-    }
-
-    private Channel createChannel(final String addr) throws InterruptedException {
-        ChannelWrapper cw = this.channelTables.get(addr);
-        if (cw != null && cw.isOK()) {
-            cw.getChannel().close();
-            channelTables.remove(addr);
-        }
-
-        if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-            try {
-                boolean createNewConnection;
-                cw = this.channelTables.get(addr);
-                if (cw != null) {
-
-                    if (cw.isOK()) {
-                        cw.getChannel().close();
-                        this.channelTables.remove(addr);
-                        createNewConnection = true;
-                    } else if (!cw.getChannelFuture().isDone()) {
-                        createNewConnection = false;
-                    } else {
-                        this.channelTables.remove(addr);
-                        createNewConnection = true;
-                    }
-                } else {
-                    createNewConnection = true;
-                }
-
-                if (createNewConnection) {
-                    ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
-                    log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
-                    cw = new ChannelWrapper(channelFuture);
-                    this.channelTables.put(addr, cw);
-                }
-            } catch (Exception e) {
-                log.error("createChannel: create channel exception", e);
-            } finally {
-                this.lockChannelTables.unlock();
-            }
-        } else {
-            log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
-        }
-
-        if (cw != null) {
-            ChannelFuture channelFuture = cw.getChannelFuture();
-            if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
-                if (cw.isOK()) {
-                    log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
-                    return cw.getChannel();
-                } else {
-                    log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
-                }
-            } else {
-                log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
-                    channelFuture.toString());
-            }
-        }
-
-        return null;
-    }
-
-    @Override
-    public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
-        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
-        RemotingSendRequestException {
-        long beginStartTime = System.currentTimeMillis();
-        final Channel channel = this.getAndCreateChannel(addr);
-        if (channel != null && channel.isActive()) {
-            try {
-                if (this.rpcHook != null) {
-                    this.rpcHook.doBeforeRequest(addr, request);
-                }
-                long costTime = System.currentTimeMillis() - beginStartTime;
-                if (timeoutMillis < costTime) {
-                    throw new RemotingTooMuchRequestException("invokeAsync call timeout");
-                }
-                this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
-            } catch (RemotingSendRequestException e) {
-                log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
-                this.closeChannel(addr, channel);
-                throw e;
-            }
-        } else {
-            this.closeChannel(addr, channel);
-            throw new RemotingConnectException(addr);
-        }
-    }
-
-    @Override
-    public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
-        RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
-        final Channel channel = this.getAndCreateChannel(addr);
-        if (channel != null && channel.isActive()) {
-            try {
-                if (this.rpcHook != null) {
-                    this.rpcHook.doBeforeRequest(addr, request);
-                }
-                this.invokeOnewayImpl(channel, request, timeoutMillis);
-            } catch (RemotingSendRequestException e) {
-                log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
-                this.closeChannel(addr, channel);
-                throw e;
-            }
-        } else {
-            this.closeChannel(addr, channel);
-            throw new RemotingConnectException(addr);
-        }
-    }
-
-    @Override
-    public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
-        ExecutorService executorThis = executor;
-        if (null == executor) {
-            executorThis = this.publicExecutor;
-        }
-
-        Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
-        this.processorTable.put(requestCode, pair);
-    }
-
-    @Override
-    public boolean isChannelWritable(String addr) {
-        ChannelWrapper cw = this.channelTables.get(addr);
-        if (cw != null && cw.isOK()) {
-            return cw.isWritable();
-        }
-        return true;
-    }
-
-    @Override
-    public List<String> getNameServerAddressList() {
-        return this.namesrvAddrList.get();
-    }
-
-    @Override
-    public ChannelEventListener getChannelEventListener() {
-        return channelEventListener;
-    }
-
-    @Override
-    public RPCHook getRPCHook() {
-        return this.rpcHook;
-    }
-
-    @Override
-    public ExecutorService getCallbackExecutor() {
-        return callbackExecutor != null ? callbackExecutor : publicExecutor;
-    }
-
-    @Override
-    public void setCallbackExecutor(final ExecutorService callbackExecutor) {
-        this.callbackExecutor = callbackExecutor;
-    }
-
-    static class ChannelWrapper {
-        private final ChannelFuture channelFuture;
-
-        public ChannelWrapper(ChannelFuture channelFuture) {
-            this.channelFuture = channelFuture;
-        }
-
-        public boolean isOK() {
-            return this.channelFuture.channel() != null && this.channelFuture.channel().isActive();
-        }
-
-        public boolean isWritable() {
-            return this.channelFuture.channel().isWritable();
-        }
-
-        private Channel getChannel() {
-            return this.channelFuture.channel();
-        }
-
-        public ChannelFuture getChannelFuture() {
-            return channelFuture;
-        }
-    }
-
-    class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
-
-        @Override
-        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
-            processMessageReceived(ctx, msg);
-        }
-    }
-
-    class NettyConnectManageHandler extends ChannelDuplexHandler {
-        @Override
-        public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
-            ChannelPromise promise) throws Exception {
-            final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress);
-            final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress);
-            log.info("NETTY CLIENT PIPELINE: CONNECT  {} => {}", local, remote);
-
-            super.connect(ctx, remoteAddress, localAddress, promise);
-
-            if (NettyRemotingClient.this.channelEventListener != null) {
-                NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remote, ctx.channel()));
-            }
-        }
-
-        @Override
-        public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
-            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-            log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);
-            closeChannel(ctx.channel());
-            super.disconnect(ctx, promise);
-
-            if (NettyRemotingClient.this.channelEventListener != null) {
-                NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
-            }
-        }
-
-        @Override
-        public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
-            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-            log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
-            closeChannel(ctx.channel());
-            super.close(ctx, promise);
-            NettyRemotingClient.this.failFast(ctx.channel());
-            if (NettyRemotingClient.this.channelEventListener != null) {
-                NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
-            }
-        }
-
-        @Override
-        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-            if (evt instanceof IdleStateEvent) {
-                IdleStateEvent event = (IdleStateEvent) evt;
-                if (event.state().equals(IdleState.ALL_IDLE)) {
-                    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-                    log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
-                    closeChannel(ctx.channel());
-                    if (NettyRemotingClient.this.channelEventListener != null) {
-                        NettyRemotingClient.this
-                            .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
-                    }
-                }
-            }
-
-            ctx.fireUserEventTriggered(evt);
-        }
-
-        @Override
-        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-            log.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
-            log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
-            closeChannel(ctx.channel());
-            if (NettyRemotingClient.this.channelEventListener != null) {
-                NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
-            }
-        }
-    }
-}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
index a5e2a23..b3f55cd 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.remoting.netty;
 public class NettyServerConfig implements Cloneable {
     private int listenPort = 8888;
     private int serverWorkerThreads = 8;
-    private int serverCallbackExecutorThreads = 0;
+    private int serverCallbackExecutorThreads = 8;
     private int serverSelectorThreads = 3;
     private int serverOnewaySemaphoreValue = 256;
     private int serverAsyncSemaphoreValue = 64;
@@ -28,6 +28,33 @@ public class NettyServerConfig implements Cloneable {
     private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
     private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
     private boolean serverPooledByteBufAllocatorEnable = true;
+    private int serverAcceptorThreads = 1;
+    private int connectionChannelReaderIdleSeconds = 0;
+    private int connectionChannelWriterIdleSeconds = 0;
+
+    public int getConnectionChannelReaderIdleSeconds() {
+        return connectionChannelReaderIdleSeconds;
+    }
+
+    public void setConnectionChannelReaderIdleSeconds(int connectionChannelReaderIdleSeconds) {
+        this.connectionChannelReaderIdleSeconds = connectionChannelReaderIdleSeconds;
+    }
+
+    public int getConnectionChannelWriterIdleSeconds() {
+        return connectionChannelWriterIdleSeconds;
+    }
+
+    public void setConnectionChannelWriterIdleSeconds(int connectionChannelWriterIdleSeconds) {
+        this.connectionChannelWriterIdleSeconds = connectionChannelWriterIdleSeconds;
+    }
+
+    public int getServerAcceptorThreads() {
+        return serverAcceptorThreads;
+    }
+
+    public void setServerAcceptorThreads(int serverAcceptorThreads) {
+        this.serverAcceptorThreads = serverAcceptorThreads;
+    }
 
     /**
      * make make install
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index cadcab1..d392e79 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -17,43 +17,41 @@
 package org.apache.rocketmq.remoting.protocol;
 
 import com.alibaba.fastjson.annotation.JSONField;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.netty.CodecHelper;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.SerializeType;
+import org.msgpack.annotation.Message;
 
+@Message
 public class RemotingCommand {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
     public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
     public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE";
     public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version";
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
-    private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND
-    private static final int RPC_ONEWAY = 1; // 0, RPC
-    private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP =
-        new HashMap<Class<? extends CommandCustomHeader>, Field[]>();
-    private static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<Class, String>();
-    // 1, Oneway
-    // 1, RESPONSE_COMMAND
-    private static final Map<Field, Boolean> NULLABLE_FIELD_CACHE = new HashMap<Field, Boolean>();
-    private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName();
-    private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName();
-    private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName();
-    private static final String INTEGER_CANONICAL_NAME_1 = Integer.class.getCanonicalName();
-    private static final String INTEGER_CANONICAL_NAME_2 = int.class.getCanonicalName();
-    private static final String LONG_CANONICAL_NAME_1 = Long.class.getCanonicalName();
-    private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName();
-    private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName();
-    private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName();
+
+    /**
+     * REQUEST_COMMAND
+     */
+    private static final int RPC_TYPE = 0;
+
+    /**
+     * One way
+     */
+    private static final int RPC_ONEWAY = 1;
+
+
+
     private static volatile int configVersion = -1;
+
     private static AtomicInteger requestId = new AtomicInteger(0);
 
     private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON;
@@ -74,15 +72,16 @@ public class RemotingCommand {
     private int version = 0;
     private int opaque = requestId.getAndIncrement();
     private int flag = 0;
+
     private String remark;
     private HashMap<String, String> extFields;
-    private transient CommandCustomHeader customHeader;
+    private CommandCustomHeader customHeader;
 
     private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
 
     private transient byte[] body;
 
-    protected RemotingCommand() {
+    public RemotingCommand() {
     }
 
     public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
@@ -132,59 +131,12 @@ public class RemotingCommand {
         return cmd;
     }
 
-    public static RemotingCommand createResponseCommand(int code, String remark) {
-        return createResponseCommand(code, remark, null);
-    }
-
-    public static RemotingCommand decode(final byte[] array) {
-        ByteBuffer byteBuffer = ByteBuffer.wrap(array);
-        return decode(byteBuffer);
-    }
-
-    public static RemotingCommand decode(final ByteBuffer byteBuffer) {
-        int length = byteBuffer.limit();
-        int oriHeaderLen = byteBuffer.getInt();
-        int headerLength = getHeaderLength(oriHeaderLen);
-
-        byte[] headerData = new byte[headerLength];
-        byteBuffer.get(headerData);
-
-        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
-
-        int bodyLength = length - 4 - headerLength;
-        byte[] bodyData = null;
-        if (bodyLength > 0) {
-            bodyData = new byte[bodyLength];
-            byteBuffer.get(bodyData);
-        }
-        cmd.body = bodyData;
-
-        return cmd;
-    }
-
-    public static int getHeaderLength(int length) {
-        return length & 0xFFFFFF;
-    }
-
-    private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
-        switch (type) {
-            case JSON:
-                RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
-                resultJson.setSerializeTypeCurrentRPC(type);
-                return resultJson;
-            case ROCKETMQ:
-                RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
-                resultRMQ.setSerializeTypeCurrentRPC(type);
-                return resultRMQ;
-            default:
-                break;
-        }
-
-        return null;
+    public CommandCustomHeader getCustomHeader() {
+        return customHeader;
     }
 
-    public static SerializeType getProtocolType(int source) {
-        return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
+    public static RemotingCommand createResponseCommand(int code, String remark) {
+        return createResponseCommand(code, remark, null);
     }
 
     public static int createNewRequestId() {
@@ -208,16 +160,6 @@ public class RemotingCommand {
         return true;
     }
 
-    public static byte[] markProtocolType(int source, SerializeType type) {
-        byte[] result = new byte[4];
-
-        result[0] = type.getCode();
-        result[1] = (byte) ((source >> 16) & 0xFF);
-        result[2] = (byte) ((source >> 8) & 0xFF);
-        result[3] = (byte) (source & 0xFF);
-        return result;
-    }
-
     public void markResponseType() {
         int bits = 1 << RPC_TYPE;
         this.flag |= bits;
@@ -227,206 +169,13 @@ public class RemotingCommand {
         return customHeader;
     }
 
-    public void writeCustomHeader(CommandCustomHeader customHeader) {
-        this.customHeader = customHeader;
-    }
-
     public CommandCustomHeader decodeCommandCustomHeader(
         Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
-        CommandCustomHeader objectHeader;
-        try {
-            objectHeader = classHeader.newInstance();
-        } catch (InstantiationException e) {
-            return null;
-        } catch (IllegalAccessException e) {
-            return null;
-        }
-
-        if (this.extFields != null) {
-
-            Field[] fields = getClazzFields(classHeader);
-            for (Field field : fields) {
-                if (!Modifier.isStatic(field.getModifiers())) {
-                    String fieldName = field.getName();
-                    if (!fieldName.startsWith("this")) {
-                        try {
-                            String value = this.extFields.get(fieldName);
-                            if (null == value) {
-                                if (!isFieldNullable(field)) {
-                                    throw new RemotingCommandException("the custom field <" + fieldName + "> is null");
-                                }
-                                continue;
-                            }
-
-                            field.setAccessible(true);
-                            String type = getCanonicalName(field.getType());
-                            Object valueParsed;
-
-                            if (type.equals(STRING_CANONICAL_NAME)) {
-                                valueParsed = value;
-                            } else if (type.equals(INTEGER_CANONICAL_NAME_1) || type.equals(INTEGER_CANONICAL_NAME_2)) {
-                                valueParsed = Integer.parseInt(value);
-                            } else if (type.equals(LONG_CANONICAL_NAME_1) || type.equals(LONG_CANONICAL_NAME_2)) {
-                                valueParsed = Long.parseLong(value);
-                            } else if (type.equals(BOOLEAN_CANONICAL_NAME_1) || type.equals(BOOLEAN_CANONICAL_NAME_2)) {
-                                valueParsed = Boolean.parseBoolean(value);
-                            } else if (type.equals(DOUBLE_CANONICAL_NAME_1) || type.equals(DOUBLE_CANONICAL_NAME_2)) {
-                                valueParsed = Double.parseDouble(value);
-                            } else {
-                                throw new RemotingCommandException("the custom field <" + fieldName + "> type is not supported");
-                            }
-
-                            field.set(objectHeader, valueParsed);
-
-                        } catch (Throwable e) {
-                            log.error("Failed field [{}] decoding", fieldName, e);
-                        }
-                    }
-                }
-            }
-
-            objectHeader.checkFields();
-        }
-
-        return objectHeader;
-    }
-
-    private Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) {
-        Field[] field = CLASS_HASH_MAP.get(classHeader);
-
-        if (field == null) {
-            field = classHeader.getDeclaredFields();
-            synchronized (CLASS_HASH_MAP) {
-                CLASS_HASH_MAP.put(classHeader, field);
-            }
-        }
-        return field;
-    }
-
-    private boolean isFieldNullable(Field field) {
-        if (!NULLABLE_FIELD_CACHE.containsKey(field)) {
-            Annotation annotation = field.getAnnotation(CFNotNull.class);
-            synchronized (NULLABLE_FIELD_CACHE) {
-                NULLABLE_FIELD_CACHE.put(field, annotation == null);
-            }
-        }
-        return NULLABLE_FIELD_CACHE.get(field);
-    }
-
-    private String getCanonicalName(Class clazz) {
-        String name = CANONICAL_NAME_CACHE.get(clazz);
-
-        if (name == null) {
-            name = clazz.getCanonicalName();
-            synchronized (CANONICAL_NAME_CACHE) {
-                CANONICAL_NAME_CACHE.put(clazz, name);
-            }
-        }
-        return name;
-    }
-
-    public ByteBuffer encode() {
-        // 1> header length size
-        int length = 4;
-
-        // 2> header data length
-        byte[] headerData = this.headerEncode();
-        length += headerData.length;
-
-        // 3> body data length
-        if (this.body != null) {
-            length += body.length;
-        }
-
-        ByteBuffer result = ByteBuffer.allocate(4 + length);
-
-        // length
-        result.putInt(length);
-
-        // header length
-        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
-
-        // header data
-        result.put(headerData);
-
-        // body data;
-        if (this.body != null) {
-            result.put(this.body);
-        }
-
-        result.flip();
-
-        return result;
-    }
-
-    private byte[] headerEncode() {
-        this.makeCustomHeaderToNet();
-        if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
-            return RocketMQSerializable.rocketMQProtocolEncode(this);
-        } else {
-            return RemotingSerializable.encode(this);
-        }
-    }
-
-    public void makeCustomHeaderToNet() {
-        if (this.customHeader != null) {
-            Field[] fields = getClazzFields(customHeader.getClass());
-            if (null == this.extFields) {
-                this.extFields = new HashMap<String, String>();
-            }
-
-            for (Field field : fields) {
-                if (!Modifier.isStatic(field.getModifiers())) {
-                    String name = field.getName();
-                    if (!name.startsWith("this")) {
-                        Object value = null;
-                        try {
-                            field.setAccessible(true);
-                            value = field.get(this.customHeader);
-                        } catch (Exception e) {
-                            log.error("Failed to access field [{}]", name, e);
-                        }
-
-                        if (value != null) {
-                            this.extFields.put(name, value.toString());
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    public ByteBuffer encodeHeader() {
-        return encodeHeader(this.body != null ? this.body.length : 0);
+        return CodecHelper.decodeCommandCustomHeader(this, classHeader);
     }
 
     public ByteBuffer encodeHeader(final int bodyLength) {
-        // 1> header length size
-        int length = 4;
-
-        // 2> header data length
-        byte[] headerData;
-        headerData = this.headerEncode();
-
-        length += headerData.length;
-
-        // 3> body data length
-        length += bodyLength;
-
-        ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
-
-        // length
-        result.putInt(length);
-
-        // header length
-        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
-
-        // header data
-        result.put(headerData);
-
-        result.flip();
-
-        return result;
+        return CodecHelper.encodeHeader(bodyLength, this);
     }
 
     public void markOnewayRPC() {
@@ -440,14 +189,6 @@ public class RemotingCommand {
         return (this.flag & bits) == bits;
     }
 
-    public int getCode() {
-        return code;
-    }
-
-    public void setCode(int code) {
-        this.code = code;
-    }
-
     @JSONField(serialize = false)
     public RemotingCommandType getType() {
         if (this.isResponseType()) {
@@ -463,6 +204,14 @@ public class RemotingCommand {
         return (this.flag & bits) == bits;
     }
 
+    public int getCode() {
+        return code;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
+
     public LanguageCode getLanguage() {
         return language;
     }
@@ -526,11 +275,8 @@ public class RemotingCommand {
         extFields.put(key, value);
     }
 
-    @Override
-    public String toString() {
-        return "RemotingCommand [code=" + code + ", language=" + language + ", version=" + version + ", opaque=" + opaque + ", flag(B)="
-            + Integer.toBinaryString(flag) + ", remark=" + remark + ", extFields=" + extFields + ", serializeTypeCurrentRPC="
-            + serializeTypeCurrentRPC + "]";
+    public void setCustomHeader(CommandCustomHeader customHeader) {
+        this.customHeader = customHeader;
     }
 
     public SerializeType getSerializeTypeCurrentRPC() {
@@ -540,4 +286,12 @@ public class RemotingCommand {
     public void setSerializeTypeCurrentRPC(SerializeType serializeTypeCurrentRPC) {
         this.serializeTypeCurrentRPC = serializeTypeCurrentRPC;
     }
+
+    @Override
+    public String toString() {
+        return "RemotingCommand [code=" + code + ", language=" + language + ", version=" + version + ", opaque=" + opaque + ", flag(B)="
+            + Integer.toBinaryString(flag) + ", remark=" + remark + ", extFields=" + extFields + ", serializeTypeCurrentRPC="
+            + serializeTypeCurrentRPC + "]";
+    }
+
 }
\ No newline at end of file
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java
index 01c853b..a6a7ec4 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java
@@ -18,5 +18,5 @@ package org.apache.rocketmq.remoting.protocol;
 
 public enum RemotingCommandType {
     REQUEST_COMMAND,
-    RESPONSE_COMMAND;
+    RESPONSE_COMMAND
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/LanguageCode.java
similarity index 92%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/serialize/LanguageCode.java
index 4382af3..246a17b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/LanguageCode.java
@@ -15,8 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.protocol;
+package org.apache.rocketmq.remoting.serialize;
 
+import org.msgpack.annotation.MessagePackOrdinalEnum;
+
+@MessagePackOrdinalEnum
 public enum LanguageCode {
     JAVA((byte) 0),
     CPP((byte) 1),
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/MsgPackSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/MsgPackSerializable.java
new file mode 100644
index 0000000..6c7ba78
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/MsgPackSerializable.java
@@ -0,0 +1,40 @@
+package org.apache.rocketmq.remoting.serialize;
+
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.msgpack.MessagePack;
+
+public class MsgPackSerializable implements Serializer {
+    private final MessagePack messagePack = new MessagePack();
+
+//    public MsgPackSerializable(){
+//        messagePack.register(LanguageCode.class);
+//        messagePack.register(SerializeType.class);
+//    }
+    @Override
+    public SerializeType type() {
+        return SerializeType.MSGPACK;
+    }
+
+    @Override
+    public <T> T deserializer(byte[] content, Class<T> c) {
+        try {
+            return messagePack.read(content, c);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public RemotingCommand deserializer(byte[] content) {
+        return deserializer(content, RemotingCommand.class);
+    }
+
+    @Override
+    public byte[] serializer(Object object) {
+        try {
+            return messagePack.write(object);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/RemotingSerializable.java
similarity index 76%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/serialize/RemotingSerializable.java
index f80ff14..01e9b5c 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/RemotingSerializable.java
@@ -14,12 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.remoting.protocol;
+package org.apache.rocketmq.remoting.serialize;
 
 import com.alibaba.fastjson.JSON;
 import java.nio.charset.Charset;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
-public abstract class RemotingSerializable {
+public class RemotingSerializable implements Serializer {
     private final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
 
     public static byte[] encode(final Object obj) {
@@ -58,4 +59,24 @@ public abstract class RemotingSerializable {
     public String toJson(final boolean prettyFormat) {
         return toJson(this, prettyFormat);
     }
+
+    @Override
+    public SerializeType type() {
+        return SerializeType.JSON;
+    }
+
+    @Override
+    public <T> T deserializer(byte[] content, Class<T> c) {
+        return decode(content, c);
+    }
+
+    @Override
+    public byte[] serializer(Object object) {
+        return encode(object);
+    }
+
+    @Override
+    public RemotingCommand deserializer(byte[] content) {
+        return decode(content, RemotingCommand.class);
+    }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/RocketMQSerializable.java
similarity index 91%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/serialize/RocketMQSerializable.java
index 66119e0..f903bde 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/RocketMQSerializable.java
@@ -14,15 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.remoting.protocol;
+package org.apache.rocketmq.remoting.serialize;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
-public class RocketMQSerializable {
+public class RocketMQSerializable implements Serializer {
     private static final Charset CHARSET_UTF8 = Charset.forName("UTF-8");
 
     public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
@@ -201,4 +202,25 @@ public class RocketMQSerializable {
         }
         return true;
     }
+
+    @Override
+    public SerializeType type() {
+        return SerializeType.ROCKETMQ;
+    }
+
+    @Override
+    public <T> T deserializer(byte[] content, Class<T> c) {
+        //Fixme
+        return null;
+    }
+
+    @Override
+    public byte[] serializer(Object object) {
+        return rocketMQProtocolEncode((RemotingCommand) object);
+    }
+
+    @Override
+    public RemotingCommand deserializer(byte[] content) {
+        return rocketMQProtocolDecode(content);
+    }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/SerializeType.java
similarity index 87%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/serialize/SerializeType.java
index b040f8f..fc5ddba 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/SerializeType.java
@@ -15,11 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.protocol;
+package org.apache.rocketmq.remoting.serialize;
 
+import org.msgpack.annotation.MessagePackOrdinalEnum;
+
+@MessagePackOrdinalEnum
 public enum SerializeType {
     JSON((byte) 0),
-    ROCKETMQ((byte) 1);
+    ROCKETMQ((byte) 1),
+    MSGPACK((byte) 2);
 
     private byte code;
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/Serializer.java
similarity index 70%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java
copy to remoting/src/main/java/org/apache/rocketmq/remoting/serialize/Serializer.java
index 01c853b..b3f5888 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/Serializer.java
@@ -14,9 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.remoting.protocol;
 
-public enum RemotingCommandType {
-    REQUEST_COMMAND,
-    RESPONSE_COMMAND;
+package org.apache.rocketmq.remoting.serialize;
+
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public interface Serializer {
+    SerializeType type();
+
+    <T> T deserializer(final byte[] content, final Class<T> c);
+
+    byte[] serializer(final Object object);
+
+    RemotingCommand deserializer(final byte[] content);
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/SerializerFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/SerializerFactory.java
new file mode 100644
index 0000000..e015478
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/SerializerFactory.java
@@ -0,0 +1,26 @@
+package org.apache.rocketmq.remoting.serialize;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SerializerFactory {
+
+    private static Map<Enum<SerializeType>, Serializer> serializerMap = new ConcurrentHashMap<Enum<SerializeType>, Serializer>();
+
+    private SerializerFactory() {
+    }
+
+    static {
+        register(SerializeType.JSON, new RemotingSerializable());
+        register(SerializeType.ROCKETMQ, new RocketMQSerializable());
+        register(SerializeType.MSGPACK, new MsgPackSerializable());
+    }
+
+    public static void register(SerializeType type, Serializer serialization) {
+        serializerMap.putIfAbsent(type, serialization);
+    }
+
+    public static Serializer get(SerializeType type) {
+        return serializerMap.get(type);
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java
new file mode 100644
index 0000000..6dab218
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.transport;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Timer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.netty.NettyEvent;
+import org.apache.rocketmq.remoting.netty.NettyEventType;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
+
+public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+    protected final ConcurrentMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();
+
+    protected final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
+    private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
+    private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
+    private final Lock lockNamesrvChannel = new ReentrantLock();
+
+    private final Lock lockChannelTables = new ReentrantLock();
+    private static final long LOCK_TIMEOUT_MILLIS = 3000;
+
+    public abstract Bootstrap getBootstrap();
+
+    public NettyRemotingClientAbstract() {
+        super();
+    }
+
+    public NettyRemotingClientAbstract(final int permitsOneway, final int permitsAsync) {
+        super(permitsOneway, permitsAsync);
+    }
+
+    private static int initValueIndex() {
+        Random r = new Random();
+
+        return Math.abs(r.nextInt() % 999) % 999;
+    }
+
+    public void closeChannel(final String addr, final Channel channel) {
+        if (null == channel) {
+            return;
+        }
+
+        final String addrRemote = null == addr ? RemotingHelper.parseChannelRemoteAddr(channel) : addr;
+
+        try {
+            if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                try {
+                    boolean removeItemFromTable = true;
+                    final ChannelWrapper prevCW = this.channelTables.get(addrRemote);
+
+                    log.info("closeChannel: begin close the channel[{}] Found: {}", addrRemote, prevCW != null);
+
+                    if (null == prevCW) {
+                        log.info("closeChannel: the channel[{}] has been removed from the channel table before", addrRemote);
+                        removeItemFromTable = false;
+                    } else if (prevCW.getChannel() != channel) {
+                        log.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.",
+                            addrRemote);
+                        removeItemFromTable = false;
+                    }
+
+                    if (removeItemFromTable) {
+                        this.channelTables.remove(addrRemote);
+                        log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
+                    }
+
+                    RemotingUtil.closeChannel(channel);
+                } catch (Exception e) {
+                    log.error("closeChannel: close the channel exception", e);
+                } finally {
+                    this.lockChannelTables.unlock();
+                }
+            } else {
+                log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+            }
+        } catch (InterruptedException e) {
+            log.error("closeChannel exception", e);
+        }
+    }
+
+    public void clearChannels() {
+        for (ChannelWrapper cw : this.channelTables.values()) {
+            this.closeChannel(null, cw.getChannel());
+        }
+        this.channelTables.clear();
+    }
+
+    public void closeChannel(final Channel channel) {
+        if (null == channel) {
+            return;
+        }
+
+        try {
+            if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                try {
+                    boolean removeItemFromTable = true;
+                    ChannelWrapper prevCW = null;
+                    String addrRemote = null;
+                    for (Map.Entry<String, ChannelWrapper> entry : channelTables.entrySet()) {
+                        String key = entry.getKey();
+                        ChannelWrapper prev = entry.getValue();
+                        if (prev.getChannel() != null) {
+                            if (prev.getChannel() == channel) {
+                                prevCW = prev;
+                                addrRemote = key;
+                                break;
+                            }
+                        }
+                    }
+
+                    if (null == prevCW) {
+                        log.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", addrRemote);
+                        removeItemFromTable = false;
+                    }
+
+                    if (removeItemFromTable) {
+                        this.channelTables.remove(addrRemote);
+                        log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
+                        RemotingUtil.closeChannel(channel);
+                    }
+                } catch (Exception e) {
+                    log.error("closeChannel: close the channel exception", e);
+                } finally {
+                    this.lockChannelTables.unlock();
+                }
+            } else {
+                log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+            }
+        } catch (InterruptedException e) {
+            log.error("closeChannel exception", e);
+        }
+    }
+
+    protected Channel getAndCreateChannel(final String addr, long timeout) throws InterruptedException {
+        if (null == addr) {
+            return getAndCreateNameserverChannel(timeout);
+        }
+
+        ChannelWrapper cw = this.channelTables.get(addr);
+        if (cw != null && cw.isOK()) {
+            return cw.getChannel();
+        }
+
+        return this.createChannel(addr, timeout);
+    }
+
+    private Channel getAndCreateNameserverChannel(long timeout) throws InterruptedException {
+        String addr = this.namesrvAddrChoosed.get();
+        if (addr != null) {
+            ChannelWrapper cw = this.channelTables.get(addr);
+            if (cw != null && cw.isOK()) {
+                return cw.getChannel();
+            }
+        }
+
+        final List<String> addrList = this.namesrvAddrList.get();
+        if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+            try {
+                addr = this.namesrvAddrChoosed.get();
+                if (addr != null) {
+                    ChannelWrapper cw = this.channelTables.get(addr);
+                    if (cw != null && cw.isOK()) {
+                        return cw.getChannel();
+                    }
+                }
+
+                if (addrList != null && !addrList.isEmpty()) {
+                    for (int i = 0; i < addrList.size(); i++) {
+                        int index = this.namesrvIndex.incrementAndGet();
+                        index = Math.abs(index);
+                        index = index % addrList.size();
+                        String newAddr = addrList.get(index);
+
+                        this.namesrvAddrChoosed.set(newAddr);
+                        log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
+                        Channel channelNew = this.createChannel(newAddr, timeout);
+                        if (channelNew != null) {
+                            return channelNew;
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                log.error("getAndCreateNameserverChannel: create name server channel exception", e);
+            } finally {
+                this.lockNamesrvChannel.unlock();
+            }
+        } else {
+            log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+        }
+
+        return null;
+    }
+
+    private Channel createChannel(final String addr, long timeout) throws InterruptedException {
+        ChannelWrapper cw = this.channelTables.get(addr);
+        if (cw != null && cw.isOK()) {
+            cw.getChannel().close();
+            channelTables.remove(addr);
+        }
+
+        if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+            try {
+                boolean createNewConnection;
+                cw = this.channelTables.get(addr);
+                if (cw != null) {
+
+                    if (cw.isOK()) {
+                        cw.getChannel().close();
+                        this.channelTables.remove(addr);
+                        createNewConnection = true;
+                    } else if (!cw.getChannelFuture().isDone()) {
+                        createNewConnection = false;
+                    } else {
+                        this.channelTables.remove(addr);
+                        createNewConnection = true;
+                    }
+                } else {
+                    createNewConnection = true;
+                }
+
+                if (createNewConnection) {
+                    if (getBootstrap() != null) {
+                        ChannelFuture channelFuture = getBootstrap().connect(RemotingHelper.string2SocketAddress(addr));
+                        log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
+                        cw = new ChannelWrapper(channelFuture);
+                        this.channelTables.put(addr, cw);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("createChannel: create channel exception", e);
+            } finally {
+                this.lockChannelTables.unlock();
+            }
+        } else {
+            log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+        }
+
+        if (cw != null) {
+            ChannelFuture channelFuture = cw.getChannelFuture();
+            if (channelFuture.awaitUninterruptibly(timeout)) {
+                if (cw.isOK()) {
+                    log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
+                    return cw.getChannel();
+                } else {
+                    log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
+                }
+            } else {
+                log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, timeout,
+                    channelFuture.toString());
+            }
+        }
+
+        return null;
+    }
+
+    public void updateNameServerAddressList(List<String> addrs) {
+        List<String> old = this.namesrvAddrList.get();
+        boolean update = false;
+
+        if (!addrs.isEmpty()) {
+            if (null == old) {
+                update = true;
+            } else if (addrs.size() != old.size()) {
+                update = true;
+            } else {
+                for (int i = 0; i < addrs.size() && !update; i++) {
+                    if (!old.contains(addrs.get(i))) {
+                        update = true;
+                    }
+                }
+            }
+
+            if (update) {
+                Collections.shuffle(addrs);
+                log.info("name server address updated. NEW : {} , OLD: {}", addrs, old);
+                this.namesrvAddrList.set(addrs);
+            }
+        }
+    }
+
+    public static class ChannelWrapper {
+        private final ChannelFuture channelFuture;
+
+        public ChannelWrapper(ChannelFuture channelFuture) {
+            this.channelFuture = channelFuture;
+        }
+
+        public boolean isOK() {
+            return this.channelFuture.channel() != null && this.channelFuture.channel().isActive();
+        }
+
+        public boolean isWritable() {
+            return this.channelFuture.channel().isWritable();
+        }
+
+        public Channel getChannel() {
+            return this.channelFuture.channel();
+        }
+
+        public ChannelFuture getChannelFuture() {
+            return channelFuture;
+        }
+    }
+
+    public class NettyConnectManageHandler extends ChannelDuplexHandler {
+        @Override
+        public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
+            ChannelPromise promise) throws Exception {
+            final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress);
+            final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress);
+            log.info("NETTY CLIENT PIPELINE: CONNECT  {} => {}", local, remote);
+
+            super.connect(ctx, remoteAddress, localAddress, promise);
+
+            if (getChannelEventListener() != null) {
+                putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remote, ctx.channel()));
+            }
+        }
+
+        @Override
+        public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);
+            closeChannel(ctx.channel());
+            super.disconnect(ctx, promise);
+
+            if (getChannelEventListener() != null) {
+                putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
+            }
+        }
+
+        @Override
+        public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
+            closeChannel(ctx.channel());
+            super.close(ctx, promise);
+            failFast(ctx.channel());
+            if (getChannelEventListener() != null) {
+                putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
+            }
+        }
+
+        @Override
+        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+            if (evt instanceof IdleStateEvent) {
+                IdleStateEvent event = (IdleStateEvent) evt;
+                if (event.state().equals(IdleState.ALL_IDLE)) {
+                    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+                    log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
+                    closeChannel(ctx.channel());
+                    if (getChannelEventListener() != null) {
+                        putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
+                    }
+                }
+            }
+
+            ctx.fireUserEventTriggered(evt);
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            log.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
+            log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
+            closeChannel(ctx.channel());
+            if (getChannelEventListener() != null) {
+                putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
+            }
+        }
+    }
+
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java
new file mode 100644
index 0000000..e8d8471
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java
@@ -0,0 +1,93 @@
+package org.apache.rocketmq.remoting.transport;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.netty.NettyEvent;
+import org.apache.rocketmq.remoting.netty.NettyEventType;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
+
+public abstract class NettyRemotingServerAbstract extends NettyRemotingAbstract {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+    public NettyRemotingServerAbstract() {
+        super();
+    }
+
+    public NettyRemotingServerAbstract(final int permitsOneway, final int permitsAsync) {
+        super(permitsOneway, permitsAsync);
+    }
+
+    public class NettyConnectManageHandler extends ChannelDuplexHandler {
+        @Override
+        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
+            super.channelRegistered(ctx);
+        }
+
+        @Override
+        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
+            super.channelUnregistered(ctx);
+        }
+
+        @Override
+        public void channelActive(ChannelHandlerContext ctx) throws Exception {
+            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
+            super.channelActive(ctx);
+
+            if (getChannelEventListener() != null) {
+                putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
+            }
+        }
+
+        @Override
+        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
+            super.channelInactive(ctx);
+
+            if (getChannelEventListener() != null) {
+                putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
+            }
+        }
+
+        @Override
+        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+            if (evt instanceof IdleStateEvent) {
+                IdleStateEvent event = (IdleStateEvent) evt;
+                if (event.state().equals(IdleState.ALL_IDLE)) {
+                    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+                    log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
+                    RemotingUtil.closeChannel(ctx.channel());
+                    if (getChannelEventListener() != null) {
+                        putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
+                    }
+                }
+            }
+
+            ctx.fireUserEventTriggered(evt);
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
+            log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
+
+            if (getChannelEventListener() != null) {
+                putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
+            }
+
+            RemotingUtil.closeChannel(ctx.channel());
+        }
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
new file mode 100644
index 0000000..b294958
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
@@ -0,0 +1,293 @@
+package org.apache.rocketmq.remoting.transport.http2;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http2.Http2SecurityUtil;
+import io.netty.handler.ssl.OpenSsl;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.SupportedCipherSuiteFilter;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import javax.net.ssl.SSLException;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.ChannelEventListener;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.common.Pair;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyDecoder;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyEncoder;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.NettyRemotingClientAbstract;
+import org.apache.rocketmq.remoting.util.ThreadUtils;
+
+public class Http2ClientImpl extends NettyRemotingClientAbstract implements RemotingClient {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+    private NettyClientConfig nettyClientConfig;
+    private final Bootstrap bootstrap = new Bootstrap();
+    private EventLoopGroup ioGroup;
+    private ExecutorService publicExecutor;
+    private ExecutorService callbackExecutor;
+    private ChannelEventListener channelEventListener;
+    private DefaultEventExecutorGroup defaultEventExecutorGroup;
+    private RPCHook rpcHook;
+
+    public Http2ClientImpl(final NettyClientConfig clientConfig,
+        final ChannelEventListener channelEventListener) {
+        super(clientConfig.getClientOnewaySemaphoreValue(), clientConfig.getClientAsyncSemaphoreValue());
+        init(clientConfig, channelEventListener);
+    }
+
+    public Http2ClientImpl(){
+        super();
+    }
+
+    @Override
+    public void init(NettyClientConfig clientConfig, ChannelEventListener channelEventListener) {
+        this.nettyClientConfig = clientConfig;
+        this.channelEventListener = channelEventListener;
+        this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
+            clientConfig.getClientWorkerThreads()));
+        this.publicExecutor = ThreadUtils.newFixedThreadPool(
+            clientConfig.getClientCallbackExecutorThreads(),
+            10000, "Remoting-PublicExecutor", true);
+        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(),
+            ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads()));
+        buildSslContext();
+    }
+
+    private void buildSslContext() {
+        SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
+        try {
+            sslContext = SslContextBuilder.forClient()
+                .sslProvider(provider)
+                /* NOTE: the cipher filter may not include all ciphers required by the HTTP/2 specification.
+                 * Please refer to the HTTP/2 specification for cipher requirements. */
+                .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
+                .trustManager(InsecureTrustManagerFactory.INSTANCE)
+                .build();
+        } catch (SSLException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void start() {
+        super.start();
+        this.bootstrap.group(this.ioGroup).channel(NioSocketChannel.class)
+            .option(ChannelOption.TCP_NODELAY, true)
+            .option(ChannelOption.SO_KEEPALIVE, false)
+            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
+            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
+            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
+            .handler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                public void initChannel(SocketChannel ch) throws Exception {
+                    ChannelPipeline pipeline = ch.pipeline();
+                    pipeline.addLast(
+                        defaultEventExecutorGroup,
+                        Http2Handler.newHandler(false),
+                        new NettyEncoder(),
+                        new NettyDecoder(),
+                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
+                        new NettyConnectManageHandler(),
+                        new NettyClientHandler());
+                }
+            });
+        startUpHouseKeepingService();
+    }
+
+    @Override
+    public void shutdown() {
+        super.shutdown();
+        try {
+
+            for (ChannelWrapper cw : this.channelTables.values()) {
+                this.closeChannel(null, cw.getChannel());
+            }
+
+            this.channelTables.clear();
+            if (this.ioGroup != null) {
+                this.ioGroup.shutdownGracefully();
+            }
+
+            if (this.defaultEventExecutorGroup != null) {
+                this.defaultEventExecutorGroup.shutdownGracefully();
+            }
+        } catch (Exception e) {
+            log.error("Http2ClientImpl shutdown exception, ", e);
+        }
+
+        if (this.publicExecutor != null) {
+            try {
+                this.publicExecutor.shutdown();
+            } catch (Exception e) {
+                log.error("NettyRemotingServer shutdown exception, ", e);
+            }
+        }
+    }
+
+    @Override
+    public void registerRPCHook(RPCHook rpcHook) {
+        this.rpcHook = rpcHook;
+    }
+
+    @Override
+    public void updateNameServerAddressList(List<String> addrs) {
+        super.updateNameServerAddressList(addrs);
+    }
+
+    @Override
+    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
+        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
+        final Channel channel = this.getAndCreateChannel(addr, timeoutMillis);
+        if (channel != null && channel.isActive()) {
+            try {
+                if (this.rpcHook != null) {
+                    this.rpcHook.doBeforeRequest(addr, request);
+                }
+                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
+                if (this.rpcHook != null) {
+                    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
+                }
+                return response;
+            } catch (RemotingSendRequestException e) {
+                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
+                this.closeChannel(addr, channel);
+                throw e;
+            } catch (RemotingTimeoutException e) {
+                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
+                    this.closeChannel(addr, channel);
+                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
+                }
+                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
+                throw e;
+            }
+        } else {
+            this.closeChannel(addr, channel);
+            throw new RemotingConnectException(addr);
+        }
+    }
+
+    @Override
+    public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
+        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
+        RemotingSendRequestException {
+        final Channel channel = this.getAndCreateChannel(addr, timeoutMillis);
+        if (channel != null && channel.isActive()) {
+            try {
+                if (this.rpcHook != null) {
+                    this.rpcHook.doBeforeRequest(addr, request);
+                }
+                this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
+            } catch (RemotingSendRequestException e) {
+                log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
+                this.closeChannel(addr, channel);
+                throw e;
+            }
+        } else {
+            this.closeChannel(addr, channel);
+            throw new RemotingConnectException(addr);
+        }
+    }
+
+    @Override
+    public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
+        RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+        final Channel channel = this.getAndCreateChannel(addr, timeoutMillis);
+        if (channel != null && channel.isActive()) {
+            try {
+                if (this.rpcHook != null) {
+                    this.rpcHook.doBeforeRequest(addr, request);
+                }
+                this.invokeOnewayImpl(channel, request, timeoutMillis);
+            } catch (RemotingSendRequestException e) {
+                log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
+                this.closeChannel(addr, channel);
+                throw e;
+            }
+        } else {
+            this.closeChannel(addr, channel);
+            throw new RemotingConnectException(addr);
+        }
+    }
+
+    @Override
+    public Bootstrap getBootstrap() {
+        return this.bootstrap;
+    }
+
+    @Override
+    public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
+        ExecutorService executorThis = executor;
+        if (null == executor) {
+            executorThis = this.publicExecutor;
+        }
+
+        Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
+        this.processorTable.put(requestCode, pair);
+    }
+
+    @Override
+    public boolean isChannelWritable(String addr) {
+        ChannelWrapper cw = this.channelTables.get(addr);
+        if (cw != null && cw.isOK()) {
+            return cw.isWritable();
+        }
+        return true;
+    }
+
+    @Override
+    public List<String> getNameServerAddressList() {
+        return this.namesrvAddrList.get();
+    }
+
+    @Override
+    public ChannelEventListener getChannelEventListener() {
+        return channelEventListener;
+    }
+
+    @Override
+    public RPCHook getRPCHook() {
+        return this.rpcHook;
+    }
+
+    @Override
+    public ExecutorService getCallbackExecutor() {
+        return callbackExecutor != null ? callbackExecutor : publicExecutor;
+    }
+
+    @Override
+    public void setCallbackExecutor(final ExecutorService callbackExecutor) {
+        this.callbackExecutor = callbackExecutor;
+    }
+
+    class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
+        @Override
+        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
+            processMessageReceived(ctx, msg);
+        }
+    }
+
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2Handler.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2Handler.java
new file mode 100644
index 0000000..2f4c8a1
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2Handler.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.transport.http2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.DefaultHttp2Connection;
+import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
+import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
+import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
+import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder;
+import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2ConnectionDecoder;
+import io.netty.handler.codec.http2.Http2ConnectionEncoder;
+import io.netty.handler.codec.http2.Http2ConnectionHandler;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2FrameAdapter;
+import io.netty.handler.codec.http2.Http2FrameReader;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2HeadersDecoder;
+import io.netty.handler.codec.http2.Http2Settings;
+import io.netty.handler.codec.http2.StreamBufferingEncoder;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
+
+public class Http2Handler extends Http2ConnectionHandler {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+    private boolean isServer;
+    private int lastStreamId;
+
+    private Http2Handler(final Http2ConnectionDecoder decoder, final Http2ConnectionEncoder encoder,
+        final Http2Settings initialSettings, final boolean isServer) {
+        super(decoder, encoder, initialSettings);
+        decoder.frameListener(new FrameListener());
+        this.isServer = isServer;
+    }
+
+    public static Http2Handler newHandler(final boolean isServer) {
+        log.info("isServer: " + isServer);
+        Http2HeadersDecoder headersDecoder = new DefaultHttp2HeadersDecoder(true);
+        Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
+        Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
+
+        Http2Connection connection = new DefaultHttp2Connection(isServer);
+
+        Http2ConnectionEncoder encoder = new StreamBufferingEncoder(
+            new DefaultHttp2ConnectionEncoder(connection, frameWriter));
+
+        connection.local().flowController(new DefaultHttp2LocalFlowController(connection,
+            DEFAULT_WINDOW_UPDATE_RATIO, true));
+
+        Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
+            frameReader);
+
+        Http2Settings settings = new Http2Settings();
+
+        if (!isServer) {
+            settings.pushEnabled(true);
+        }
+
+        settings.initialWindowSize(1048576 * 10); //10MiB
+        settings.maxConcurrentStreams(Integer.MAX_VALUE);
+
+        return newHandler(decoder, encoder, settings, isServer);
+    }
+
+    private static Http2Handler newHandler(final Http2ConnectionDecoder decoder, final Http2ConnectionEncoder encoder,
+        final Http2Settings settings, boolean isServer) {
+        return new Http2Handler(decoder, encoder, settings, isServer);
+    }
+
+    @Override
+    public void write(final ChannelHandlerContext ctx, final Object msg,
+        final ChannelPromise promise) throws Exception {
+        if (isServer) {
+            assert msg instanceof ByteBuf;
+            sendAPushPromise(ctx, lastStreamId, lastStreamId + 1, (ByteBuf) msg);
+        } else {
+
+            final Http2Headers headers = new DefaultHttp2Headers();
+
+            try {
+                long threadId = Thread.currentThread().getId();
+                long streamId = (threadId % 2 == 0) ? threadId + 1 : threadId + 2;
+                encoder().writeHeaders(ctx, (int) streamId, headers, 0, false, promise);
+                encoder().writeData(ctx, (int) streamId, (ByteBuf) msg, 0, false, ctx.newPromise());
+                ctx.flush();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+        }
+    }
+
+    private void sendAPushPromise(ChannelHandlerContext ctx, int streamId, int pushPromiseStreamId,
+        ByteBuf payload) throws Http2Exception {
+
+        encoder().writePushPromise(ctx, streamId, pushPromiseStreamId,
+            new DefaultHttp2Headers().status(OK.codeAsText()), 0, ctx.newPromise());
+
+        Http2Headers headers = new DefaultHttp2Headers();
+        headers.status(OK.codeAsText());
+        encoder().writeHeaders(ctx, pushPromiseStreamId, headers, 0, false, ctx.newPromise());
+        encoder().writeData(ctx, pushPromiseStreamId, payload, 0, false, ctx.newPromise());
+    }
+
+    private class FrameListener extends Http2FrameAdapter {
+        @Override
+        public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
+            boolean endOfStream) throws Http2Exception {
+            //Http2Handler.this.onDataRead(ctx, streamId, data, endOfStream);
+            data.retain();
+            Http2Handler.this.lastStreamId = streamId;
+            ctx.fireChannelRead(data);
+            return data.readableBytes() + padding;
+        }
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
new file mode 100644
index 0000000..02c4fb6
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
@@ -0,0 +1,242 @@
+package org.apache.rocketmq.remoting.transport.http2;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http2.Http2SecurityUtil;
+import io.netty.handler.ssl.OpenSsl;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.SupportedCipherSuiteFilter;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.ChannelEventListener;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.common.Pair;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import org.apache.rocketmq.remoting.netty.ChannelStatisticsHandler;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyDecoder;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyEncoder;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.NettyRemotingServerAbstract;
+import org.apache.rocketmq.remoting.util.JvmUtils;
+import org.apache.rocketmq.remoting.util.ThreadUtils;
+
+public class Http2ServerImpl extends NettyRemotingServerAbstract implements RemotingServer {
+    private static final InternalLogger LOG = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+    private ServerBootstrap serverBootstrap;
+    private EventLoopGroup bossGroup;
+    private EventLoopGroup ioGroup;
+    private EventExecutorGroup workerGroup;
+    private Class<? extends ServerSocketChannel> socketChannelClass;
+    private NettyServerConfig serverConfig;
+    private ChannelEventListener channelEventListener;
+    private ExecutorService publicExecutor;
+    private int port;
+    private RPCHook rpcHook;
+
+    public Http2ServerImpl(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) {
+        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
+        init(nettyServerConfig, channelEventListener);
+    }
+
+    public Http2ServerImpl() {
+        super();
+    }
+
+    @Override
+    public void init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) {
+        super.init(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
+        this.serverBootstrap = new ServerBootstrap();
+        this.serverConfig = nettyServerConfig;
+        this.channelEventListener = channelEventListener;
+        this.publicExecutor = ThreadUtils.newFixedThreadPool(
+            serverConfig.getServerCallbackExecutorThreads(),
+            10000, "Remoting-PublicExecutor", true);
+        if (JvmUtils.isLinux() && this.serverConfig.isUseEpollNativeSelector()) {
+            this.ioGroup = new EpollEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("NettyEpollIoThreads",
+                serverConfig.getServerSelectorThreads()));
+            this.bossGroup = new EpollEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads",
+                serverConfig.getServerAcceptorThreads()));
+            this.socketChannelClass = EpollServerSocketChannel.class;
+        } else {
+            this.bossGroup = new NioEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads",
+                serverConfig.getServerAcceptorThreads()));
+            this.ioGroup = new NioEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("NettyNioIoThreads",
+                serverConfig.getServerSelectorThreads()));
+            this.socketChannelClass = NioServerSocketChannel.class;
+        }
+
+        this.workerGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(),
+            ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
+        this.port = nettyServerConfig.getListenPort();
+        buildHttp2SslContext();
+    }
+
+    private void buildHttp2SslContext() {
+        try {
+            SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
+            SelfSignedCertificate ssc;
+            //NOTE: the cipher filter may not include all ciphers required by the HTTP/2 specification.
+            //Please refer to the HTTP/2 specification for cipher requirements.
+            ssc = new SelfSignedCertificate();
+            sslContext = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
+                .sslProvider(provider)
+                .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE).build();
+        } catch (Exception e) {
+            LOG.error("Can not build SSL context !", e);
+        }
+    }
+
+    @Override
+    public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
+        ExecutorService executorThis = executor;
+        if (null == executor) {
+            executorThis = this.publicExecutor;
+        }
+        Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
+        this.processorTable.put(requestCode, pair);
+    }
+
+    @Override
+    public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
+        this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
+    }
+
+    @Override
+    public int localListenPort() {
+        return this.port;
+    }
+
+    @Override
+    public Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(int requestCode) {
+        return processorTable.get(requestCode);
+    }
+
+    @Override
+    public void push(String addr, String sessionId, RemotingCommand remotingCommand) {
+
+    }
+
+    @Override
+    public RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis)
+        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
+        return this.invokeSyncImpl(channel, request, timeoutMillis);
+    }
+
+    @Override
+    public void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
+        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+        this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
+    }
+
+    @Override
+    public void invokeOneway(Channel channel, RemotingCommand request, long timeoutMillis) throws InterruptedException,
+        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+        this.invokeOnewayImpl(channel, request, timeoutMillis);
+    }
+
+    private void applyOptions(ServerBootstrap bootstrap) {
+        if (null != serverConfig) {
+            bootstrap.option(ChannelOption.SO_BACKLOG, 1024)
+                .option(ChannelOption.SO_REUSEADDR, true)
+                .option(ChannelOption.SO_KEEPALIVE, false)
+                .childOption(ChannelOption.TCP_NODELAY, true)
+                .childOption(ChannelOption.SO_SNDBUF, serverConfig.getServerSocketSndBufSize())
+                .childOption(ChannelOption.SO_RCVBUF, serverConfig.getServerSocketRcvBufSize());
+        }
+
+        if (serverConfig.isServerPooledByteBufAllocatorEnable()) {
+            bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        }
+    }
+
+    @Override
+    public void start() {
+        super.start();
+        final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+        this.serverBootstrap.group(this.bossGroup, this.ioGroup).
+            channel(socketChannelClass).childHandler(new ChannelInitializer<SocketChannel>() {
+            @Override
+            public void initChannel(SocketChannel ch) throws Exception {
+                channels.add(ch);
+
+                ChannelPipeline cp = ch.pipeline();
+
+                cp.addLast(ChannelStatisticsHandler.NAME, new ChannelStatisticsHandler(channels));
+
+                cp.addLast(workerGroup,
+                    Http2Handler.newHandler(true),
+                    new NettyEncoder(),
+                    new NettyDecoder(),
+                    new IdleStateHandler(serverConfig.getConnectionChannelReaderIdleSeconds(),
+                        serverConfig.getConnectionChannelWriterIdleSeconds(),
+                        serverConfig.getServerChannelMaxIdleTimeSeconds()),
+                    new NettyConnectManageHandler(),
+                    new NettyServerHandler());
+            }
+        });
+
+        applyOptions(serverBootstrap);
+
+        ChannelFuture channelFuture = this.serverBootstrap.bind(this.port).syncUninterruptibly();
+        this.port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
+        startUpHouseKeepingService();
+    }
+
+    @Override
+    public void shutdown() {
+        super.shutdown();
+        ThreadUtils.shutdownGracefully(publicExecutor, 2000, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void registerRPCHook(RPCHook rpcHook) {
+        this.rpcHook = rpcHook;
+    }
+
+    @Override
+    public RPCHook getRPCHook() {
+        return this.rpcHook;
+    }
+
+    @Override
+    public ExecutorService getCallbackExecutor() {
+        return this.publicExecutor;
+    }
+
+    @Override
+    public ChannelEventListener getChannelEventListener() {
+        return this.channelEventListener;
+    }
+
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyDecoder.java
similarity index 93%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyDecoder.java
index f64ab2d..70858a9 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyDecoder.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.remoting.netty;
+package org.apache.rocketmq.remoting.transport.rocketmq;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
@@ -24,6 +24,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.netty.CodecHelper;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public class NettyDecoder extends LengthFieldBasedFrameDecoder {
@@ -46,8 +47,7 @@ public class NettyDecoder extends LengthFieldBasedFrameDecoder {
             }
 
             ByteBuffer byteBuffer = frame.nioBuffer();
-
-            return RemotingCommand.decode(byteBuffer);
+            return CodecHelper.decode(byteBuffer);
         } catch (Exception e) {
             log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
             RemotingUtil.closeChannel(ctx.channel());
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyEncoder.java
similarity index 88%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyEncoder.java
index 8c3c56a..8285dee 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyEncoder.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.remoting.netty;
+package org.apache.rocketmq.remoting.transport.rocketmq;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
@@ -24,17 +24,17 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.netty.CodecHelper;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
 
     @Override
-    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
-        throws Exception {
+    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) {
         try {
-            ByteBuffer header = remotingCommand.encodeHeader();
-            out.writeBytes(header);
+            ByteBuffer byteBuffer = CodecHelper.encodeHeader(remotingCommand);
+            out.writeBytes(byteBuffer);
             byte[] body = remotingCommand.getBody();
             if (body != null) {
                 out.writeBytes(body);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
new file mode 100644
index 0000000..14666d2
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.transport.rocketmq;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import java.io.IOException;
+import java.security.cert.CertificateException;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import javax.xml.ws.AsyncHandler;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.ChannelEventListener;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.common.Pair;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.TlsHelper;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.NettyRemotingClientAbstract;
+import org.apache.rocketmq.remoting.util.ThreadUtils;
+
+public class NettyRemotingClient extends NettyRemotingClientAbstract implements RemotingClient {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+    private NettyClientConfig nettyClientConfig;
+    private Bootstrap bootstrap = new Bootstrap();
+    private EventLoopGroup eventLoopGroupWorker;
+
+    private ExecutorService publicExecutor;
+
+    private ExecutorService asyncExecutor;
+    /**
+     * Invoke the callback methods in this executor when process response.
+     */
+    private ExecutorService callbackExecutor;
+    private ChannelEventListener channelEventListener;
+    private DefaultEventExecutorGroup defaultEventExecutorGroup;
+    private RPCHook rpcHook;
+
+    public NettyRemotingClient() {
+        super();
+    }
+
+    public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
+        this(nettyClientConfig, null);
+    }
+
+    public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
+        final ChannelEventListener channelEventListener) {
+        super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
+        init(nettyClientConfig, channelEventListener);
+    }
+
+    @Override
+    public void init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
+        this.nettyClientConfig = nettyClientConfig;
+        this.channelEventListener = channelEventListener;
+        this.eventLoopGroupWorker = new NioEventLoopGroup(nettyClientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
+            nettyClientConfig.getClientWorkerThreads()));
+        this.publicExecutor = ThreadUtils.newFixedThreadPool(
+            nettyClientConfig.getClientCallbackExecutorThreads(),
+            10000, "Remoting-PublicExecutor", true);
+        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
+            ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", nettyClientConfig.getClientWorkerThreads()));
+        if (nettyClientConfig.isUseTLS()) {
+            try {
+                sslContext = TlsHelper.buildSslContext(true);
+                log.info("SSL enabled for client");
+            } catch (IOException e) {
+                log.error("Failed to create SSLContext", e);
+            } catch (CertificateException e) {
+                log.error("Failed to create SSLContext", e);
+                throw new RuntimeException("Failed to create SSLContext", e);
+            }
+        }
+    }
+
+    @Override
+    public Bootstrap getBootstrap() {
+        return this.bootstrap;
+    }
+
+    @Override
+    public void start() {
+        bootstrap = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
+            .option(ChannelOption.TCP_NODELAY, true)
+            .option(ChannelOption.SO_KEEPALIVE, false)
+            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
+            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
+            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
+            .handler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                public void initChannel(SocketChannel ch) throws Exception {
+                    ChannelPipeline pipeline = ch.pipeline();
+                    if (nettyClientConfig.isUseTLS()) {
+                        if (null != sslContext) {
+                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
+                            log.info("Prepend SSL handler");
+                        } else {
+                            log.warn("Connections are insecure as SSLContext is null!");
+                        }
+                    }
+                    pipeline.addLast(
+                        defaultEventExecutorGroup,
+                        new NettyEncoder(),
+                        new NettyDecoder(),
+                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
+                        new NettyConnectManageHandler(),
+                        new NettyClientHandler());
+                }
+            });
+        startUpHouseKeepingService();
+    }
+
+    @Override
+    public void shutdown() {
+        super.shutdown();
+        try {
+            clearChannels();
+            if (this.eventLoopGroupWorker != null) {
+                this.eventLoopGroupWorker.shutdownGracefully();
+            }
+            if (this.defaultEventExecutorGroup != null) {
+                this.defaultEventExecutorGroup.shutdownGracefully();
+            }
+            if (this.publicExecutor != null) {
+                this.publicExecutor.shutdown();
+            }
+        } catch (Exception e) {
+            log.error("NettyRemotingClient shutdown exception, ", e);
+        }
+    }
+
+    @Override
+    public void registerRPCHook(RPCHook rpcHook) {
+        this.rpcHook = rpcHook;
+    }
+
+    @Override
+    public void updateNameServerAddressList(List<String> addrs) {
+        super.updateNameServerAddressList(addrs);
+    }
+
+    @Override
+    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
+        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
+        long beginStartTime = System.currentTimeMillis();
+        final Channel channel = this.getAndCreateChannel(addr, timeoutMillis);
+        if (channel != null && channel.isActive()) {
+            try {
+                if (this.rpcHook != null) {
+                    this.rpcHook.doBeforeRequest(addr, request);
+                }
+                long costTime = System.currentTimeMillis() - beginStartTime;
+                if (timeoutMillis < costTime) {
+                    throw new RemotingTimeoutException("invokeSync call timeout");
+                }
+                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
+                if (this.rpcHook != null) {
+                    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
+                }
+                return response;
+            } catch (RemotingSendRequestException e) {
+                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
+                this.closeChannel(addr, channel);
+                throw e;
+            } catch (RemotingTimeoutException e) {
+                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
+                    this.closeChannel(addr, channel);
+                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
+                }
+                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
+                throw e;
+            }
+        } else {
+            this.closeChannel(addr, channel);
+            throw new RemotingConnectException(addr);
+        }
+    }
+
+    @Override
+    public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
+        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
+        RemotingSendRequestException {
+        long beginStartTime = System.currentTimeMillis();
+        final Channel channel = this.getAndCreateChannel(addr, timeoutMillis);
+        if (channel != null && channel.isActive()) {
+            try {
+                if (this.rpcHook != null) {
+                    this.rpcHook.doBeforeRequest(addr, request);
+                }
+                long costTime = System.currentTimeMillis() - beginStartTime;
+                if (timeoutMillis < costTime) {
+                    throw new RemotingTooMuchRequestException("invokeAsync call timeout");
+                }
+                this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
+            } catch (RemotingSendRequestException e) {
+                log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
+                this.closeChannel(addr, channel);
+                throw e;
+            }
+        } else {
+            this.closeChannel(addr, channel);
+            throw new RemotingConnectException(addr);
+        }
+    }
+
+    @Override
+    public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
+        RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+        final Channel channel = this.getAndCreateChannel(addr, timeoutMillis);
+        if (channel != null && channel.isActive()) {
+            try {
+                if (this.rpcHook != null) {
+                    this.rpcHook.doBeforeRequest(addr, request);
+                }
+                this.invokeOnewayImpl(channel, request, timeoutMillis);
+            } catch (RemotingSendRequestException e) {
+                log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
+                this.closeChannel(addr, channel);
+                throw e;
+            }
+        } else {
+            this.closeChannel(addr, channel);
+            throw new RemotingConnectException(addr);
+        }
+    }
+
+    @Override
+    public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
+        ExecutorService executorThis = executor;
+        if (null == executor) {
+            executorThis = this.publicExecutor;
+        }
+
+        Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
+        this.processorTable.put(requestCode, pair);
+    }
+
+    @Override
+    public boolean isChannelWritable(String addr) {
+        ChannelWrapper cw = this.channelTables.get(addr);
+        if (cw != null && cw.isOK()) {
+            return cw.isWritable();
+        }
+        return true;
+    }
+
+    @Override
+    public List<String> getNameServerAddressList() {
+        return this.namesrvAddrList.get();
+    }
+
+    @Override
+    public ChannelEventListener getChannelEventListener() {
+        return channelEventListener;
+    }
+
+    @Override
+    public RPCHook getRPCHook() {
+        return this.rpcHook;
+    }
+
+    @Override
+    public ExecutorService getCallbackExecutor() {
+        return callbackExecutor != null ? callbackExecutor : publicExecutor;
+    }
+
+    @Override
+    public void setCallbackExecutor(final ExecutorService callbackExecutor) {
+        this.callbackExecutor = callbackExecutor;
+    }
+
+    class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
+        @Override
+        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
+            processMessageReceived(ctx, msg);
+        }
+    }
+
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
similarity index 59%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
index 1984842..9e7d419 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
@@ -14,66 +14,64 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.remoting.netty;
+package org.apache.rocketmq.remoting.transport.rocketmq;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.timeout.IdleState;
-import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.cert.CertificateException;
 import java.util.NoSuchElementException;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.RemotingServer;
 import org.apache.rocketmq.remoting.common.Pair;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.common.TlsMode;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.netty.FileRegionEncoder;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.TlsHelper;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.NettyRemotingServerAbstract;
+import org.apache.rocketmq.remoting.util.JvmUtils;
+import org.apache.rocketmq.remoting.util.ThreadUtils;
 
-public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
+public class NettyRemotingServer extends NettyRemotingServerAbstract implements RemotingServer {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
-    private final ServerBootstrap serverBootstrap;
-    private final EventLoopGroup eventLoopGroupSelector;
-    private final EventLoopGroup eventLoopGroupBoss;
-    private final NettyServerConfig nettyServerConfig;
-
-    private final ExecutorService publicExecutor;
-    private final ChannelEventListener channelEventListener;
+    private ServerBootstrap serverBootstrap;
+    private EventLoopGroup eventLoopGroupSelector;
+    private EventLoopGroup eventLoopGroupBoss;
+    private NettyServerConfig nettyServerConfig;
 
-    private final Timer timer = new Timer("ServerHouseKeepingService", true);
+    private ExecutorService publicExecutor;
+    private ChannelEventListener channelEventListener;
     private DefaultEventExecutorGroup defaultEventExecutorGroup;
+    private Class<? extends ServerSocketChannel> socketChannelClass;
 
     private RPCHook rpcHook;
 
@@ -83,69 +81,56 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
     private static final String TLS_HANDLER_NAME = "sslHandler";
     private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
 
+    public NettyRemotingServer() {
+        super();
+    }
+
     public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
         this(nettyServerConfig, null);
     }
 
     public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
         final ChannelEventListener channelEventListener) {
-        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
+        init(nettyServerConfig, channelEventListener);
+    }
+
+    @Override
+    public void init(NettyServerConfig serverConfig, ChannelEventListener channelEventListener) {
+        this.nettyServerConfig = serverConfig;
+        super.init(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
         this.serverBootstrap = new ServerBootstrap();
-        this.nettyServerConfig = nettyServerConfig;
+        this.nettyServerConfig = serverConfig;
         this.channelEventListener = channelEventListener;
 
         int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
         if (publicThreadNums <= 0) {
             publicThreadNums = 4;
         }
-
-        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
-            private AtomicInteger threadIndex = new AtomicInteger(0);
-
-            @Override
-            public Thread newThread(Runnable r) {
-                return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
-            }
-        });
-
-        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
-            private AtomicInteger threadIndex = new AtomicInteger(0);
-
-            @Override
-            public Thread newThread(Runnable r) {
-                return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
-            }
-        });
-
-        if (useEpoll()) {
-            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
-                private AtomicInteger threadIndex = new AtomicInteger(0);
-                private int threadTotal = nettyServerConfig.getServerSelectorThreads();
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
-                }
-            });
+        this.publicExecutor = ThreadUtils.newFixedThreadPool(
+            publicThreadNums,
+            10000, "Remoting-PublicExecutor", true);
+        if (JvmUtils.isUseEpoll() && this.nettyServerConfig.isUseEpollNativeSelector()) {
+            this.eventLoopGroupSelector = new EpollEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("NettyEpollIoThreads",
+                serverConfig.getServerSelectorThreads()));
+            this.eventLoopGroupBoss = new EpollEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads",
+                serverConfig.getServerAcceptorThreads()));
+            this.socketChannelClass = EpollServerSocketChannel.class;
         } else {
-            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
-                private AtomicInteger threadIndex = new AtomicInteger(0);
-                private int threadTotal = nettyServerConfig.getServerSelectorThreads();
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
-                }
-            });
+            this.eventLoopGroupBoss = new NioEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads",
+                serverConfig.getServerAcceptorThreads()));
+            this.eventLoopGroupSelector = new NioEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("NettyNioIoThreads",
+                serverConfig.getServerSelectorThreads()));
+            this.socketChannelClass = NioServerSocketChannel.class;
         }
-
+        this.port = nettyServerConfig.getListenPort();
+        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(),
+            ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
         loadSslContext();
     }
 
     public void loadSslContext() {
         TlsMode tlsMode = TlsSystemConfig.tlsMode;
         log.info("Server is running in TLS {} mode", tlsMode.getName());
-
         if (tlsMode != TlsMode.DISABLED) {
             try {
                 sslContext = TlsHelper.buildSslContext(false);
@@ -158,36 +143,19 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         }
     }
 
-    private boolean useEpoll() {
-        return RemotingUtil.isLinuxPlatform()
-            && nettyServerConfig.isUseEpollNativeSelector()
-            && Epoll.isAvailable();
-    }
-
     @Override
     public void start() {
-        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
-            nettyServerConfig.getServerWorkerThreads(),
-            new ThreadFactory() {
-
-                private AtomicInteger threadIndex = new AtomicInteger(0);
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
-                }
-            });
-
+        super.start();
         ServerBootstrap childHandler =
             this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
-                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
+                .channel(socketChannelClass)
                 .option(ChannelOption.SO_BACKLOG, 1024)
                 .option(ChannelOption.SO_REUSEADDR, true)
                 .option(ChannelOption.SO_KEEPALIVE, false)
                 .childOption(ChannelOption.TCP_NODELAY, true)
                 .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                 .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
-                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
+                .localAddress(new InetSocketAddress(this.port))
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
@@ -197,7 +165,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
                             .addLast(defaultEventExecutorGroup,
                                 new NettyEncoder(),
                                 new NettyDecoder(),
-                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
+                                new IdleStateHandler(nettyServerConfig.getConnectionChannelReaderIdleSeconds(),
+                                    nettyServerConfig.getConnectionChannelWriterIdleSeconds(),
+                                    nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                 new NettyConnectManageHandler(),
                                 new NettyServerHandler()
                             );
@@ -215,46 +185,25 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         } catch (InterruptedException e1) {
             throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
         }
-
-        if (this.channelEventListener != null) {
-            this.nettyEventExecutor.start();
-        }
-
-        this.timer.scheduleAtFixedRate(new TimerTask() {
-
-            @Override
-            public void run() {
-                try {
-                    NettyRemotingServer.this.scanResponseTable();
-                } catch (Throwable e) {
-                    log.error("scanResponseTable exception", e);
-                }
-            }
-        }, 1000 * 3, 1000);
+        startUpHouseKeepingService();
     }
 
     @Override
     public void shutdown() {
         try {
-            if (this.timer != null) {
-                this.timer.cancel();
+            super.shutdown();
+            if (this.eventLoopGroupBoss != null) {
+                this.eventLoopGroupBoss.shutdownGracefully();
             }
-
-            this.eventLoopGroupBoss.shutdownGracefully();
-
-            this.eventLoopGroupSelector.shutdownGracefully();
-
-            if (this.nettyEventExecutor != null) {
-                this.nettyEventExecutor.shutdown();
+            if (this.eventLoopGroupSelector != null) {
+                this.eventLoopGroupSelector.shutdownGracefully();
             }
-
             if (this.defaultEventExecutorGroup != null) {
                 this.defaultEventExecutorGroup.shutdownGracefully();
             }
         } catch (Exception e) {
             log.error("NettyRemotingServer shutdown exception, ", e);
         }
-
         if (this.publicExecutor != null) {
             try {
                 this.publicExecutor.shutdown();
@@ -390,80 +339,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         }
     }
 
-    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
-
-        @Override
-        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
-            processMessageReceived(ctx, msg);
-        }
-    }
-
-    class NettyConnectManageHandler extends ChannelDuplexHandler {
-        @Override
-        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
-            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-            log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
-            super.channelRegistered(ctx);
-        }
-
-        @Override
-        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
-            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-            log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
-            super.channelUnregistered(ctx);
-        }
-
-        @Override
-        public void channelActive(ChannelHandlerContext ctx) throws Exception {
-            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-            log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
-            super.channelActive(ctx);
-
-            if (NettyRemotingServer.this.channelEventListener != null) {
-                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
-            }
-        }
-
-        @Override
-        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-            log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
-            super.channelInactive(ctx);
-
-            if (NettyRemotingServer.this.channelEventListener != null) {
-                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
-            }
-        }
-
-        @Override
-        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-            if (evt instanceof IdleStateEvent) {
-                IdleStateEvent event = (IdleStateEvent) evt;
-                if (event.state().equals(IdleState.ALL_IDLE)) {
-                    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-                    log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
-                    RemotingUtil.closeChannel(ctx.channel());
-                    if (NettyRemotingServer.this.channelEventListener != null) {
-                        NettyRemotingServer.this
-                            .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
-                    }
-                }
-            }
-
-            ctx.fireUserEventTriggered(evt);
-        }
-
-        @Override
-        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-            log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
-            log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
-
-            if (NettyRemotingServer.this.channelEventListener != null) {
-                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
-            }
+    @Override
+    public void push(String addr, String sessionId, RemotingCommand remotingCommand) {
 
-            RemotingUtil.closeChannel(ctx.channel());
-        }
     }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/JvmUtils.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/JvmUtils.java
new file mode 100644
index 0000000..e889a91
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/util/JvmUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.util;
+
+import io.netty.channel.epoll.Epoll;
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Random;
+
+public final class JvmUtils {
+    public static final String OS_NAME = System.getProperty("os.name").toLowerCase();
+    //public static final String OS_VERSION = System.getProperty("os.version").toLowerCase();
+
+    /**
+     * A constructor to stop this class being constructed.
+     */
+    private JvmUtils() {
+        // Unused
+    }
+
+    public static boolean isWindows() {
+        return OS_NAME.startsWith("win");
+    }
+
+    public static boolean isWindows10() {
+        return OS_NAME.startsWith("win") && OS_NAME.endsWith("10");
+    }
+
+    public static boolean isMacOSX() {
+        return OS_NAME.contains("mac");
+    }
+
+    public static boolean isLinux() {
+        return OS_NAME.startsWith("linux");
+    }
+
+    public static boolean isUseEpoll() {
+        return isLinux() && Epoll.isAvailable();
+    }
+
+    public static boolean isUnix() {
+        return OS_NAME.contains("nix") ||
+            OS_NAME.contains("nux") ||
+            OS_NAME.contains("aix") ||
+            OS_NAME.contains("bsd") ||
+            OS_NAME.contains("sun") ||
+            OS_NAME.contains("hpux");
+    }
+
+    public static boolean isSolaris() {
+        return OS_NAME.startsWith("sun");
+    }
+
+    public static int getProcessId() {
+        String pid = null;
+        final File self = new File("/proc/self");
+        try {
+            if (self.exists()) {
+                pid = self.getCanonicalFile().getName();
+            }
+        } catch (IOException ignored) {
+            //Ignore it
+        }
+
+        if (pid == null) {
+            pid = ManagementFactory.getRuntimeMXBean().getName().split("@", 0)[0];
+        }
+
+        if (pid == null) {
+            int rpid = new Random().nextInt(1 << 16);
+            return rpid;
+        } else {
+            return Integer.parseInt(pid);
+        }
+    }
+
+}
+
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/RemotingUtil.java
new file mode 100644
index 0000000..ccd037f
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/util/RemotingUtil.java
@@ -0,0 +1,6 @@
+package org.apache.rocketmq.remoting.util;
+
+public class RemotingUtil {
+    public static final String REMOTING_CHARSET = "UTF-8";
+    public static final String DEFAULT_PROTOCOL = "rocketmq";
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java
new file mode 100644
index 0000000..d3a2312
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.rocketmq.remoting.util;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+public class ServiceProvider {
+
+    private static final InternalLogger LOG = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+    /**
+     * A reference to the classloader that loaded this class. It's more efficient to compute it once and cache it here.
+     */
+    private static ClassLoader thisClassLoader;
+
+    /**
+     * JDK1.3+ <a href= "http://java.sun.com/j2se/1.3/docs/guide/jar/jar.html#Service%20Provider" > 'Service Provider' specification</a>.
+     */
+
+    static {
+        thisClassLoader = getClassLoader(ServiceProvider.class);
+    }
+
+    /**
+     * Returns a string that uniquely identifies the specified object, including its class.
+     * <p>
+     * The returned string is of form "classname@hashcode", ie is the same as the return value of the Object.toString()
+     * method, but works even when the specified object's class has overidden the toString method.
+     *
+     * @param o may be null.
+     * @return a string of form classname@hashcode, or "null" if param o is null.
+     */
+    protected static String objectId(Object o) {
+        if (o == null) {
+            return "null";
+        } else {
+            return o.getClass().getName() + "@" + System.identityHashCode(o);
+        }
+    }
+
+    protected static ClassLoader getClassLoader(Class<?> clazz) {
+        try {
+            return clazz.getClassLoader();
+        } catch (SecurityException e) {
+            LOG.error("Unable to get classloader for class {} due to security restrictions !",
+                clazz, e.getMessage());
+            throw e;
+        }
+    }
+
+    public static ClassLoader getContextClassLoader() {
+        ClassLoader classLoader = null;
+        try {
+            classLoader = Thread.currentThread().getContextClassLoader();
+        } catch (SecurityException ex) {
+            /**
+             * The getContextClassLoader() method throws SecurityException when the context
+             * class loader isn't an ancestor of the calling class's class
+             * loader, or if security permissions are restricted.
+             */
+        }
+        return classLoader;
+    }
+
+    public static InputStream getResourceAsStream(ClassLoader loader, String name) {
+        if (loader != null) {
+            return loader.getResourceAsStream(name);
+        } else {
+            return ClassLoader.getSystemResourceAsStream(name);
+        }
+    }
+
+    public static <T> Map<String, T> load(String path, Class<?> clazz) {
+        LOG.info("Looking for a resource file of name [{}] ...", path);
+        Map<String, T> services = new ConcurrentHashMap<String, T>();
+        try {
+
+            final InputStream is = getResourceAsStream(getContextClassLoader(), path);
+            if (is != null) {
+                BufferedReader reader;
+                try {
+                    reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+                } catch (java.io.UnsupportedEncodingException e) {
+                    reader = new BufferedReader(new InputStreamReader(is));
+                }
+                String serviceName = reader.readLine();
+                while (serviceName != null && !"".equals(serviceName)) {
+                    LOG.info(
+                        "Creating an instance as specified by file {} which was present in the path of the context classloader.",
+                        path);
+                    String[] service = serviceName.split("=");
+                    if (service.length != 2) {
+                        continue;
+                    } else {
+                        if (services.containsKey(service[0])) {
+                            continue;
+                        } else {
+                            LOG.info("Begin to load protocol: " + service[0]);
+                            services.put(service[0], (T) initService(getContextClassLoader(), service[1], clazz));
+                        }
+                    }
+                    serviceName = reader.readLine();
+                }
+                reader.close();
+            } else {
+                // is == null
+                LOG.warn("No resource file with name [{}] found.", path);
+            }
+        } catch (Exception e) {
+            LOG.error("Error occured when looking for resource file " + path, e);
+        }
+        return services;
+    }
+
+    public static <T> T loadClass(String name, String path, Class<?> clazz) {
+        final InputStream is = getResourceAsStream(getContextClassLoader(), name);
+        if (is != null) {
+            BufferedReader reader;
+            try {
+                try {
+                    reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+                } catch (java.io.UnsupportedEncodingException e) {
+                    reader = new BufferedReader(new InputStreamReader(is));
+                }
+                String serviceName = reader.readLine();
+                reader.close();
+                if (serviceName != null && !"".equals(serviceName)) {
+                    return initService(getContextClassLoader(), serviceName, clazz);
+                } else {
+                    LOG.warn("ServiceName is empty!");
+                    return null;
+                }
+            } catch (Exception e) {
+                LOG.warn("Error occurred when looking for resource file " + name, e);
+            }
+        }
+        return null;
+    }
+
+    protected static <T> T initService(ClassLoader classLoader, String serviceName, Class<?> clazz) {
+        Class<?> serviceClazz = null;
+        try {
+            if (classLoader != null) {
+                try {
+                    // Warning: must typecast here & allow exception to be generated/caught & recast properly
+                    serviceClazz = classLoader.loadClass(serviceName);
+                    if (clazz.isAssignableFrom(serviceClazz)) {
+                        LOG.info("Loaded class {} from classloader {}", serviceClazz.getName(),
+                            objectId(classLoader));
+                    } else {
+                        // This indicates a problem with the ClassLoader tree. An incompatible ClassLoader was used to load the implementation.
+                        LOG.error(
+                            "Class {} loaded from classloader {} does not extend {} as loaded by this classloader.",
+                            new Object[] {
+                                serviceClazz.getName(),
+                                objectId(serviceClazz.getClassLoader()), clazz.getName()});
+                    }
+                    return (T) serviceClazz.newInstance();
+                } catch (ClassNotFoundException ex) {
+                    if (classLoader == thisClassLoader) {
+                        // Nothing more to try, onwards.
+                        LOG.warn("Unable to locate any class {} via classloader", serviceName,
+                            objectId(classLoader));
+                        throw ex;
+                    }
+                    // Ignore exception, continue
+                } catch (NoClassDefFoundError e) {
+                    if (classLoader == thisClassLoader) {
+                        // Nothing more to try, onwards.
+                        LOG.warn(
+                            "Class {} cannot be loaded via classloader {}.it depends on some other class that cannot be found.",
+                            serviceClazz, objectId(classLoader));
+                        throw e;
+                    }
+                    // Ignore exception, continue
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Unable to init service.", e);
+        }
+        return (T) serviceClazz;
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/ThreadUtils.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ThreadUtils.java
new file mode 100644
index 0000000..954e692
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ThreadUtils.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.util;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public final class ThreadUtils {
+
+    /**
+     * A constructor to stop this class being constructed.
+     */
+    private ThreadUtils() {
+        // Unused
+
+    }
+
+    public static ExecutorService newThreadPoolExecutor(int corePoolSize,
+        int maximumPoolSize,
+        long keepAliveTime,
+        TimeUnit unit,
+        BlockingQueue<Runnable> workQueue,
+        String processName, boolean isDaemon) {
+        return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newGenericThreadFactory(processName, isDaemon));
+    }
+
+    public static ExecutorService newFixedThreadPool(int nThreads, int workQueueCapacity, String processName, boolean isDaemon) {
+        return new ThreadPoolExecutor(
+            nThreads,
+            nThreads,
+            0,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>(workQueueCapacity),
+            newGenericThreadFactory(processName, isDaemon));
+    }
+
+    public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) {
+        return Executors.newSingleThreadExecutor(newGenericThreadFactory(processName, isDaemon));
+    }
+
+    public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName, boolean isDaemon) {
+        return Executors.newSingleThreadScheduledExecutor(newGenericThreadFactory(processName, isDaemon));
+    }
+
+    public static ScheduledExecutorService newFixedThreadScheduledPool(int nThreads, String processName,
+        boolean isDaemon) {
+        return Executors.newScheduledThreadPool(nThreads, newGenericThreadFactory(processName, isDaemon));
+    }
+
+    public static ThreadFactory newGenericThreadFactory(String processName) {
+        return newGenericThreadFactory(processName, false);
+    }
+
+    public static ThreadFactory newGenericThreadFactory(String processName, int threads) {
+        return newGenericThreadFactory(processName, threads, false);
+    }
+
+    public static ThreadFactory newGenericThreadFactory(final String processName, final boolean isDaemon) {
+        return new ThreadFactory() {
+            private AtomicInteger threadIndex = new AtomicInteger(0);
+
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread thread = new Thread(r, String.format("%s_%d", processName, this.threadIndex.incrementAndGet()));
+                thread.setDaemon(isDaemon);
+                return thread;
+            }
+        };
+    }
+
+    public static ThreadFactory newGenericThreadFactory(final String processName, final int threads,
+        final boolean isDaemon) {
+        return new ThreadFactory() {
+            private AtomicInteger threadIndex = new AtomicInteger(0);
+
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread thread = new Thread(r, String.format("%s_%d_%d", processName, threads, this.threadIndex.incrementAndGet()));
+                thread.setDaemon(isDaemon);
+                return thread;
+            }
+        };
+    }
+
+    /**
+     * Create a new thread
+     *
+     * @param name The name of the thread
+     * @param runnable The work for the thread to do
+     * @param daemon Should the thread block JVM stop?
+     * @return The unstarted thread
+     */
+    public static Thread newThread(String name, Runnable runnable, boolean daemon) {
+        Thread thread = new Thread(runnable, name);
+        thread.setDaemon(daemon);
+        thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            public void uncaughtException(Thread t, Throwable e) {
+            }
+        });
+        return thread;
+    }
+
+    /**
+     * Shutdown passed thread using isAlive and join.
+     *
+     * @param t Thread to stop
+     */
+    public static void shutdownGracefully(final Thread t) {
+        shutdownGracefully(t, 0);
+    }
+
+    /**
+     * Shutdown passed thread using isAlive and join.
+     *
+     * @param millis Pass 0 if we're to wait forever.
+     * @param t Thread to stop
+     */
+    public static void shutdownGracefully(final Thread t, final long millis) {
+        if (t == null)
+            return;
+        while (t.isAlive()) {
+            try {
+                t.interrupt();
+                t.join(millis);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * An implementation of the graceful stop sequence recommended by
+     * {@link ExecutorService}.
+     *
+     * @param executor executor
+     * @param timeout timeout
+     * @param timeUnit timeUnit
+     */
+    public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) {
+        // Disable new tasks from being submitted.
+        executor.shutdown();
+        try {
+            // Wait a while for existing tasks to terminate.
+            if (!executor
+                .awaitTermination(timeout, timeUnit)) {
+                executor.shutdownNow();
+                // Wait a while for tasks to respond to being cancelled.
+                if (!executor.awaitTermination(timeout, timeUnit)) {
+
+                }
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted.
+            executor.shutdownNow();
+            // Preserve interrupt status.
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git a/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingClient b/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingClient
new file mode 100644
index 0000000..b0ca8ec
--- /dev/null
+++ b/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingClient
@@ -0,0 +1,2 @@
+rocketmq=org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient
+http2=org.apache.rocketmq.remoting.transport.http2.Http2ClientImpl
\ No newline at end of file
diff --git a/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingServer b/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingServer
new file mode 100644
index 0000000..5079c88
--- /dev/null
+++ b/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingServer
@@ -0,0 +1,2 @@
+rocketmq=org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer
+http2=org.apache.rocketmq.remoting.transport.http2.Http2ServerImpl
\ No newline at end of file
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
index 0ecfaaa..b41dc37 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
@@ -27,13 +27,13 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
-import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.netty.ResponseFuture;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
index 13bb172..50409f2 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
@@ -25,10 +25,10 @@ import java.io.PrintWriter;
 import org.apache.rocketmq.remoting.common.TlsMode;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
 import org.apache.rocketmq.remoting.netty.TlsHelper;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
index c3da3e9..2b96abd 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
@@ -20,6 +20,7 @@ import java.util.concurrent.Semaphore;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Spy;
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
index 6b5633d..d0a7ed7 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.remoting.netty;
 import java.lang.reflect.Field;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
index 2bd41ce..243d7f3 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
@@ -18,10 +18,11 @@ package org.apache.rocketmq.remoting.protocol;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
-import java.nio.ByteBuffer;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.CodecHelper;
+import org.apache.rocketmq.remoting.serialize.SerializeType;
 import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -31,7 +32,7 @@ public class RemotingCommandTest {
     public void testMarkProtocolType_JSONProtocolType() {
         int source = 261;
         SerializeType type = SerializeType.JSON;
-        byte[] result = RemotingCommand.markProtocolType(source, type);
+        byte[] result = CodecHelper.markProtocolType(source, type);
         assertThat(result).isEqualTo(new byte[] {0, 0, 1, 5});
     }
 
@@ -39,13 +40,13 @@ public class RemotingCommandTest {
     public void testMarkProtocolType_ROCKETMQProtocolType() {
         int source = 16777215;
         SerializeType type = SerializeType.ROCKETMQ;
-        byte[] result = RemotingCommand.markProtocolType(source, type);
+        byte[] result = CodecHelper.markProtocolType(source, type);
         assertThat(result).isEqualTo(new byte[] {1, -1, -1, -1});
     }
 
     @Test
     public void testCreateRequestCommand_RegisterBroker() {
-        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333");
+        System.setProperty(CodecHelper.REMOTING_VERSION_KEY, "2333");
 
         int code = 103; //org.apache.rocketmq.common.protocol.RequestCode.REGISTER_BROKER
         CommandCustomHeader header = new SampleCommandCustomHeader();
@@ -57,7 +58,7 @@ public class RemotingCommandTest {
 
     @Test
     public void testCreateResponseCommand_SuccessWithHeader() {
-        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333");
+        System.setProperty(CodecHelper.REMOTING_VERSION_KEY, "2333");
 
         int code = RemotingSysResponseCode.SUCCESS;
         String remark = "Sample remark";
@@ -70,7 +71,7 @@ public class RemotingCommandTest {
 
     @Test
     public void testCreateResponseCommand_SuccessWithoutHeader() {
-        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333");
+        System.setProperty(CodecHelper.REMOTING_VERSION_KEY, "2333");
 
         int code = RemotingSysResponseCode.SUCCESS;
         String remark = "Sample remark";
@@ -83,7 +84,7 @@ public class RemotingCommandTest {
 
     @Test
     public void testCreateResponseCommand_FailToCreateCommand() {
-        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333");
+        System.setProperty(CodecHelper.REMOTING_VERSION_KEY, "2333");
 
         int code = RemotingSysResponseCode.SUCCESS;
         String remark = "Sample remark";
@@ -93,7 +94,7 @@ public class RemotingCommandTest {
 
     @Test
     public void testCreateResponseCommand_SystemError() {
-        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333");
+        System.setProperty(CodecHelper.REMOTING_VERSION_KEY, "2333");
 
         RemotingCommand cmd = RemotingCommand.createResponseCommand(SampleCommandCustomHeader.class);
         assertThat(cmd.getCode()).isEqualTo(RemotingSysResponseCode.SYSTEM_ERROR);
@@ -102,86 +103,8 @@ public class RemotingCommandTest {
         assertThat(cmd.getFlag() & 0x01).isEqualTo(1); //flag bit 0: 1 presents response
     }
 
-    @Test
-    public void testEncodeAndDecode_EmptyBody() {
-        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333");
-
-        int code = 103; //org.apache.rocketmq.common.protocol.RequestCode.REGISTER_BROKER
-        CommandCustomHeader header = new SampleCommandCustomHeader();
-        RemotingCommand cmd = RemotingCommand.createRequestCommand(code, header);
-
-        ByteBuffer buffer = cmd.encode();
-
-        //Simulate buffer being read in NettyDecoder
-        buffer.getInt();
-        byte[] bytes = new byte[buffer.limit() - 4];
-        buffer.get(bytes, 0, buffer.limit() - 4);
-        buffer = ByteBuffer.wrap(bytes);
-
-        RemotingCommand decodedCommand = RemotingCommand.decode(buffer);
-
-        assertThat(decodedCommand.getSerializeTypeCurrentRPC()).isEqualTo(SerializeType.JSON);
-        assertThat(decodedCommand.getBody()).isNull();
-    }
-
-    @Test
-    public void testEncodeAndDecode_FilledBody() {
-        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333");
-
-        int code = 103; //org.apache.rocketmq.common.protocol.RequestCode.REGISTER_BROKER
-        CommandCustomHeader header = new SampleCommandCustomHeader();
-        RemotingCommand cmd = RemotingCommand.createRequestCommand(code, header);
-        cmd.setBody(new byte[] {0, 1, 2, 3, 4});
 
-        ByteBuffer buffer = cmd.encode();
 
-        //Simulate buffer being read in NettyDecoder
-        buffer.getInt();
-        byte[] bytes = new byte[buffer.limit() - 4];
-        buffer.get(bytes, 0, buffer.limit() - 4);
-        buffer = ByteBuffer.wrap(bytes);
-
-        RemotingCommand decodedCommand = RemotingCommand.decode(buffer);
-
-        assertThat(decodedCommand.getSerializeTypeCurrentRPC()).isEqualTo(SerializeType.JSON);
-        assertThat(decodedCommand.getBody()).isEqualTo(new byte[] {0, 1, 2, 3, 4});
-    }
-
-    @Test
-    public void testEncodeAndDecode_FilledBodyWithExtFields() throws RemotingCommandException {
-        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333");
-
-        int code = 103; //org.apache.rocketmq.common.protocol.RequestCode.REGISTER_BROKER
-        CommandCustomHeader header = new ExtFieldsHeader();
-        RemotingCommand cmd = RemotingCommand.createRequestCommand(code, header);
-
-        cmd.addExtField("key", "value");
-
-        ByteBuffer buffer = cmd.encode();
-
-        //Simulate buffer being read in NettyDecoder
-        buffer.getInt();
-        byte[] bytes = new byte[buffer.limit() - 4];
-        buffer.get(bytes, 0, buffer.limit() - 4);
-        buffer = ByteBuffer.wrap(bytes);
-
-        RemotingCommand decodedCommand = RemotingCommand.decode(buffer);
-
-        assertThat(decodedCommand.getExtFields().get("stringValue")).isEqualTo("bilibili");
-        assertThat(decodedCommand.getExtFields().get("intValue")).isEqualTo("2333");
-        assertThat(decodedCommand.getExtFields().get("longValue")).isEqualTo("23333333");
-        assertThat(decodedCommand.getExtFields().get("booleanValue")).isEqualTo("true");
-        assertThat(decodedCommand.getExtFields().get("doubleValue")).isEqualTo("0.618");
-
-        assertThat(decodedCommand.getExtFields().get("key")).isEqualTo("value");
-
-        CommandCustomHeader decodedHeader = decodedCommand.decodeCommandCustomHeader(ExtFieldsHeader.class);
-        assertThat(((ExtFieldsHeader) decodedHeader).getStringValue()).isEqualTo("bilibili");
-        assertThat(((ExtFieldsHeader) decodedHeader).getIntValue()).isEqualTo(2333);
-        assertThat(((ExtFieldsHeader) decodedHeader).getLongValue()).isEqualTo(23333333l);
-        assertThat(((ExtFieldsHeader) decodedHeader).isBooleanValue()).isEqualTo(true);
-        assertThat(((ExtFieldsHeader) decodedHeader).getDoubleValue()).isBetween(0.617, 0.619);
-    }
 
     @Test
     public void testNotNullField() throws Exception {
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java
index 3e8b7a9..7ad6db7 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.remoting.protocol;
 
 import java.util.Arrays;
 import java.util.List;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java
index f1db54f..66bdf73 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java
@@ -17,6 +17,9 @@
 package org.apache.rocketmq.remoting.protocol;
 
 import java.util.HashMap;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.RocketMQSerializable;
+import org.apache.rocketmq.remoting.serialize.SerializeType;
 import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java b/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
index 7021992..d0c4fcb 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.store.schedule;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class DelayOffsetSerializeWrapper extends RemotingSerializable {
     private ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
index a9b9ab0..f517e2a 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
@@ -28,7 +28,7 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java b/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java
index 4989a9b..eae3cfb 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java
@@ -52,7 +52,7 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
 import org.junit.AfterClass;