You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2019/06/28 11:20:00 UTC

[rocketmq-remoting] 25/39: Polish the exception structure and add basic tests for NettyRemotingAbstract

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git

commit 6ff7202494a003e8ad84b7ac465e53a88d2b7c9d
Author: yukon <yu...@apache.org>
AuthorDate: Wed Jun 5 22:01:27 2019 +0800

    Polish the exception structure and add basic tests for NettyRemotingAbstract
---
 pom.xml                                            |   2 +-
 .../remoting/api/channel/ChannelEventListener.java |   8 +-
 .../api/exception/RemoteAccessException.java       |   2 +-
 ...eException.java => RemoteRuntimeException.java} |  10 +-
 .../remoting/api/interceptor/RequestContext.java   |   2 +-
 .../remoting/api/interceptor/ResponseContext.java  |   2 +-
 .../remoting/common/ChannelEventListenerGroup.java |   4 +-
 .../rocketmq/remoting/common/ResponseFuture.java   |  17 +-
 .../remoting/impl/command/RemotingCommandImpl.java |   7 +-
 .../remoting/impl/netty/NettyChannelEvent.java     |   2 +-
 .../remoting/impl/netty/NettyRemotingAbstract.java |  61 +++-
 .../remoting/impl/netty/NettyRemotingClient.java   |   4 +-
 .../remoting/impl/netty/NettyRemotingServer.java   |   2 -
 .../remoting/impl/netty/handler/Encoder.java       |   7 +-
 .../rocketmq/remoting/internal/RemotingUtil.java   |   9 +-
 .../org/apache/rocketmq/remoting/BaseTest.java     |  56 +++-
 .../remoting/common/ResponseFutureTest.java        |   4 +-
 .../impl/netty/NettyRemotingAbstractTest.java      | 364 +++++++++++++++++++++
 18 files changed, 506 insertions(+), 57 deletions(-)

diff --git a/pom.xml b/pom.xml
index 23eeeb5..3d28b24 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,7 +89,7 @@
             <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-lang3</artifactId>
-                <version>3.4</version>
+                <version>3.6</version>
             </dependency>
             <dependency>
                 <groupId>org.jetbrains</groupId>
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/channel/ChannelEventListener.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/channel/ChannelEventListener.java
index 0c0afcf..23b6e88 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/channel/ChannelEventListener.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/channel/ChannelEventListener.java
@@ -18,11 +18,11 @@
 package org.apache.rocketmq.remoting.api.channel;
 
 public interface ChannelEventListener {
-    void onChannelConnect(final RemotingChannel channel);
+    void onChannelConnect(RemotingChannel channel);
 
-    void onChannelClose(final RemotingChannel channel);
+    void onChannelClose(RemotingChannel channel);
 
-    void onChannelException(final RemotingChannel channel);
+    void onChannelException(RemotingChannel channel, Throwable cause);
 
-    void onChannelIdle(final RemotingChannel channel);
+    void onChannelIdle(RemotingChannel channel);
 }
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteAccessException.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteAccessException.java
index 6ce6dd4..d6d46f0 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteAccessException.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteAccessException.java
@@ -30,7 +30,7 @@ package org.apache.rocketmq.remoting.api.exception;
  *
  * @since 1.0.0
  */
-public class RemoteAccessException extends NestedRuntimeException {
+public class RemoteAccessException extends RemoteRuntimeException {
     private static final long serialVersionUID = 6280428909532427263L;
 
     /**
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteRuntimeException.java
similarity index 90%
rename from remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java
rename to remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteRuntimeException.java
index 7ef01db..a83be9f 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteRuntimeException.java
@@ -27,26 +27,26 @@ package org.apache.rocketmq.remoting.api.exception;
  *
  * @since 1.0.0
  */
-public abstract class NestedRuntimeException extends RuntimeException {
+public abstract class RemoteRuntimeException extends RuntimeException {
     private static final long serialVersionUID = -8371779880133933367L;
 
     /**
-     * Construct a {@code NestedRuntimeException} with the specified detail message.
+     * Construct a {@code RemoteRuntimeException} with the specified detail message.
      *
      * @param msg the detail message
      */
-    public NestedRuntimeException(String msg) {
+    public RemoteRuntimeException(String msg) {
         super(msg);
     }
 
     /**
-     * Construct a {@code NestedRuntimeException} with the specified detail message
+     * Construct a {@code RemoteRuntimeException} with the specified detail message
      * and nested exception.
      *
      * @param msg the detail message
      * @param cause the nested exception
      */
-    public NestedRuntimeException(String msg, Throwable cause) {
+    public RemoteRuntimeException(String msg, Throwable cause) {
         super(msg, cause);
     }
 
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/RequestContext.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/RequestContext.java
index d961556..a93e71c 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/RequestContext.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/RequestContext.java
@@ -60,6 +60,6 @@ public class RequestContext {
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.DEFAULT_STYLE);
     }
 }
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java
index 005aa28..f076d8f 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java
@@ -36,7 +36,7 @@ public class ResponseContext extends RequestContext {
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.DEFAULT_STYLE);
     }
 
     public RemotingCommand getResponse() {
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ChannelEventListenerGroup.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ChannelEventListenerGroup.java
index 8af61f7..4c374e9 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ChannelEventListenerGroup.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ChannelEventListenerGroup.java
@@ -47,9 +47,9 @@ public class ChannelEventListenerGroup {
         }
     }
 
-    public void onChannelException(final RemotingChannel channel) {
+    public void onChannelException(final RemotingChannel channel, final Throwable cause) {
         for (ChannelEventListener listener : listenerList) {
-            listener.onChannelException(channel);
+            listener.onChannelException(channel, cause);
         }
     }
 
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
index e6c394b..6a2f246 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
@@ -21,23 +21,32 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringExclude;
 import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.rocketmq.remoting.api.AsyncHandler;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.exception.RemoteRuntimeException;
 import org.jetbrains.annotations.Nullable;
 
 public class ResponseFuture {
     private final long beginTimestamp = System.currentTimeMillis();
+
+    @ToStringExclude
     private final CountDownLatch countDownLatch = new CountDownLatch(1);
+    @ToStringExclude
     private final AtomicBoolean asyncHandlerExecuted = new AtomicBoolean(false);
 
     private int requestId;
     private long timeoutMillis;
+
+    @ToStringExclude
     private AsyncHandler asyncHandler;
 
     private volatile RemotingCommand responseCommand;
     private volatile boolean sendRequestOK = true;
-    private volatile Throwable cause;
+    private volatile RemoteRuntimeException cause;
+
+    @ToStringExclude
     private SemaphoreReleaseOnlyOnce once;
 
     private RemotingCommand requestCommand;
@@ -108,11 +117,11 @@ public class ResponseFuture {
         return asyncHandler;
     }
 
-    public Throwable getCause() {
+    public RemoteRuntimeException getCause() {
         return cause;
     }
 
-    public void setCause(Throwable cause) {
+    public void setCause(RemoteRuntimeException cause) {
         this.cause = cause;
     }
 
@@ -146,6 +155,6 @@ public class ResponseFuture {
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.DEFAULT_STYLE);
     }
 }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
index 4d1af44..8454616 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringExclude;
 import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
 import org.apache.rocketmq.remoting.api.command.TrafficType;
@@ -36,7 +37,11 @@ public class RemotingCommandImpl implements RemotingCommand {
     private TrafficType trafficType = TrafficType.REQUEST_SYNC;
     private short opCode = RemotingSysResponseCode.SUCCESS;
     private String remark = "";
+
+    @ToStringExclude
     private Map<String, String> properties = new HashMap<>();
+
+    @ToStringExclude
     private byte[] payload;
 
     protected RemotingCommandImpl() {
@@ -139,6 +144,6 @@ public class RemotingCommandImpl implements RemotingCommand {
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this, ToStringStyle.SIMPLE_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.NO_CLASS_NAME_STYLE);
     }
 }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java
index ec9cece..097213c 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java
@@ -50,6 +50,6 @@ public class NettyChannelEvent {
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.NO_CLASS_NAME_STYLE);
     }
 }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index 9e865d0..de4fc81 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -43,6 +43,7 @@ import org.apache.rocketmq.remoting.api.command.RemotingCommand;
 import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory;
 import org.apache.rocketmq.remoting.api.command.TrafficType;
 import org.apache.rocketmq.remoting.api.exception.RemoteAccessException;
+import org.apache.rocketmq.remoting.api.exception.RemoteRuntimeException;
 import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
 import org.apache.rocketmq.remoting.api.interceptor.Interceptor;
 import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup;
@@ -58,7 +59,6 @@ import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
 import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl;
 import org.apache.rocketmq.remoting.impl.command.RemotingSysResponseCode;
 import org.apache.rocketmq.remoting.internal.RemotingUtil;
-import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,12 +93,35 @@ public abstract class NettyRemotingAbstract implements RemotingService {
      * responding processor in this map to handle the request.
      */
     private final Map<Short, Pair<RequestProcessor, ExecutorService>> processorTables = new ConcurrentHashMap<>();
+
+    /**
+     * This factory provides methods to create RemotingCommand.
+     */
     private final RemotingCommandFactory remotingCommandFactory;
 
+    /**
+     * Executor to execute RequestProcessor without specific executor.
+     */
     private final ExecutorService publicExecutor;
+
+    /**
+     * Invoke the async handler in this executor when process response.
+     */
     private final ExecutorService asyncHandlerExecutor;
+
+    /**
+     * This scheduled executor provides the ability to govern on-going response table.
+     */
     protected ScheduledExecutorService houseKeepingService = ThreadUtils.newSingleThreadScheduledExecutor("HouseKeepingService", true);
+
+    /**
+     * Provides custom interceptor at the occurrence of beforeRequest and afterResponseReceived event.
+     */
     private InterceptorGroup interceptorGroup = new InterceptorGroup();
+
+    /**
+     * Provides listener mechanism to handle netty channel events.
+     */
     private ChannelEventListenerGroup channelEventListenerGroup = new ChannelEventListenerGroup();
 
     NettyRemotingAbstract(RemotingConfig remotingConfig) {
@@ -110,7 +133,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
 
         this.asyncHandlerExecutor = ThreadUtils.newFixedThreadPool(
             remotingConfig.getAsyncHandlerExecutorThreads(),
-            10000, "Remoting-PublicExecutor", true);
+            10000, "Remoting-AsyncExecutor", true);
         this.remotingCommandFactory = new RemotingCommandFactoryImpl();
     }
 
@@ -144,8 +167,8 @@ public abstract class NettyRemotingAbstract implements RemotingService {
             ResponseFuture rf = this.ackTables.remove(requestID);
 
             if (rf != null) {
-                LOG.warn("remove timeout request {} ", rf);
-                rf.setCause(new RemoteTimeoutException(rf.getRemoteAddr(), rf.getTimeoutMillis()));
+                LOG.warn("Removes timeout request {} ", rf.getRequestCommand());
+                rf.setCause(new RemoteTimeoutException(String.format("Request to %s timeout", rf.getRemoteAddr()), rf.getTimeoutMillis()));
                 executeAsyncHandler(rf);
             }
         }
@@ -153,6 +176,8 @@ public abstract class NettyRemotingAbstract implements RemotingService {
 
     @Override
     public void start() {
+        startUpHouseKeepingService();
+
         if (this.channelEventListenerGroup.size() > 0) {
             this.channelEventExecutor.start();
         }
@@ -230,11 +255,10 @@ public abstract class NettyRemotingAbstract implements RemotingService {
                 responseFuture.release();
             }
         } else {
-            LOG.warn("request {} from {} has not matched response !", response, RemotingUtil.extractRemoteAddress(ctx.channel()));
+            LOG.warn("Response {} from {} doesn't have a matched request!", response, RemotingUtil.extractRemoteAddress(ctx.channel()));
         }
     }
 
-    @NotNull
     private Runnable buildProcessorTask(final ChannelHandlerContext ctx, final RemotingCommand cmd,
         final Pair<RequestProcessor, ExecutorService> processorExecutorPair, final RemotingChannel channel) {
         return new Runnable() {
@@ -302,7 +326,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
         }
     }
 
-    private void requestFail(final int requestID, final Throwable cause) {
+    private void requestFail(final int requestID, final RemoteRuntimeException cause) {
         ResponseFuture responseFuture = ackTables.remove(requestID);
         if (responseFuture != null) {
             responseFuture.setSendRequestOK(false);
@@ -312,7 +336,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
         }
     }
 
-    private void requestFail(final ResponseFuture responseFuture, final Throwable cause) {
+    private void requestFail(final ResponseFuture responseFuture, final RemoteRuntimeException cause) {
         responseFuture.setCause(cause);
         executeAsyncHandler(responseFuture);
     }
@@ -368,7 +392,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
 
             ChannelFutureListener listener = new ChannelFutureListener() {
                 @Override
-                public void operationComplete(ChannelFuture f) throws Exception {
+                public void operationComplete(ChannelFuture f) {
                     if (f.isSuccess()) {
                         responseFuture.setSendRequestOK(true);
                         return;
@@ -376,7 +400,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
                         responseFuture.setSendRequestOK(false);
 
                         ackTables.remove(requestID);
-                        responseFuture.setCause(f.cause());
+                        responseFuture.setCause(new RemoteAccessException(RemotingUtil.extractRemoteAddress(channel), f.cause()));
                         responseFuture.putResponse(null);
 
                         LOG.warn("Send request command to {} failed !", remoteAddr);
@@ -390,9 +414,10 @@ public abstract class NettyRemotingAbstract implements RemotingService {
 
             if (null == responseCommand) {
                 if (responseFuture.isSendRequestOK()) {
-                    throw new RemoteTimeoutException(RemotingUtil.extractRemoteAddress(channel), timeoutMillis, responseFuture.getCause());
+                    responseFuture.setCause(new RemoteTimeoutException(RemotingUtil.extractRemoteAddress(channel), timeoutMillis));
+                    throw responseFuture.getCause();
                 } else {
-                    throw new RemoteAccessException(RemotingUtil.extractRemoteAddress(channel), responseFuture.getCause());
+                    throw responseFuture.getCause();
                 }
             }
 
@@ -439,14 +464,14 @@ public abstract class NettyRemotingAbstract implements RemotingService {
                             return;
                         }
 
-                        requestFail(requestID, f.cause());
+                        requestFail(requestID, new RemoteAccessException(RemotingUtil.extractRemoteAddress(channel), f.cause()));
                         LOG.warn("Send request command to channel  failed.", remoteAddr);
                     }
                 };
 
                 this.writeAndFlush(channel, request, listener);
             } catch (Exception e) {
-                requestFail(requestID, e);
+                requestFail(requestID, new RemoteAccessException(RemotingUtil.extractRemoteAddress(channel), e));
                 LOG.error("Send request command to channel " + channel + " error !", e);
             }
         } else {
@@ -543,7 +568,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
             if (this.eventQueue.size() <= MAX_SIZE) {
                 this.eventQueue.add(event);
             } else {
-                LOG.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
+                LOG.warn("Event queue size[{}] meets the limit, so drop this event {}", this.eventQueue.size(), event.toString());
             }
         }
 
@@ -559,7 +584,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
                     if (event != null && listener != null) {
                         RemotingChannel channel = new NettyChannelImpl(event.getChannel());
 
-                        LOG.warn("Channel Event, {}", event);
+                        LOG.info("Dispatch received channel event, {}", event);
 
                         switch (event.getType()) {
                             case IDLE:
@@ -572,14 +597,14 @@ public abstract class NettyRemotingAbstract implements RemotingService {
                                 listener.onChannelConnect(channel);
                                 break;
                             case EXCEPTION:
-                                listener.onChannelException(channel);
+                                listener.onChannelException(channel, event.getCause());
                                 break;
                             default:
                                 break;
                         }
                     }
                 } catch (Exception e) {
-                    LOG.error("error", e);
+                    LOG.warn("Exception thrown when dispatching channel event", e);
                     break;
                 }
             }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
index 6b2796e..c146813 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
@@ -98,8 +98,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
             });
 
         applyOptions(clientBootstrap);
-
-        startUpHouseKeepingService();
     }
 
     @Override
@@ -237,7 +235,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
             LOG.info("Close channel {} because of error {} ", ctx.channel(), cause);
             NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel());
-            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel()));
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel(), cause));
         }
     }
 }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
index f0dbb45..0967208 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
@@ -114,8 +114,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
 
         ChannelFuture channelFuture = this.serverBootstrap.bind(this.serverConfig.getServerListenPort()).syncUninterruptibly();
         this.port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
-
-        startUpHouseKeepingService();
     }
 
     @Override
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
index 702e2b4..3c5a90e 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
@@ -22,12 +22,12 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToByteEncoder;
-import java.net.InetSocketAddress;
 import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
 import org.apache.rocketmq.remoting.api.exception.RemoteCodecException;
 import org.apache.rocketmq.remoting.impl.buffer.NettyByteBufferWrapper;
 import org.apache.rocketmq.remoting.impl.command.CodecHelper;
+import org.apache.rocketmq.remoting.internal.RemotingUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,10 +44,7 @@ public class Encoder extends MessageToByteEncoder<RemotingCommand> {
 
             encode(remotingCommand, wrapper);
         } catch (final RemoteCodecException e) {
-            String remoteAddress = "UnKnown";
-            if (ctx.channel().remoteAddress() instanceof InetSocketAddress) {
-                remoteAddress = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
-            }
+            String remoteAddress = RemotingUtil.extractRemoteAddress(ctx.channel());
             LOG.error(String.format("Error occurred when encoding command for channel %s", remoteAddress), e);
 
             ctx.channel().close().addListener(new ChannelFutureListener() {
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java
index 89e4bff..9d23fcf 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java
@@ -19,9 +19,16 @@ package org.apache.rocketmq.remoting.internal;
 
 import io.netty.channel.Channel;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 
 public class RemotingUtil {
     public static String extractRemoteAddress(Channel channel) {
-        return ((InetSocketAddress) channel.remoteAddress()).getAddress().getHostAddress();
+        SocketAddress socketAddress = channel.remoteAddress();
+
+        if (socketAddress instanceof InetSocketAddress) {
+            return ((InetSocketAddress) socketAddress).getAddress().getHostAddress();
+        }
+
+        return "Unknown";
     }
 }
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
index ed7c93a..6a3112b 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.remoting;
 
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.RandomStringUtils;
@@ -27,11 +27,17 @@ import org.apache.rocketmq.remoting.api.command.RemotingCommand;
 import org.apache.rocketmq.remoting.api.command.TrafficType;
 import org.apache.rocketmq.remoting.external.ThreadUtils;
 import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl;
+import org.assertj.core.api.Fail;
 
 public class BaseTest {
-    protected void runInThreads(final Runnable runnable, int threadsNum) {
-        ExecutorService executor = Executors.newFixedThreadPool(threadsNum);
-        for (int i = 0; i < threadsNum; i++) {
+    protected void scheduleInThreads(final Runnable runnable, int periodMillis) {
+        final ScheduledExecutorService executor = ThreadUtils.newSingleThreadScheduledExecutor("UnitTests", true);
+        executor.scheduleAtFixedRate(runnable, 0, periodMillis, TimeUnit.MILLISECONDS);
+    }
+
+    protected void runInThreads(final Runnable runnable, int concurrentNum) {
+        final ExecutorService executor = ThreadUtils.newFixedThreadPool(concurrentNum, 1000, "UnitTests", true);
+        for (int i = 0; i < concurrentNum; i++) {
             executor.submit(new Runnable() {
                 @Override
                 public void run() {
@@ -39,8 +45,6 @@ public class BaseTest {
                 }
             });
         }
-
-        ThreadUtils.shutdownGracefully(executor, 5, TimeUnit.SECONDS);
     }
 
     protected void runInThreads(final Runnable runnable, int threadsNum,
@@ -84,4 +88,44 @@ public class BaseTest {
 
         return command;
     }
+
+    protected <T> ObjectFuture<T> newObjectFuture(int permits, int timeoutMillis) {
+        return new ObjectFuture<>(permits, timeoutMillis);
+    }
+
+    protected class ObjectFuture<T> {
+        private T object;
+        private Semaphore semaphore;
+        private int permits;
+        private int timeoutMillis;
+
+        public ObjectFuture(int permits, int timeoutMillis) {
+            semaphore = new Semaphore(0);
+            this.permits = permits;
+            this.timeoutMillis = timeoutMillis;
+        }
+
+        public void release() {
+            semaphore.release();
+        }
+
+        public void putObject(T object) {
+            this.object = object;
+        }
+
+        public T getObject() {
+            try {
+                if (!semaphore.tryAcquire(permits, timeoutMillis, TimeUnit.MILLISECONDS)) {
+                    Fail.fail("Get permits failed");
+                }
+            } catch (InterruptedException e) {
+                Fail.fail("Get object failed", e);
+            }
+            return this.object;
+        }
+    }
+
+    public class UnitTestException extends RuntimeException {
+
+    }
 }
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/ResponseFutureTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/ResponseFutureTest.java
index c8e1f25..1a16e56 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/ResponseFutureTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/ResponseFutureTest.java
@@ -21,6 +21,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.remoting.BaseTest;
 import org.apache.rocketmq.remoting.api.AsyncHandler;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.exception.RemoteAccessException;
+import org.apache.rocketmq.remoting.api.exception.RemoteRuntimeException;
 import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl;
 import org.junit.Test;
 
@@ -56,7 +58,7 @@ public class ResponseFutureTest extends BaseTest {
     public void executeAsyncHandler_Failure() {
         final RemotingCommand reqCommand = factory.createRequest();
         final RemotingCommand resCommand = factory.createResponse(reqCommand);
-        final Throwable exception = new RuntimeException("Test Exception");
+        final RemoteRuntimeException exception = new RemoteAccessException("Test Exception");
         future = new ResponseFuture(1, 3000, new AsyncHandler() {
             @Override
             public void onFailure(final RemotingCommand request, final Throwable cause) {
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstractTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstractTest.java
new file mode 100644
index 0000000..5537579
--- /dev/null
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstractTest.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.channel.DefaultEventLoop;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.remoting.BaseTest;
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.RequestProcessor;
+import org.apache.rocketmq.remoting.api.channel.ChannelEventListener;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.exception.RemoteAccessException;
+import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
+import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
+import org.apache.rocketmq.remoting.impl.netty.handler.Decoder;
+import org.apache.rocketmq.remoting.impl.netty.handler.Encoder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Basic tests for {@link NettyRemotingAbstract}: channel event dispatch, response table scanning
+ * and the sync, async and one-way invocation paths, driven over a pair of {@link EmbeddedChannel}s.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class NettyRemotingAbstractTest extends BaseTest {
+
+    private NettyRemotingAbstract remotingAbstract;
+
+    @Mock
+    private Channel mockedClientChannel;
+
+    private EmbeddedChannel clientChannel;
+
+    private EmbeddedChannel serverChannel;
+
+    private RemotingCommand remotingRequest;
+
+    private short requestCode = 123;
+
+    /**
+     * Builds the client and server embedded channels and the default request, registers (through
+     * scheduleInThreads) a task that moves encoded bytes between the two channels, then starts the instance.
+     */
+    @Before
+    public void setUp() {
+        remotingAbstract = new NettyRemotingAbstract(new RemotingClientConfig()) {
+        };
+
+        clientChannel = new EmbeddedChannel(new Encoder(), new Decoder(), new SimpleChannelInboundHandler<RemotingCommand>() {
+
+            @Override
+            protected void channelRead0(final ChannelHandlerContext ctx, final RemotingCommand msg) throws Exception {
+                remotingAbstract.processMessageReceived(ctx, msg);
+            }
+        });
+
+        serverChannel = new EmbeddedChannel(new Encoder(), new Decoder(), new SimpleChannelInboundHandler<RemotingCommand>() {
+
+            @Override
+            protected void channelRead0(final ChannelHandlerContext ctx, final RemotingCommand msg) throws Exception {
+                remotingAbstract.processMessageReceived(ctx, msg);
+            }
+        });
+
+        remotingRequest = remotingAbstract.commandFactory().createRequest();
+        remotingRequest.cmdCode(requestCode);
+        remotingRequest.payload("Ping".getBytes(StandardCharsets.UTF_8));
+
+        // Simulate the TCP stack: forward client output to the server and server output to the client
+        scheduleInThreads(new Runnable() {
+            @Override
+            public void run() {
+                ByteBuf msg = clientChannel.readOutbound();
+                if (msg != null) {
+                    serverChannel.writeInbound(msg);
+                }
+
+                msg = serverChannel.readOutbound();
+
+                if (msg != null) {
+                    clientChannel.writeInbound(msg);
+                }
+            }
+        }, 1);
+
+        remotingAbstract.start();
+    }
+
+    @After
+    public void tearDown() {
+        remotingAbstract.stop();
+    }
+
+    /** Each of the four event types must reach the listener with the originating channel. */
+    @Test
+    public void putNettyEvent_Success() {
+        final Throwable exception = new RuntimeException();
+        // One permit per event type; getObject() below fails the test unless all four arrive.
+        final ObjectFuture<Object> objectFuture = newObjectFuture(4, 100);
+        remotingAbstract.registerChannelEventListener(new ChannelEventListener() {
+            @Override
+            public void onChannelConnect(final RemotingChannel channel) {
+                if (((NettyChannelImpl) channel).getChannel() == mockedClientChannel) {
+                    objectFuture.release();
+                }
+            }
+
+            @Override
+            public void onChannelClose(final RemotingChannel channel) {
+                if (((NettyChannelImpl) channel).getChannel() == mockedClientChannel) {
+                    objectFuture.release();
+                }
+            }
+
+            @Override
+            public void onChannelException(final RemotingChannel channel, final Throwable cause) {
+                if (((NettyChannelImpl) channel).getChannel() == mockedClientChannel && exception == cause) {
+                    objectFuture.release();
+                }
+            }
+
+            @Override
+            public void onChannelIdle(final RemotingChannel channel) {
+                if (((NettyChannelImpl) channel).getChannel() == mockedClientChannel) {
+                    objectFuture.release();
+                }
+            }
+        });
+
+        remotingAbstract.channelEventExecutor.start();
+
+        remotingAbstract.putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CONNECT, mockedClientChannel));
+        remotingAbstract.putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CLOSE, mockedClientChannel));
+        remotingAbstract.putNettyEvent(new NettyChannelEvent(NettyChannelEventType.IDLE, mockedClientChannel));
+        remotingAbstract.putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, mockedClientChannel, exception));
+
+        objectFuture.getObject();
+    }
+
+    /** A CLOSE event queued behind {@code maxLimit} CONNECT events must never reach the listener. */
+    @Test
+    public void putNettyEvent_EventDropped() throws InterruptedException {
+        final Semaphore eventCount = new Semaphore(0);
+        final Semaphore droppedEvent = new Semaphore(0);
+
+        remotingAbstract.registerChannelEventListener(new ChannelEventListener() {
+            @Override
+            public void onChannelConnect(final RemotingChannel channel) {
+                eventCount.release();
+            }
+
+            @Override
+            public void onChannelClose(final RemotingChannel channel) {
+                droppedEvent.release();
+            }
+
+            @Override
+            public void onChannelException(final RemotingChannel channel, final Throwable cause) {
+
+            }
+
+            @Override
+            public void onChannelIdle(final RemotingChannel channel) {
+
+            }
+        });
+
+        int maxLimit = 10001;
+
+        for (int i = 0; i < maxLimit; i++) {
+            remotingAbstract.putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CONNECT, mockedClientChannel));
+        }
+
+        // This event will be dropped
+        remotingAbstract.putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CLOSE, mockedClientChannel));
+
+        remotingAbstract.channelEventExecutor.start();
+
+        assertThat(eventCount.tryAcquire(maxLimit, 1000, TimeUnit.MILLISECONDS)).isTrue();
+
+        assertThat(droppedEvent.tryAcquire(1, 10, TimeUnit.MILLISECONDS)).isFalse();
+    }
+
+    /** A request that is still pending after its timeout must be failed by the table scan. */
+    @Test
+    public void scanResponseTable_RemoveTimeoutRequest() throws InterruptedException {
+        final ObjectFuture<Throwable> objectFuture = newObjectFuture(1, 10);
+
+        remotingAbstract.invokeAsyncWithInterceptor(new EmbeddedChannel(),
+            remotingAbstract.commandFactory().createRequest(),
+            new AsyncHandler() {
+                @Override
+                public void onFailure(final RemotingCommand request, final Throwable cause) {
+                    objectFuture.putObject(cause);
+                    objectFuture.release();
+                }
+
+                @Override
+                public void onSuccess(final RemotingCommand response) {
+
+                }
+            }, 10);
+
+        // Sleep past the 10 ms request timeout so the scan finds the entry expired.
+        TimeUnit.MILLISECONDS.sleep(15);
+        remotingAbstract.scanResponseTable();
+
+        assertThat(objectFuture.getObject()).isInstanceOf(RemoteTimeoutException.class);
+    }
+
+    /** A synchronous call must return the payload produced by the registered processor. */
+    @Test
+    public void invokeWithInterceptor_Success() {
+        registerNormalProcessor();
+
+        RemotingCommand response = remotingAbstract.invokeWithInterceptor(clientChannel, remotingRequest, 3000);
+
+        assertThat(new String(response.payload(), StandardCharsets.UTF_8)).isEqualTo("Pong");
+    }
+
+    /** A processor slower than the call's timeout must surface as {@link RemoteTimeoutException}. */
+    @Test
+    public void invokeWithInterceptor_Timeout() {
+        registerTimeoutProcessor(20);
+
+        try {
+            remotingAbstract.invokeWithInterceptor(clientChannel, remotingRequest, 10);
+            failBecauseExceptionWasNotThrown(RemoteTimeoutException.class);
+        } catch (Exception e) {
+            assertThat(e).isInstanceOf(RemoteTimeoutException.class);
+        }
+    }
+
+    /** A failed channel write must surface as {@link RemoteAccessException} wrapping the write failure. */
+    @Test
+    public void invokeWithInterceptor_AccessException() {
+        DefaultEventLoop eventLoop = new DefaultEventLoop();
+        ChannelPromise channelPromise = new DefaultChannelPromise(mockedClientChannel, eventLoop);
+
+        when(mockedClientChannel.writeAndFlush(any(Object.class))).thenReturn(channelPromise);
+        channelPromise.setFailure(new UnitTestException());
+
+        try {
+            remotingAbstract.invokeWithInterceptor(mockedClientChannel, remotingRequest, 10);
+            failBecauseExceptionWasNotThrown(RemoteAccessException.class);
+        } catch (Exception e) {
+            assertThat(e.getCause()).isInstanceOf(UnitTestException.class);
+            assertThat(e).isInstanceOf(RemoteAccessException.class);
+        } finally {
+            // This event loop exists only to back the promise; shut it down so its thread is not leaked.
+            eventLoop.shutdownGracefully();
+        }
+    }
+
+    /** An asynchronous call must deliver the processor's payload to {@code onSuccess}. */
+    @Test
+    public void invokeAsyncWithInterceptor_Success() {
+        registerNormalProcessor();
+
+        final ObjectFuture<RemotingCommand> objectFuture = newObjectFuture(1, 10);
+
+        remotingAbstract.invokeAsyncWithInterceptor(clientChannel, remotingRequest, new AsyncHandler() {
+            @Override
+            public void onFailure(final RemotingCommand request, final Throwable cause) {
+
+            }
+
+            @Override
+            public void onSuccess(final RemotingCommand response) {
+                objectFuture.putObject(response);
+                objectFuture.release();
+            }
+        }, 3000);
+
+        assertThat(new String(objectFuture.getObject().payload(), StandardCharsets.UTF_8)).isEqualTo("Pong");
+    }
+
+    /** A one-way call must reach the registered processor on the server side. */
+    @Test
+    public void invokeOnewayWithInterceptor_Success() {
+        ObjectFuture<RemotingCommand> objectFuture = newObjectFuture(1, 10);
+        registerOnewayProcessor(objectFuture);
+
+        remotingAbstract.invokeOnewayWithInterceptor(clientChannel, remotingRequest);
+
+        // The one-way call returns nothing here, so check the command the processor stored in the future
+        assertThat(new String(objectFuture.getObject().payload(), StandardCharsets.UTF_8)).isEqualTo("Pong");
+    }
+
+    // TODO: the six tests below are empty placeholders and pass without verifying anything.
+    @Test
+    public void registerInterceptor() {
+    }
+
+    @Test
+    public void registerRequestProcessor() {
+    }
+
+    @Test
+    public void registerRequestProcessor1() {
+    }
+
+    @Test
+    public void unregisterRequestProcessor() {
+    }
+
+    @Test
+    public void processor() {
+    }
+
+    @Test
+    public void registerChannelEventListener() {
+    }
+
+    /** Registers a processor that answers "Pong" after sleeping for {@code timeoutMillis} milliseconds. */
+    private void registerTimeoutProcessor(final int timeoutMillis) {
+        remotingAbstract.registerRequestProcessor(requestCode, new RequestProcessor() {
+            @Override
+            public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) {
+                RemotingCommand response = remotingAbstract.commandFactory().createResponse(request);
+                response.payload("Pong".getBytes(StandardCharsets.UTF_8));
+                try {
+                    TimeUnit.MILLISECONDS.sleep(timeoutMillis);
+                } catch (InterruptedException ignore) {
+                    Thread.currentThread().interrupt();
+                }
+                return response;
+            }
+        });
+    }
+
+    /** Registers a processor that immediately answers "Pong". */
+    private void registerNormalProcessor() {
+        remotingAbstract.registerRequestProcessor(requestCode, new RequestProcessor() {
+            @Override
+            public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) {
+                RemotingCommand response = remotingAbstract.commandFactory().createResponse(request);
+                response.payload("Pong".getBytes(StandardCharsets.UTF_8));
+                return response;
+            }
+        });
+    }
+
+    /** Registers a processor that stores the "Pong" command it builds in {@code objectFuture} and releases it. */
+    private void registerOnewayProcessor(final ObjectFuture<RemotingCommand> objectFuture) {
+        remotingAbstract.registerRequestProcessor(requestCode, new RequestProcessor() {
+            @Override
+            public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) {
+                RemotingCommand response = remotingAbstract.commandFactory().createResponse(request);
+                response.payload("Pong".getBytes(StandardCharsets.UTF_8));
+                objectFuture.putObject(response);
+                objectFuture.release();
+                return response;
+            }
+        });
+    }
+}
\ No newline at end of file