You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2019/06/28 11:19:58 UTC

[rocketmq-remoting] 23/39: Fix some bugs and polish the netty transport implementation

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git

commit 7eeca295876cb0ee9fb82da6322234c9d18793de
Author: yukon <yu...@apache.org>
AuthorDate: Wed Jun 5 11:54:36 2019 +0800

    Fix some bugs and polish the netty transport implementation
---
 benchmarks/remoting-benchmark/pom.xml              |   4 +-
 .../benchmarks/remoting/AbstractBenchmark.java     |  54 ++--
 .../remoting/RemotingBootstrapFactory.java         |  19 +-
 .../rocketmq/remoting/common/ResponseFuture.java   |   1 -
 .../remoting/config/RemotingClientConfig.java      | 117 +++++++++
 .../rocketmq/remoting/config/RemotingConfig.java   | 281 ++-------------------
 .../remoting/config/RemotingServerConfig.java      | 108 ++++++++
 .../rocketmq/remoting/config/TcpSocketConfig.java  |  24 +-
 .../remoting/impl/command/CodecHelper.java         |   4 +-
 .../remoting/impl/netty/ClientChannelManager.java  |  10 +-
 .../remoting/impl/netty/NettyChannelEventType.java |   4 +-
 .../remoting/impl/netty/NettyRemotingAbstract.java |  27 +-
 .../remoting/impl/netty/NettyRemotingClient.java   |  40 +--
 .../remoting/impl/netty/NettyRemotingServer.java   |  35 ++-
 .../org/apache/rocketmq/remoting/BaseTest.java     |   3 +-
 .../common/SemaphoreReleaseOnlyOnceTest.java       |   2 +-
 .../command/RemotingCommandFactoryImplTest.java    |   5 +-
 .../impl/command/RequestIdGeneratorTest.java       |   2 +-
 .../impl/netty/ClientChannelManagerTest.java       |   4 +-
 .../remoting/impl/netty/handler/EncoderTest.java   |   1 -
 20 files changed, 358 insertions(+), 387 deletions(-)

diff --git a/benchmarks/remoting-benchmark/pom.xml b/benchmarks/remoting-benchmark/pom.xml
index e103ae8..a18838d 100644
--- a/benchmarks/remoting-benchmark/pom.xml
+++ b/benchmarks/remoting-benchmark/pom.xml
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>rocketmq-x</artifactId>
diff --git a/benchmarks/remoting-benchmark/src/main/java/org/apache/rocketmq/benchmarks/remoting/AbstractBenchmark.java b/benchmarks/remoting-benchmark/src/main/java/org/apache/rocketmq/benchmarks/remoting/AbstractBenchmark.java
index b3754e9..04ca3cf 100644
--- a/benchmarks/remoting-benchmark/src/main/java/org/apache/rocketmq/benchmarks/remoting/AbstractBenchmark.java
+++ b/benchmarks/remoting-benchmark/src/main/java/org/apache/rocketmq/benchmarks/remoting/AbstractBenchmark.java
@@ -21,38 +21,16 @@ import org.apache.rocketmq.remoting.RemotingBootstrapFactory;
 import org.apache.rocketmq.remoting.api.RemotingClient;
 import org.apache.rocketmq.remoting.api.RemotingServer;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
-import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
+import org.apache.rocketmq.remoting.config.RemotingServerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class AbstractBenchmark {
     protected static final Logger LOG = LoggerFactory.getLogger(AbstractBenchmark.class);
 
-    /**
-     * Standard message sizes.
-     */
-    public enum MessageSize {
-        SMALL(16), MEDIUM(1024), LARGE(65536), JUMBO(1048576);
-
-        private final int bytes;
-        MessageSize(int bytes) {
-            this.bytes = bytes;
-        }
-
-        public int bytes() {
-            return bytes;
-        }
-    }
-
-    /**
-     * Support channel types.
-     */
-    public enum ChannelType {
-        NIO, LOCAL;
-    }
-
     public static void main(String[] args) throws InterruptedException {
-        RemotingServer server = RemotingBootstrapFactory.createRemotingServer(new RemotingConfig());
+        RemotingServer server = RemotingBootstrapFactory.createRemotingServer(new RemotingServerConfig());
 
         server.registerRequestProcessor((short) 1, (channel, request) -> {
             RemotingCommand response = server.commandFactory().createResponse(request);
@@ -62,7 +40,7 @@ public class AbstractBenchmark {
         });
         server.start();
 
-        RemotingClient client = RemotingBootstrapFactory.createRemotingClient(new RemotingConfig());
+        RemotingClient client = RemotingBootstrapFactory.createRemotingClient(new RemotingClientConfig());
         client.start();
 
         RemotingCommand request = client.commandFactory().createRequest();
@@ -75,4 +53,28 @@ public class AbstractBenchmark {
         client.stop();
         server.stop();
     }
+
+    /**
+     * Standard message sizes.
+     */
+    public enum MessageSize {
+        SMALL(16), MEDIUM(1024), LARGE(65536), JUMBO(1048576);
+
+        private final int bytes;
+
+        MessageSize(int bytes) {
+            this.bytes = bytes;
+        }
+
+        public int bytes() {
+            return bytes;
+        }
+    }
+
+    /**
+     * Support channel types.
+     */
+    public enum ChannelType {
+        NIO, LOCAL;
+    }
 }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/RemotingBootstrapFactory.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/RemotingBootstrapFactory.java
index efa4078..84ae102 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/RemotingBootstrapFactory.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/RemotingBootstrapFactory.java
@@ -19,7 +19,8 @@ package org.apache.rocketmq.remoting;
 
 import java.util.Properties;
 import org.apache.rocketmq.remoting.api.RemotingClient;
-import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
+import org.apache.rocketmq.remoting.config.RemotingServerConfig;
 import org.apache.rocketmq.remoting.impl.netty.NettyRemotingClient;
 import org.apache.rocketmq.remoting.impl.netty.NettyRemotingServer;
 import org.apache.rocketmq.remoting.internal.BeanUtils;
@@ -30,33 +31,33 @@ import org.jetbrains.annotations.NotNull;
  * Remoting Bootstrap entrance.
  */
 public final class RemotingBootstrapFactory {
-    public static RemotingClient createRemotingClient(@NotNull final String fileName) {
-        Properties prop = PropertyUtils.loadProps(fileName);
-        RemotingConfig config = BeanUtils.populate(prop, RemotingConfig.class);
+    public static RemotingClient createRemotingClient(@NotNull final RemotingClientConfig config) {
         return new NettyRemotingClient(config);
     }
 
-    public static RemotingClient createRemotingClient(@NotNull final RemotingConfig config) {
+    public static RemotingClient createRemotingClient(@NotNull final String fileName) {
+        Properties prop = PropertyUtils.loadProps(fileName);
+        RemotingClientConfig config = BeanUtils.populate(prop, RemotingClientConfig.class);
         return new NettyRemotingClient(config);
     }
 
     public static RemotingClient createRemotingClient(@NotNull final Properties properties) {
-        RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class);
+        RemotingClientConfig config = BeanUtils.populate(properties, RemotingClientConfig.class);
         return new NettyRemotingClient(config);
     }
 
     public static NettyRemotingServer createRemotingServer(@NotNull final String fileName) {
         Properties prop = PropertyUtils.loadProps(fileName);
-        RemotingConfig config = BeanUtils.populate(prop, RemotingConfig.class);
+        RemotingServerConfig config = BeanUtils.populate(prop, RemotingServerConfig.class);
         return new NettyRemotingServer(config);
     }
 
     public static NettyRemotingServer createRemotingServer(@NotNull final Properties properties) {
-        RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class);
+        RemotingServerConfig config = BeanUtils.populate(properties, RemotingServerConfig.class);
         return new NettyRemotingServer(config);
     }
 
-    public static NettyRemotingServer createRemotingServer(@NotNull final RemotingConfig config) {
+    public static NettyRemotingServer createRemotingServer(@NotNull final RemotingServerConfig config) {
         return new NettyRemotingServer(config);
     }
 }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
index 8a2aec2..e6c394b 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
@@ -24,7 +24,6 @@ import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.rocketmq.remoting.api.AsyncHandler;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
-import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup;
 import org.jetbrains.annotations.Nullable;
 
 public class ResponseFuture {
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingClientConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingClientConfig.java
new file mode 100644
index 0000000..8d77388
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingClientConfig.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.config;
+
+public class RemotingClientConfig extends RemotingConfig {
+    private int connectTimeoutMillis = 3000;
+
+    private boolean clientNativeEpollEnable = false;
+
+    private int clientIoThreads = 1;
+    private int clientWorkerThreads = 4;
+
+    private int clientOnewayInvokeSemaphore = 65535;
+    private int clientAsyncInvokeSemaphore = 65535;
+
+    private boolean clientPooledBytebufAllocatorEnable = false;
+
+    private boolean clientCloseSocketIfTimeout = false;
+    private boolean clientShortConnectionEnable = false;
+
+    public boolean isClientNativeEpollEnable() {
+        return clientNativeEpollEnable;
+    }
+
+    public void setClientNativeEpollEnable(final boolean clientNativeEpollEnable) {
+        this.clientNativeEpollEnable = clientNativeEpollEnable;
+    }
+
+    public int getClientIoThreads() {
+        return clientIoThreads;
+    }
+
+    public void setClientIoThreads(final int clientIoThreads) {
+        this.clientIoThreads = clientIoThreads;
+    }
+
+    public int getClientWorkerThreads() {
+        return clientWorkerThreads;
+    }
+
+    public void setClientWorkerThreads(final int clientWorkerThreads) {
+        this.clientWorkerThreads = clientWorkerThreads;
+    }
+
+    public int getClientOnewayInvokeSemaphore() {
+        return clientOnewayInvokeSemaphore;
+    }
+
+    public void setClientOnewayInvokeSemaphore(final int clientOnewayInvokeSemaphore) {
+        this.clientOnewayInvokeSemaphore = clientOnewayInvokeSemaphore;
+    }
+
+    public int getClientAsyncInvokeSemaphore() {
+        return clientAsyncInvokeSemaphore;
+    }
+
+    public void setClientAsyncInvokeSemaphore(final int clientAsyncInvokeSemaphore) {
+        this.clientAsyncInvokeSemaphore = clientAsyncInvokeSemaphore;
+    }
+
+    public boolean isClientPooledBytebufAllocatorEnable() {
+        return clientPooledBytebufAllocatorEnable;
+    }
+
+    public void setClientPooledBytebufAllocatorEnable(final boolean clientPooledBytebufAllocatorEnable) {
+        this.clientPooledBytebufAllocatorEnable = clientPooledBytebufAllocatorEnable;
+    }
+
+    public boolean isClientCloseSocketIfTimeout() {
+        return clientCloseSocketIfTimeout;
+    }
+
+    public void setClientCloseSocketIfTimeout(final boolean clientCloseSocketIfTimeout) {
+        this.clientCloseSocketIfTimeout = clientCloseSocketIfTimeout;
+    }
+
+    public boolean isClientShortConnectionEnable() {
+        return clientShortConnectionEnable;
+    }
+
+    public void setClientShortConnectionEnable(final boolean clientShortConnectionEnable) {
+        this.clientShortConnectionEnable = clientShortConnectionEnable;
+    }
+
+    public int getConnectTimeoutMillis() {
+        return connectTimeoutMillis;
+    }
+
+    public void setConnectTimeoutMillis(final int connectTimeoutMillis) {
+        this.connectTimeoutMillis = connectTimeoutMillis;
+    }
+
+    @Override
+    public int getOnewayInvokeSemaphore() {
+        return this.clientOnewayInvokeSemaphore;
+    }
+
+    @Override
+    public int getAsyncInvokeSemaphore() {
+        return this.clientAsyncInvokeSemaphore;
+    }
+}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
index 9fa79c2..d6f636b 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
@@ -17,67 +17,26 @@
 
 package org.apache.rocketmq.remoting.config;
 
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
-
-public class RemotingConfig extends TcpSocketConfig {
-    private int connectionMaxRetries = 3;
-    private int connectionChannelReaderIdleSeconds = 0;
-    private int connectionChannelWriterIdleSeconds = 0;
+public abstract class RemotingConfig extends TcpSocketConfig {
     /**
      * IdleStateEvent will be triggered when neither read nor write was
      * performed for the specified period of this time. Specify {@code 0} to
      * disable
      */
+    private int connectionChannelReaderIdleSeconds = 0;
+    private int connectionChannelWriterIdleSeconds = 0;
     private int connectionChannelIdleSeconds = 120;
+
     private int writeBufLowWaterMark = 32 * 10240;
     private int writeBufHighWaterMark = 64 * 10240;
-    private int threadTaskLowWaterMark = 30000;
-    private int threadTaskHighWaterMark = 50000;
-    private int connectionRetryBackoffMillis = 3000;
-    private int serviceThreadBlockQueueSize = 50000;
-    private boolean clientNativeEpollEnable = false;
-    private int clientWorkerThreads = 16 + Runtime.getRuntime().availableProcessors() * 2;
-    private int clientConnectionFutureAwaitTimeoutMillis = 3000;
-    private int clientAsyncCallbackExecutorThreads = 16 + Runtime.getRuntime().availableProcessors() * 2;
-    private int clientOnewayInvokeSemaphore = 20480;
 
-    //=============Server configuration==================
-    private int clientAsyncInvokeSemaphore = 20480;
-    private boolean clientPooledBytebufAllocatorEnable = false;
-    private boolean clientCloseSocketIfTimeout = true;
-    private boolean clientShortConnectionEnable = false;
-    private long clientPublishServiceTimeout = 10000;
-    private long clientConsumerServiceTimeout = 10000;
-    private long clientInvokeServiceTimeout = 10000;
-    private int clientMaxRetryCount = 10;
-    private int clientSleepBeforeRetry = 100;
-    private int serverListenPort = 8888;
-    /**
-     * If server only listened 1 port,recommend to set the value to 1
-     */
-    private int serverAcceptorThreads = 1;
-    private int serverIoThreads = 16 + Runtime.getRuntime().availableProcessors() * 2;
-    private int serverWorkerThreads = 16 + Runtime.getRuntime().availableProcessors() * 2;
-    private int serverOnewayInvokeSemaphore = 256;
-    private int serverAsyncInvokeSemaphore = 6400;
-    private boolean serverNativeEpollEnable = false;
-    private int serverAsyncCallbackExecutorThreads = Runtime.getRuntime().availableProcessors() * 2;
-    private boolean serverPooledBytebufAllocatorEnable = true;
-    private boolean serverAuthOpenEnable = true;
+    private int asyncHandlerExecutorThreads = Runtime.getRuntime().availableProcessors();
 
-    @Override
-    public String toString() {
-        return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
-    }
+    private int publicExecutorThreads = 4;
 
-    public int getConnectionMaxRetries() {
-        return connectionMaxRetries;
-    }
+    public abstract int getOnewayInvokeSemaphore();
 
-    public void setConnectionMaxRetries(final int connectionMaxRetries) {
-        this.connectionMaxRetries = connectionMaxRetries;
-    }
+    public abstract int getAsyncInvokeSemaphore();
 
     public int getConnectionChannelReaderIdleSeconds() {
         return connectionChannelReaderIdleSeconds;
@@ -119,227 +78,19 @@ public class RemotingConfig extends TcpSocketConfig {
         this.writeBufHighWaterMark = writeBufHighWaterMark;
     }
 
-    public int getThreadTaskLowWaterMark() {
-        return threadTaskLowWaterMark;
-    }
-
-    public void setThreadTaskLowWaterMark(final int threadTaskLowWaterMark) {
-        this.threadTaskLowWaterMark = threadTaskLowWaterMark;
-    }
-
-    public int getThreadTaskHighWaterMark() {
-        return threadTaskHighWaterMark;
-    }
-
-    public void setThreadTaskHighWaterMark(final int threadTaskHighWaterMark) {
-        this.threadTaskHighWaterMark = threadTaskHighWaterMark;
-    }
-
-    public int getConnectionRetryBackoffMillis() {
-        return connectionRetryBackoffMillis;
-    }
-
-    public void setConnectionRetryBackoffMillis(final int connectionRetryBackoffMillis) {
-        this.connectionRetryBackoffMillis = connectionRetryBackoffMillis;
-    }
-
-    public int getServiceThreadBlockQueueSize() {
-        return serviceThreadBlockQueueSize;
-    }
-
-    public void setServiceThreadBlockQueueSize(final int serviceThreadBlockQueueSize) {
-        this.serviceThreadBlockQueueSize = serviceThreadBlockQueueSize;
-    }
-
-    public boolean isClientNativeEpollEnable() {
-        return clientNativeEpollEnable;
-    }
-
-    public void setClientNativeEpollEnable(final boolean clientNativeEpollEnable) {
-        this.clientNativeEpollEnable = clientNativeEpollEnable;
-    }
-
-    public int getClientWorkerThreads() {
-        return clientWorkerThreads;
-    }
-
-    public void setClientWorkerThreads(final int clientWorkerThreads) {
-        this.clientWorkerThreads = clientWorkerThreads;
-    }
-
-    public int getClientConnectionFutureAwaitTimeoutMillis() {
-        return clientConnectionFutureAwaitTimeoutMillis;
-    }
-
-    public void setClientConnectionFutureAwaitTimeoutMillis(final int clientConnectionFutureAwaitTimeoutMillis) {
-        this.clientConnectionFutureAwaitTimeoutMillis = clientConnectionFutureAwaitTimeoutMillis;
-    }
-
-    public int getClientAsyncCallbackExecutorThreads() {
-        return clientAsyncCallbackExecutorThreads;
-    }
-
-    public void setClientAsyncCallbackExecutorThreads(final int clientAsyncCallbackExecutorThreads) {
-        this.clientAsyncCallbackExecutorThreads = clientAsyncCallbackExecutorThreads;
-    }
-
-    public int getClientOnewayInvokeSemaphore() {
-        return clientOnewayInvokeSemaphore;
-    }
-
-    public void setClientOnewayInvokeSemaphore(final int clientOnewayInvokeSemaphore) {
-        this.clientOnewayInvokeSemaphore = clientOnewayInvokeSemaphore;
-    }
-
-    public int getClientAsyncInvokeSemaphore() {
-        return clientAsyncInvokeSemaphore;
-    }
-
-    public void setClientAsyncInvokeSemaphore(final int clientAsyncInvokeSemaphore) {
-        this.clientAsyncInvokeSemaphore = clientAsyncInvokeSemaphore;
-    }
-
-    public boolean isClientPooledBytebufAllocatorEnable() {
-        return clientPooledBytebufAllocatorEnable;
-    }
-
-    public void setClientPooledBytebufAllocatorEnable(final boolean clientPooledBytebufAllocatorEnable) {
-        this.clientPooledBytebufAllocatorEnable = clientPooledBytebufAllocatorEnable;
-    }
-
-    public boolean isClientCloseSocketIfTimeout() {
-        return clientCloseSocketIfTimeout;
-    }
-
-    public void setClientCloseSocketIfTimeout(final boolean clientCloseSocketIfTimeout) {
-        this.clientCloseSocketIfTimeout = clientCloseSocketIfTimeout;
-    }
-
-    public boolean isClientShortConnectionEnable() {
-        return clientShortConnectionEnable;
-    }
-
-    public void setClientShortConnectionEnable(final boolean clientShortConnectionEnable) {
-        this.clientShortConnectionEnable = clientShortConnectionEnable;
-    }
-
-    public long getClientPublishServiceTimeout() {
-        return clientPublishServiceTimeout;
-    }
-
-    public void setClientPublishServiceTimeout(final long clientPublishServiceTimeout) {
-        this.clientPublishServiceTimeout = clientPublishServiceTimeout;
-    }
-
-    public long getClientConsumerServiceTimeout() {
-        return clientConsumerServiceTimeout;
-    }
-
-    public void setClientConsumerServiceTimeout(final long clientConsumerServiceTimeout) {
-        this.clientConsumerServiceTimeout = clientConsumerServiceTimeout;
-    }
-
-    public long getClientInvokeServiceTimeout() {
-        return clientInvokeServiceTimeout;
-    }
-
-    public void setClientInvokeServiceTimeout(final long clientInvokeServiceTimeout) {
-        this.clientInvokeServiceTimeout = clientInvokeServiceTimeout;
-    }
-
-    public int getClientMaxRetryCount() {
-        return clientMaxRetryCount;
-    }
-
-    public void setClientMaxRetryCount(final int clientMaxRetryCount) {
-        this.clientMaxRetryCount = clientMaxRetryCount;
-    }
-
-    public int getClientSleepBeforeRetry() {
-        return clientSleepBeforeRetry;
-    }
-
-    public void setClientSleepBeforeRetry(final int clientSleepBeforeRetry) {
-        this.clientSleepBeforeRetry = clientSleepBeforeRetry;
-    }
-
-    public int getServerListenPort() {
-        return serverListenPort;
-    }
-
-    public void setServerListenPort(final int serverListenPort) {
-        this.serverListenPort = serverListenPort;
-    }
-
-    public int getServerAcceptorThreads() {
-        return serverAcceptorThreads;
-    }
-
-    public void setServerAcceptorThreads(final int serverAcceptorThreads) {
-        this.serverAcceptorThreads = serverAcceptorThreads;
-    }
-
-    public int getServerIoThreads() {
-        return serverIoThreads;
-    }
-
-    public void setServerIoThreads(final int serverIoThreads) {
-        this.serverIoThreads = serverIoThreads;
-    }
-
-    public int getServerWorkerThreads() {
-        return serverWorkerThreads;
-    }
-
-    public void setServerWorkerThreads(final int serverWorkerThreads) {
-        this.serverWorkerThreads = serverWorkerThreads;
-    }
-
-    public int getServerOnewayInvokeSemaphore() {
-        return serverOnewayInvokeSemaphore;
-    }
-
-    public void setServerOnewayInvokeSemaphore(final int serverOnewayInvokeSemaphore) {
-        this.serverOnewayInvokeSemaphore = serverOnewayInvokeSemaphore;
-    }
-
-    public int getServerAsyncInvokeSemaphore() {
-        return serverAsyncInvokeSemaphore;
-    }
-
-    public void setServerAsyncInvokeSemaphore(final int serverAsyncInvokeSemaphore) {
-        this.serverAsyncInvokeSemaphore = serverAsyncInvokeSemaphore;
-    }
-
-    public boolean isServerNativeEpollEnable() {
-        return serverNativeEpollEnable;
-    }
-
-    public void setServerNativeEpollEnable(final boolean serverNativeEpollEnable) {
-        this.serverNativeEpollEnable = serverNativeEpollEnable;
-    }
-
-    public int getServerAsyncCallbackExecutorThreads() {
-        return serverAsyncCallbackExecutorThreads;
-    }
-
-    public void setServerAsyncCallbackExecutorThreads(final int serverAsyncCallbackExecutorThreads) {
-        this.serverAsyncCallbackExecutorThreads = serverAsyncCallbackExecutorThreads;
-    }
-
-    public boolean isServerPooledBytebufAllocatorEnable() {
-        return serverPooledBytebufAllocatorEnable;
+    public int getAsyncHandlerExecutorThreads() {
+        return asyncHandlerExecutorThreads;
     }
 
-    public void setServerPooledBytebufAllocatorEnable(final boolean serverPooledBytebufAllocatorEnable) {
-        this.serverPooledBytebufAllocatorEnable = serverPooledBytebufAllocatorEnable;
+    public void setAsyncHandlerExecutorThreads(final int asyncHandlerExecutorThreads) {
+        this.asyncHandlerExecutorThreads = asyncHandlerExecutorThreads;
     }
 
-    public boolean isServerAuthOpenEnable() {
-        return serverAuthOpenEnable;
+    public int getPublicExecutorThreads() {
+        return publicExecutorThreads;
     }
 
-    public void setServerAuthOpenEnable(final boolean serverAuthOpenEnable) {
-        this.serverAuthOpenEnable = serverAuthOpenEnable;
+    public void setPublicExecutorThreads(final int publicExecutorThreads) {
+        this.publicExecutorThreads = publicExecutorThreads;
     }
 }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingServerConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingServerConfig.java
new file mode 100644
index 0000000..9879364
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingServerConfig.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.config;
+
+public class RemotingServerConfig extends RemotingConfig {
+    private int serverListenPort = 8888;
+    /**
+     * If server only listened 1 port,recommend to set the value to 1
+     */
+    private int serverAcceptorThreads = 1;
+    private int serverIoThreads = 3;
+    private int serverWorkerThreads = 8;
+
+    private int serverOnewayInvokeSemaphore = 256;
+    private int serverAsyncInvokeSemaphore = 64;
+
+    private boolean serverNativeEpollEnable = false;
+    private boolean serverPooledBytebufAllocatorEnable = true;
+
+    public int getServerListenPort() {
+        return serverListenPort;
+    }
+
+    public void setServerListenPort(final int serverListenPort) {
+        this.serverListenPort = serverListenPort;
+    }
+
+    public int getServerAcceptorThreads() {
+        return serverAcceptorThreads;
+    }
+
+    public void setServerAcceptorThreads(final int serverAcceptorThreads) {
+        this.serverAcceptorThreads = serverAcceptorThreads;
+    }
+
+    public int getServerIoThreads() {
+        return serverIoThreads;
+    }
+
+    public void setServerIoThreads(final int serverIoThreads) {
+        this.serverIoThreads = serverIoThreads;
+    }
+
+    public int getServerWorkerThreads() {
+        return serverWorkerThreads;
+    }
+
+    public void setServerWorkerThreads(final int serverWorkerThreads) {
+        this.serverWorkerThreads = serverWorkerThreads;
+    }
+
+    public int getServerOnewayInvokeSemaphore() {
+        return serverOnewayInvokeSemaphore;
+    }
+
+    public void setServerOnewayInvokeSemaphore(final int serverOnewayInvokeSemaphore) {
+        this.serverOnewayInvokeSemaphore = serverOnewayInvokeSemaphore;
+    }
+
+    public int getServerAsyncInvokeSemaphore() {
+        return serverAsyncInvokeSemaphore;
+    }
+
+    public void setServerAsyncInvokeSemaphore(final int serverAsyncInvokeSemaphore) {
+        this.serverAsyncInvokeSemaphore = serverAsyncInvokeSemaphore;
+    }
+
+    public boolean isServerNativeEpollEnable() {
+        return serverNativeEpollEnable;
+    }
+
+    public void setServerNativeEpollEnable(final boolean serverNativeEpollEnable) {
+        this.serverNativeEpollEnable = serverNativeEpollEnable;
+    }
+
+    public boolean isServerPooledBytebufAllocatorEnable() {
+        return serverPooledBytebufAllocatorEnable;
+    }
+
+    public void setServerPooledBytebufAllocatorEnable(final boolean serverPooledBytebufAllocatorEnable) {
+        this.serverPooledBytebufAllocatorEnable = serverPooledBytebufAllocatorEnable;
+    }
+
+    @Override
+    public int getOnewayInvokeSemaphore() {
+        return this.serverOnewayInvokeSemaphore;
+    }
+
+    @Override
+    public int getAsyncInvokeSemaphore() {
+        return this.serverAsyncInvokeSemaphore;
+    }
+}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java
index 4dfcde7..d77bf3d 100755
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java
@@ -23,14 +23,14 @@ package org.apache.rocketmq.remoting.config;
  * @see java.net.SocketOptions
  */
 public class TcpSocketConfig {
-    private boolean tcpSoReuseAddress;
-    private boolean tcpSoKeepAlive;
-    private boolean tcpSoNoDelay;
-    private int tcpSoSndBufSize;  // see /proc/sys/net/ipv4/tcp_rmem
-    private int tcpSoRcvBufSize;  // see /proc/sys/net/ipv4/tcp_wmem
-    private int tcpSoBacklogSize;
-    private int tcpSoLinger;
-    private int tcpSoTimeout;
+    private boolean tcpSoReuseAddress = true;
+    private boolean tcpSoKeepAlive = false;
+    private boolean tcpSoNoDelay = true;
+    private int tcpSoSndBufSize = 65535;  // see /proc/sys/net/ipv4/tcp_rmem
+    private int tcpSoRcvBufSize = 65535;  // see /proc/sys/net/ipv4/tcp_wmem
+    private int tcpSoBacklogSize = 1024;
+    private int tcpSoLinger = -1;
+    private int tcpSoTimeoutMillis = 3000;
 
     public boolean isTcpSoReuseAddress() {
         return tcpSoReuseAddress;
@@ -88,11 +88,11 @@ public class TcpSocketConfig {
         this.tcpSoLinger = tcpSoLinger;
     }
 
-    public int getTcpSoTimeout() {
-        return tcpSoTimeout;
+    public int getTcpSoTimeoutMillis() {
+        return tcpSoTimeoutMillis;
     }
 
-    public void setTcpSoTimeout(final int tcpSoTimeout) {
-        this.tcpSoTimeout = tcpSoTimeout;
+    public void setTcpSoTimeoutMillis(final int tcpSoTimeoutMillis) {
+        this.tcpSoTimeoutMillis = tcpSoTimeoutMillis;
     }
 }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
index 8d3ff3a..988c20c 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
@@ -29,12 +29,12 @@ public class CodecHelper {
     // + RemarkLen(2) + PropertiesSize(2) + PayloadLen(4);
     public final static int MIN_PROTOCOL_LEN = 1 + 4 + 2 + 2 + 4 + 1 + 2 + 2 + 2 + 4;
     public final static byte PROTOCOL_MAGIC = 0x14;
-    private final static char PROPERTY_SEPARATOR = '\n';
-    private final static Charset REMOTING_CHARSET = Charset.forName("UTF-8");
     final static int REMARK_MAX_LEN = Short.MAX_VALUE;
     final static int PROPERTY_MAX_LEN = 524288; // 512KB
     final static int PAYLOAD_MAX_LEN = 16777216; // 16MB
     public final static int PACKET_MAX_LEN = MIN_PROTOCOL_LEN + REMARK_MAX_LEN + PROPERTY_MAX_LEN + PAYLOAD_MAX_LEN;
+    private final static char PROPERTY_SEPARATOR = '\n';
+    private final static Charset REMOTING_CHARSET = Charset.forName("UTF-8");
 
     public static void encodeCommand(final RemotingCommand command, final ByteBufferWrapper out) {
         out.writeByte(PROTOCOL_MAGIC);
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java
index 0b084a0..4f4ec5c 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java
@@ -28,7 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,10 +41,10 @@ public class ClientChannelManager {
     final ConcurrentHashMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<>();
     private final Lock lockChannelTables = new ReentrantLock();
     private final Bootstrap clientBootstrap;
-    private final RemotingConfig clientConfig;
+    private final RemotingClientConfig clientConfig;
 
     ClientChannelManager(final Bootstrap bootstrap,
-        final RemotingConfig config) {
+        final RemotingClientConfig config) {
         clientBootstrap = bootstrap;
         clientConfig = config;
     }
@@ -106,7 +106,7 @@ public class ClientChannelManager {
 
         if (cw != null) {
             ChannelFuture channelFuture = cw.getChannelFuture();
-            if (channelFuture.awaitUninterruptibly(this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis())) {
+            if (channelFuture.awaitUninterruptibly(this.clientConfig.getConnectTimeoutMillis())) {
                 if (cw.isActive()) {
                     LOG.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
                     return cw.getChannel();
@@ -115,7 +115,7 @@ public class ClientChannelManager {
                     this.closeChannel(addr, cw.getChannel());
                 }
             } else {
-                LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis(),
+                LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getConnectTimeoutMillis(),
                     channelFuture.toString());
                 this.closeChannel(addr, cw.getChannel());
             }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java
index 1bf2277..432363d 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java
@@ -18,8 +18,8 @@
 package org.apache.rocketmq.remoting.impl.netty;
 
 public enum NettyChannelEventType {
-    ACTIVE,
-    INACTIVE,
+    CONNECT,
+    CLOSE,
     IDLE,
     EXCEPTION
 }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index 920a922..cbd0059 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -74,15 +74,20 @@ public abstract class NettyRemotingAbstract implements RemotingService {
     private final String remotingInstanceId = UIDGenerator.instance().createUID();
 
     private final ExecutorService publicExecutor;
+    private final ExecutorService asyncHandlerExecutor;
     protected ScheduledExecutorService houseKeepingService = ThreadUtils.newSingleThreadScheduledExecutor("HouseKeepingService", true);
     private InterceptorGroup interceptorGroup = new InterceptorGroup();
     private ChannelEventListenerGroup channelEventListenerGroup = new ChannelEventListenerGroup();
 
-    NettyRemotingAbstract(RemotingConfig clientConfig) {
-        this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true);
-        this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true);
+    NettyRemotingAbstract(RemotingConfig remotingConfig) {
+        this.semaphoreOneway = new Semaphore(remotingConfig.getOnewayInvokeSemaphore(), true);
+        this.semaphoreAsync = new Semaphore(remotingConfig.getAsyncInvokeSemaphore(), true);
         this.publicExecutor = ThreadUtils.newFixedThreadPool(
-            clientConfig.getClientAsyncCallbackExecutorThreads(),
+            remotingConfig.getPublicExecutorThreads(),
+            10000, "Remoting-PublicExecutor", true);
+
+        this.asyncHandlerExecutor = ThreadUtils.newFixedThreadPool(
+            remotingConfig.getAsyncHandlerExecutorThreads(),
             10000, "Remoting-PublicExecutor", true);
         this.remotingCommandFactory = new RemotingCommandFactoryImpl();
     }
@@ -133,7 +138,9 @@ public abstract class NettyRemotingAbstract implements RemotingService {
 
     @Override
     public void stop() {
+        ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS);
         ThreadUtils.shutdownGracefully(publicExecutor, 2000, TimeUnit.MILLISECONDS);
+        ThreadUtils.shutdownGracefully(asyncHandlerExecutor, 2000, TimeUnit.MILLISECONDS);
         ThreadUtils.shutdownGracefully(channelEventExecutor);
     }
 
@@ -234,16 +241,12 @@ public abstract class NettyRemotingAbstract implements RemotingService {
         channel.writeAndFlush(msg);
     }
 
-    public ExecutorService getCallbackExecutor() {
-        return this.publicExecutor;
-    }
-
     /**
      * Execute callback in callback executor. If callback executor is null, run directly in current thread
      */
     private void executeAsyncHandler(final ResponseFuture responseFuture) {
         boolean runInThisThread = false;
-        ExecutorService executor = this.getCallbackExecutor();
+        ExecutorService executor = asyncHandlerExecutor;
         if (executor != null) {
             try {
                 executor.submit(new Runnable() {
@@ -549,10 +552,10 @@ public abstract class NettyRemotingAbstract implements RemotingService {
                             case IDLE:
                                 listener.onChannelIdle(channel);
                                 break;
-                            case INACTIVE:
+                            case CLOSE:
                                 listener.onChannelClose(channel);
                                 break;
-                            case ACTIVE:
+                            case CONNECT:
                                 listener.onChannelConnect(channel);
                                 break;
                             case EXCEPTION:
@@ -571,7 +574,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
 
     }
 
-    protected class EventDispatcher extends SimpleChannelInboundHandler<RemotingCommand> {
+    protected class RemotingCommandDispatcher extends SimpleChannelInboundHandler<RemotingCommand> {
 
         @Override
         protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
index ce30aa2..6b2796e 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
@@ -37,18 +37,16 @@ import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import io.netty.util.concurrent.EventExecutorGroup;
 import java.net.SocketAddress;
-import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.remoting.api.AsyncHandler;
 import org.apache.rocketmq.remoting.api.RemotingClient;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
 import org.apache.rocketmq.remoting.api.command.TrafficType;
 import org.apache.rocketmq.remoting.api.exception.RemoteConnectFailureException;
 import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
-import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
 import org.apache.rocketmq.remoting.external.ThreadUtils;
 import org.apache.rocketmq.remoting.impl.netty.handler.Decoder;
 import org.apache.rocketmq.remoting.impl.netty.handler.Encoder;
-import org.apache.rocketmq.remoting.impl.netty.handler.ExceptionHandler;
 import org.apache.rocketmq.remoting.internal.JvmUtils;
 
 public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
@@ -56,21 +54,21 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
     private final EventLoopGroup ioGroup;
     private final Class<? extends SocketChannel> socketChannelClass;
 
-    private final RemotingConfig clientConfig;
+    private final RemotingClientConfig clientConfig;
 
     private EventExecutorGroup workerGroup;
     private ClientChannelManager clientChannelManager;
 
-    public NettyRemotingClient(final RemotingConfig clientConfig) {
+    public NettyRemotingClient(final RemotingClientConfig clientConfig) {
         super(clientConfig);
         this.clientConfig = clientConfig;
 
         if (JvmUtils.isLinux() && this.clientConfig.isClientNativeEpollEnable()) {
-            this.ioGroup = new EpollEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
+            this.ioGroup = new EpollEventLoopGroup(clientConfig.getClientIoThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
                 clientConfig.getClientWorkerThreads()));
             socketChannelClass = EpollSocketChannel.class;
         } else {
-            this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientNioIoThreads",
+            this.ioGroup = new NioEventLoopGroup(clientConfig.getClientIoThreads(), ThreadUtils.newGenericThreadFactory("NettyClientNioIoThreads",
                 clientConfig.getClientWorkerThreads()));
             socketChannelClass = NioSocketChannel.class;
         }
@@ -88,15 +86,14 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         this.clientBootstrap.group(this.ioGroup).channel(socketChannelClass)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
-                public void initChannel(SocketChannel ch) throws Exception {
+                public void initChannel(SocketChannel ch) {
                     ch.pipeline().addLast(workerGroup,
                         new Decoder(),
                         new Encoder(),
                         new IdleStateHandler(clientConfig.getConnectionChannelReaderIdleSeconds(),
                             clientConfig.getConnectionChannelWriterIdleSeconds(), clientConfig.getConnectionChannelIdleSeconds()),
                         new ClientConnectionHandler(),
-                        new EventDispatcher(),
-                        new ExceptionHandler());
+                        new RemotingCommandDispatcher());
                 }
             });
 
@@ -108,14 +105,10 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
     @Override
     public void stop() {
         try {
-            ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS);
-
             clientChannelManager.clear();
 
             this.ioGroup.shutdownGracefully();
 
-            ThreadUtils.shutdownGracefully(channelEventExecutor);
-
             this.workerGroup.shutdownGracefully();
         } catch (Exception e) {
             LOG.warn("RemotingClient stopped error !", e);
@@ -126,10 +119,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
 
     private void applyOptions(Bootstrap bootstrap) {
         if (null != clientConfig) {
-            if (clientConfig.getTcpSoLinger() > 0) {
-                bootstrap.option(ChannelOption.SO_LINGER, clientConfig.getTcpSoLinger());
-            }
-
             if (clientConfig.getTcpSoSndBufSize() > 0) {
                 bootstrap.option(ChannelOption.SO_SNDBUF, clientConfig.getTcpSoSndBufSize());
             }
@@ -137,10 +126,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
                 bootstrap.option(ChannelOption.SO_RCVBUF, clientConfig.getTcpSoRcvBufSize());
             }
 
-            bootstrap.option(ChannelOption.SO_REUSEADDR, clientConfig.isTcpSoReuseAddress()).
-                option(ChannelOption.SO_KEEPALIVE, clientConfig.isTcpSoKeepAlive()).
+            bootstrap.option(ChannelOption.SO_KEEPALIVE, clientConfig.isTcpSoKeepAlive()).
                 option(ChannelOption.TCP_NODELAY, clientConfig.isTcpSoNoDelay()).
-                option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getTcpSoTimeout()).
+                option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getTcpSoTimeoutMillis()).
                 option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(clientConfig.getWriteBufLowWaterMark(),
                     clientConfig.getWriteBufHighWaterMark()));
         }
@@ -206,7 +194,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
             LOG.info("Connected from {} to {}.", localAddress, remoteAddress);
             super.connect(ctx, remoteAddress, localAddress, promise);
 
-            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.ACTIVE, ctx.channel()));
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CONNECT, ctx.channel()));
         }
 
         @Override
@@ -217,7 +205,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
 
             super.disconnect(ctx, promise);
 
-            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.INACTIVE, ctx.channel()));
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CLOSE, ctx.channel()));
         }
 
         @Override
@@ -228,11 +216,11 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
 
             super.close(ctx, promise);
 
-            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.INACTIVE, ctx.channel()));
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CLOSE, ctx.channel()));
         }
 
         @Override
-        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
             if (evt instanceof IdleStateEvent) {
                 IdleStateEvent event = (IdleStateEvent) evt;
                 if (event.state().equals(IdleState.ALL_IDLE)) {
@@ -246,7 +234,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         }
 
         @Override
-        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
             LOG.info("Close channel {} because of error {} ", ctx.channel(), cause);
             NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel());
             putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel()));
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
index f1e9360..f0dbb45 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
@@ -39,12 +39,11 @@ import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import io.netty.util.concurrent.EventExecutorGroup;
 import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.remoting.api.AsyncHandler;
 import org.apache.rocketmq.remoting.api.RemotingServer;
 import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
-import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.config.RemotingServerConfig;
 import org.apache.rocketmq.remoting.external.ThreadUtils;
 import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
 import org.apache.rocketmq.remoting.impl.netty.handler.Decoder;
@@ -52,7 +51,7 @@ import org.apache.rocketmq.remoting.impl.netty.handler.Encoder;
 import org.apache.rocketmq.remoting.internal.JvmUtils;
 
 public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
-    private final RemotingConfig serverConfig;
+    private final RemotingServerConfig serverConfig;
 
     private final ServerBootstrap serverBootstrap;
     private final EventLoopGroup bossGroup;
@@ -62,7 +61,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
 
     private int port;
 
-    public NettyRemotingServer(final RemotingConfig serverConfig) {
+    public NettyRemotingServer(final RemotingServerConfig serverConfig) {
         super(serverConfig);
 
         this.serverBootstrap = new ServerBootstrap();
@@ -107,7 +106,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
                         serverConfig.getConnectionChannelWriterIdleSeconds(),
                         serverConfig.getConnectionChannelIdleSeconds()),
                     new ServerConnectionHandler(),
-                    new EventDispatcher());
+                    new RemotingCommandDispatcher());
             }
         });
 
@@ -122,10 +121,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
     @Override
     public void stop() {
         try {
-            ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS);
-
-            ThreadUtils.shutdownGracefully(channelEventExecutor);
-
             this.bossGroup.shutdownGracefully().syncUninterruptibly();
 
             this.ioGroup.shutdownGracefully().syncUninterruptibly();
@@ -160,11 +155,11 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
             bootstrap.option(ChannelOption.SO_REUSEADDR, serverConfig.isTcpSoReuseAddress()).
                 childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isTcpSoKeepAlive()).
                 childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpSoNoDelay()).
-                option(ChannelOption.CONNECT_TIMEOUT_MILLIS, serverConfig.getTcpSoTimeout());
-        }
+                option(ChannelOption.CONNECT_TIMEOUT_MILLIS, serverConfig.getTcpSoTimeoutMillis());
 
-        if (serverConfig.isServerPooledBytebufAllocatorEnable()) {
-            bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+            if (serverConfig.isServerPooledBytebufAllocatorEnable()) {
+                bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+            }
         }
     }
 
@@ -194,28 +189,32 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
     private class ServerConnectionHandler extends ChannelDuplexHandler {
         @Override
         public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+            LOG.info("Channel {} registered, remote address {}.", ctx.channel(), ctx.channel().remoteAddress());
             super.channelRegistered(ctx);
         }
 
         @Override
         public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+            LOG.info("Channel {} unregistered, remote address {}.", ctx.channel(), ctx.channel().remoteAddress());
             super.channelUnregistered(ctx);
         }
 
         @Override
         public void channelActive(ChannelHandlerContext ctx) throws Exception {
+            LOG.info("Channel {} became active, remote address {}.", ctx.channel(), ctx.channel().remoteAddress());
             super.channelActive(ctx);
-            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.ACTIVE, ctx.channel()));
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CONNECT, ctx.channel()));
         }
 
         @Override
         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+            LOG.info("Channel {} became inactive, remote address {}.", ctx.channel(), ctx.channel().remoteAddress());
             super.channelInactive(ctx);
-            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.INACTIVE, ctx.channel()));
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CLOSE, ctx.channel()));
         }
 
         @Override
-        public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
+        public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) {
             if (evt instanceof IdleStateEvent) {
                 final IdleStateEvent event = (IdleStateEvent) evt;
                 if (event.state().equals(IdleState.ALL_IDLE)) {
@@ -233,9 +232,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         }
 
         @Override
-        public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
+        public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+            LOG.info("Close channel {} because of error {} ", ctx.channel(), cause);
             putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel(), cause));
-
             ctx.channel().close().addListener(new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture future) throws Exception {
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
index aab6dfb..ed7c93a 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
@@ -43,7 +43,8 @@ public class BaseTest {
         ThreadUtils.shutdownGracefully(executor, 5, TimeUnit.SECONDS);
     }
 
-    protected void runInThreads(final Runnable runnable, int threadsNum, int timeoutMillis) throws InterruptedException {
+    protected void runInThreads(final Runnable runnable, int threadsNum,
+        int timeoutMillis) throws InterruptedException {
         final Semaphore semaphore = new Semaphore(0);
 
         runInThreads(new Runnable() {
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnceTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnceTest.java
index b671662..56440e5 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnceTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnceTest.java
@@ -21,7 +21,7 @@ import java.util.concurrent.Semaphore;
 import org.apache.rocketmq.remoting.BaseTest;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 public class SemaphoreReleaseOnlyOnceTest extends BaseTest {
 
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImplTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImplTest.java
index c1274c6..391c8bd 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImplTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImplTest.java
@@ -22,7 +22,10 @@ import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory;
 import org.apache.rocketmq.remoting.api.command.TrafficType;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class RemotingCommandFactoryImplTest {
     private RemotingCommandFactory factory = new RemotingCommandFactoryImpl();
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RequestIdGeneratorTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RequestIdGeneratorTest.java
index 5686124..4620542 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RequestIdGeneratorTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RequestIdGeneratorTest.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.remoting.impl.command;
 
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 public class RequestIdGeneratorTest {
 
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java
index 16084d8..16618e8 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java
@@ -26,7 +26,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.remoting.BaseTest;
-import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -60,7 +60,7 @@ public class ClientChannelManagerTest extends BaseTest {
         when(channel.close()).thenReturn(channelPromise);
         when(channel.remoteAddress()).thenReturn(new InetSocketAddress(8080));
 
-        channelManager = new ClientChannelManager(clientBootstrap, new RemotingConfig());
+        channelManager = new ClientChannelManager(clientBootstrap, new RemotingClientConfig());
     }
 
     @Test
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java
index 9a379e9..1629342 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java
@@ -52,7 +52,6 @@ public class EncoderTest extends BaseTest {
         assertEquals(request, decodedRequest);
     }
 
-
     @Test
     public void encode_LenOverLimit_ChannelClosed() {
         EmbeddedChannel channel = new EmbeddedChannel(new Encoder());