You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2019/06/28 11:20:00 UTC
[rocketmq-remoting] 25/39: Polish the exception structure and add
basic tests for NettyRemotingAbstract
This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git
commit 6ff7202494a003e8ad84b7ac465e53a88d2b7c9d
Author: yukon <yu...@apache.org>
AuthorDate: Wed Jun 5 22:01:27 2019 +0800
Polish the exception structure and add basic tests for NettyRemotingAbstract
---
pom.xml | 2 +-
.../remoting/api/channel/ChannelEventListener.java | 8 +-
.../api/exception/RemoteAccessException.java | 2 +-
...eException.java => RemoteRuntimeException.java} | 10 +-
.../remoting/api/interceptor/RequestContext.java | 2 +-
.../remoting/api/interceptor/ResponseContext.java | 2 +-
.../remoting/common/ChannelEventListenerGroup.java | 4 +-
.../rocketmq/remoting/common/ResponseFuture.java | 17 +-
.../remoting/impl/command/RemotingCommandImpl.java | 7 +-
.../remoting/impl/netty/NettyChannelEvent.java | 2 +-
.../remoting/impl/netty/NettyRemotingAbstract.java | 61 +++-
.../remoting/impl/netty/NettyRemotingClient.java | 4 +-
.../remoting/impl/netty/NettyRemotingServer.java | 2 -
.../remoting/impl/netty/handler/Encoder.java | 7 +-
.../rocketmq/remoting/internal/RemotingUtil.java | 9 +-
.../org/apache/rocketmq/remoting/BaseTest.java | 56 +++-
.../remoting/common/ResponseFutureTest.java | 4 +-
.../impl/netty/NettyRemotingAbstractTest.java | 364 +++++++++++++++++++++
18 files changed, 506 insertions(+), 57 deletions(-)
diff --git a/pom.xml b/pom.xml
index 23eeeb5..3d28b24 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,7 +89,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
- <version>3.4</version>
+ <version>3.6</version>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/channel/ChannelEventListener.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/channel/ChannelEventListener.java
index 0c0afcf..23b6e88 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/channel/ChannelEventListener.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/channel/ChannelEventListener.java
@@ -18,11 +18,11 @@
package org.apache.rocketmq.remoting.api.channel;
public interface ChannelEventListener {
- void onChannelConnect(final RemotingChannel channel);
+ void onChannelConnect(RemotingChannel channel);
- void onChannelClose(final RemotingChannel channel);
+ void onChannelClose(RemotingChannel channel);
- void onChannelException(final RemotingChannel channel);
+ void onChannelException(RemotingChannel channel, Throwable cause);
- void onChannelIdle(final RemotingChannel channel);
+ void onChannelIdle(RemotingChannel channel);
}
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteAccessException.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteAccessException.java
index 6ce6dd4..d6d46f0 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteAccessException.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteAccessException.java
@@ -30,7 +30,7 @@ package org.apache.rocketmq.remoting.api.exception;
*
* @since 1.0.0
*/
-public class RemoteAccessException extends NestedRuntimeException {
+public class RemoteAccessException extends RemoteRuntimeException {
private static final long serialVersionUID = 6280428909532427263L;
/**
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteRuntimeException.java
similarity index 90%
rename from remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java
rename to remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteRuntimeException.java
index 7ef01db..a83be9f 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteRuntimeException.java
@@ -27,26 +27,26 @@ package org.apache.rocketmq.remoting.api.exception;
*
* @since 1.0.0
*/
-public abstract class NestedRuntimeException extends RuntimeException {
+public abstract class RemoteRuntimeException extends RuntimeException {
private static final long serialVersionUID = -8371779880133933367L;
/**
- * Construct a {@code NestedRuntimeException} with the specified detail message.
+ * Construct a {@code RemoteRuntimeException} with the specified detail message.
*
* @param msg the detail message
*/
- public NestedRuntimeException(String msg) {
+ public RemoteRuntimeException(String msg) {
super(msg);
}
/**
- * Construct a {@code NestedRuntimeException} with the specified detail message
+ * Construct a {@code RemoteRuntimeException} with the specified detail message
* and nested exception.
*
* @param msg the detail message
* @param cause the nested exception
*/
- public NestedRuntimeException(String msg, Throwable cause) {
+ public RemoteRuntimeException(String msg, Throwable cause) {
super(msg, cause);
}
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/RequestContext.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/RequestContext.java
index d961556..a93e71c 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/RequestContext.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/RequestContext.java
@@ -60,6 +60,6 @@ public class RequestContext {
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.DEFAULT_STYLE);
}
}
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java
index 005aa28..f076d8f 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java
@@ -36,7 +36,7 @@ public class ResponseContext extends RequestContext {
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.DEFAULT_STYLE);
}
public RemotingCommand getResponse() {
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ChannelEventListenerGroup.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ChannelEventListenerGroup.java
index 8af61f7..4c374e9 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ChannelEventListenerGroup.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ChannelEventListenerGroup.java
@@ -47,9 +47,9 @@ public class ChannelEventListenerGroup {
}
}
- public void onChannelException(final RemotingChannel channel) {
+ public void onChannelException(final RemotingChannel channel, final Throwable cause) {
for (ChannelEventListener listener : listenerList) {
- listener.onChannelException(channel);
+ listener.onChannelException(channel, cause);
}
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
index e6c394b..6a2f246 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
@@ -21,23 +21,32 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringExclude;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.rocketmq.remoting.api.AsyncHandler;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.exception.RemoteRuntimeException;
import org.jetbrains.annotations.Nullable;
public class ResponseFuture {
private final long beginTimestamp = System.currentTimeMillis();
+
+ @ToStringExclude
private final CountDownLatch countDownLatch = new CountDownLatch(1);
+ @ToStringExclude
private final AtomicBoolean asyncHandlerExecuted = new AtomicBoolean(false);
private int requestId;
private long timeoutMillis;
+
+ @ToStringExclude
private AsyncHandler asyncHandler;
private volatile RemotingCommand responseCommand;
private volatile boolean sendRequestOK = true;
- private volatile Throwable cause;
+ private volatile RemoteRuntimeException cause;
+
+ @ToStringExclude
private SemaphoreReleaseOnlyOnce once;
private RemotingCommand requestCommand;
@@ -108,11 +117,11 @@ public class ResponseFuture {
return asyncHandler;
}
- public Throwable getCause() {
+ public RemoteRuntimeException getCause() {
return cause;
}
- public void setCause(Throwable cause) {
+ public void setCause(RemoteRuntimeException cause) {
this.cause = cause;
}
@@ -146,6 +155,6 @@ public class ResponseFuture {
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.DEFAULT_STYLE);
}
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
index 4d1af44..8454616 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringExclude;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
import org.apache.rocketmq.remoting.api.command.TrafficType;
@@ -36,7 +37,11 @@ public class RemotingCommandImpl implements RemotingCommand {
private TrafficType trafficType = TrafficType.REQUEST_SYNC;
private short opCode = RemotingSysResponseCode.SUCCESS;
private String remark = "";
+
+ @ToStringExclude
private Map<String, String> properties = new HashMap<>();
+
+ @ToStringExclude
private byte[] payload;
protected RemotingCommandImpl() {
@@ -139,6 +144,6 @@ public class RemotingCommandImpl implements RemotingCommand {
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this, ToStringStyle.SIMPLE_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.NO_CLASS_NAME_STYLE);
}
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java
index ec9cece..097213c 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java
@@ -50,6 +50,6 @@ public class NettyChannelEvent {
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.NO_CLASS_NAME_STYLE);
}
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index 9e865d0..de4fc81 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -43,6 +43,7 @@ import org.apache.rocketmq.remoting.api.command.RemotingCommand;
import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory;
import org.apache.rocketmq.remoting.api.command.TrafficType;
import org.apache.rocketmq.remoting.api.exception.RemoteAccessException;
+import org.apache.rocketmq.remoting.api.exception.RemoteRuntimeException;
import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
import org.apache.rocketmq.remoting.api.interceptor.Interceptor;
import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup;
@@ -58,7 +59,6 @@ import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl;
import org.apache.rocketmq.remoting.impl.command.RemotingSysResponseCode;
import org.apache.rocketmq.remoting.internal.RemotingUtil;
-import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,12 +93,35 @@ public abstract class NettyRemotingAbstract implements RemotingService {
* responding processor in this map to handle the request.
*/
private final Map<Short, Pair<RequestProcessor, ExecutorService>> processorTables = new ConcurrentHashMap<>();
+
+ /**
+ * This factory provides methods to create RemotingCommand.
+ */
private final RemotingCommandFactory remotingCommandFactory;
+ /**
+ * Executor to execute RequestProcessor without specific executor.
+ */
private final ExecutorService publicExecutor;
+
+ /**
+ * Invoke the async handler in this executor when process response.
+ */
private final ExecutorService asyncHandlerExecutor;
+
+ /**
+ * This scheduled executor provides the ability to govern on-going response table.
+ */
protected ScheduledExecutorService houseKeepingService = ThreadUtils.newSingleThreadScheduledExecutor("HouseKeepingService", true);
+
+ /**
+ * Provides custom interceptor at the occurrence of beforeRequest and afterResponseReceived event.
+ */
private InterceptorGroup interceptorGroup = new InterceptorGroup();
+
+ /**
+ * Provides listener mechanism to handle netty channel events.
+ */
private ChannelEventListenerGroup channelEventListenerGroup = new ChannelEventListenerGroup();
NettyRemotingAbstract(RemotingConfig remotingConfig) {
@@ -110,7 +133,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
this.asyncHandlerExecutor = ThreadUtils.newFixedThreadPool(
remotingConfig.getAsyncHandlerExecutorThreads(),
- 10000, "Remoting-PublicExecutor", true);
+ 10000, "Remoting-AsyncExecutor", true);
this.remotingCommandFactory = new RemotingCommandFactoryImpl();
}
@@ -144,8 +167,8 @@ public abstract class NettyRemotingAbstract implements RemotingService {
ResponseFuture rf = this.ackTables.remove(requestID);
if (rf != null) {
- LOG.warn("remove timeout request {} ", rf);
- rf.setCause(new RemoteTimeoutException(rf.getRemoteAddr(), rf.getTimeoutMillis()));
+ LOG.warn("Removes timeout request {} ", rf.getRequestCommand());
+ rf.setCause(new RemoteTimeoutException(String.format("Request to %s timeout", rf.getRemoteAddr()), rf.getTimeoutMillis()));
executeAsyncHandler(rf);
}
}
@@ -153,6 +176,8 @@ public abstract class NettyRemotingAbstract implements RemotingService {
@Override
public void start() {
+ startUpHouseKeepingService();
+
if (this.channelEventListenerGroup.size() > 0) {
this.channelEventExecutor.start();
}
@@ -230,11 +255,10 @@ public abstract class NettyRemotingAbstract implements RemotingService {
responseFuture.release();
}
} else {
- LOG.warn("request {} from {} has not matched response !", response, RemotingUtil.extractRemoteAddress(ctx.channel()));
+ LOG.warn("Response {} from {} doesn't have a matched request!", response, RemotingUtil.extractRemoteAddress(ctx.channel()));
}
}
- @NotNull
private Runnable buildProcessorTask(final ChannelHandlerContext ctx, final RemotingCommand cmd,
final Pair<RequestProcessor, ExecutorService> processorExecutorPair, final RemotingChannel channel) {
return new Runnable() {
@@ -302,7 +326,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
}
}
- private void requestFail(final int requestID, final Throwable cause) {
+ private void requestFail(final int requestID, final RemoteRuntimeException cause) {
ResponseFuture responseFuture = ackTables.remove(requestID);
if (responseFuture != null) {
responseFuture.setSendRequestOK(false);
@@ -312,7 +336,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
}
}
- private void requestFail(final ResponseFuture responseFuture, final Throwable cause) {
+ private void requestFail(final ResponseFuture responseFuture, final RemoteRuntimeException cause) {
responseFuture.setCause(cause);
executeAsyncHandler(responseFuture);
}
@@ -368,7 +392,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
ChannelFutureListener listener = new ChannelFutureListener() {
@Override
- public void operationComplete(ChannelFuture f) throws Exception {
+ public void operationComplete(ChannelFuture f) {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
@@ -376,7 +400,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
responseFuture.setSendRequestOK(false);
ackTables.remove(requestID);
- responseFuture.setCause(f.cause());
+ responseFuture.setCause(new RemoteAccessException(RemotingUtil.extractRemoteAddress(channel), f.cause()));
responseFuture.putResponse(null);
LOG.warn("Send request command to {} failed !", remoteAddr);
@@ -390,9 +414,10 @@ public abstract class NettyRemotingAbstract implements RemotingService {
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
- throw new RemoteTimeoutException(RemotingUtil.extractRemoteAddress(channel), timeoutMillis, responseFuture.getCause());
+ responseFuture.setCause(new RemoteTimeoutException(RemotingUtil.extractRemoteAddress(channel), timeoutMillis));
+ throw responseFuture.getCause();
} else {
- throw new RemoteAccessException(RemotingUtil.extractRemoteAddress(channel), responseFuture.getCause());
+ throw responseFuture.getCause();
}
}
@@ -439,14 +464,14 @@ public abstract class NettyRemotingAbstract implements RemotingService {
return;
}
- requestFail(requestID, f.cause());
+ requestFail(requestID, new RemoteAccessException(RemotingUtil.extractRemoteAddress(channel), f.cause()));
LOG.warn("Send request command to channel failed.", remoteAddr);
}
};
this.writeAndFlush(channel, request, listener);
} catch (Exception e) {
- requestFail(requestID, e);
+ requestFail(requestID, new RemoteAccessException(RemotingUtil.extractRemoteAddress(channel), e));
LOG.error("Send request command to channel " + channel + " error !", e);
}
} else {
@@ -543,7 +568,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
if (this.eventQueue.size() <= MAX_SIZE) {
this.eventQueue.add(event);
} else {
- LOG.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
+ LOG.warn("Event queue size[{}] meets the limit, so drop this event {}", this.eventQueue.size(), event.toString());
}
}
@@ -559,7 +584,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
if (event != null && listener != null) {
RemotingChannel channel = new NettyChannelImpl(event.getChannel());
- LOG.warn("Channel Event, {}", event);
+ LOG.info("Dispatch received channel event, {}", event);
switch (event.getType()) {
case IDLE:
@@ -572,14 +597,14 @@ public abstract class NettyRemotingAbstract implements RemotingService {
listener.onChannelConnect(channel);
break;
case EXCEPTION:
- listener.onChannelException(channel);
+ listener.onChannelException(channel, event.getCause());
break;
default:
break;
}
}
} catch (Exception e) {
- LOG.error("error", e);
+ LOG.warn("Exception thrown when dispatching channel event", e);
break;
}
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
index 6b2796e..c146813 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
@@ -98,8 +98,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
});
applyOptions(clientBootstrap);
-
- startUpHouseKeepingService();
}
@Override
@@ -237,7 +235,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOG.info("Close channel {} because of error {} ", ctx.channel(), cause);
NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel());
- putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel()));
+ putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel(), cause));
}
}
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
index f0dbb45..0967208 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
@@ -114,8 +114,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
ChannelFuture channelFuture = this.serverBootstrap.bind(this.serverConfig.getServerListenPort()).syncUninterruptibly();
this.port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
-
- startUpHouseKeepingService();
}
@Override
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
index 702e2b4..3c5a90e 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
@@ -22,12 +22,12 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
-import java.net.InetSocketAddress;
import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
import org.apache.rocketmq.remoting.api.exception.RemoteCodecException;
import org.apache.rocketmq.remoting.impl.buffer.NettyByteBufferWrapper;
import org.apache.rocketmq.remoting.impl.command.CodecHelper;
+import org.apache.rocketmq.remoting.internal.RemotingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,10 +44,7 @@ public class Encoder extends MessageToByteEncoder<RemotingCommand> {
encode(remotingCommand, wrapper);
} catch (final RemoteCodecException e) {
- String remoteAddress = "UnKnown";
- if (ctx.channel().remoteAddress() instanceof InetSocketAddress) {
- remoteAddress = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
- }
+ String remoteAddress = RemotingUtil.extractRemoteAddress(ctx.channel());
LOG.error(String.format("Error occurred when encoding command for channel %s", remoteAddress), e);
ctx.channel().close().addListener(new ChannelFutureListener() {
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java
index 89e4bff..9d23fcf 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java
@@ -19,9 +19,16 @@ package org.apache.rocketmq.remoting.internal;
import io.netty.channel.Channel;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
public class RemotingUtil {
public static String extractRemoteAddress(Channel channel) {
- return ((InetSocketAddress) channel.remoteAddress()).getAddress().getHostAddress();
+ SocketAddress socketAddress = channel.remoteAddress();
+
+ if (socketAddress instanceof InetSocketAddress) {
+ return ((InetSocketAddress) socketAddress).getAddress().getHostAddress();
+ }
+
+ return "Unknown";
}
}
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
index ed7c93a..6a3112b 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.remoting;
import java.util.Random;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
@@ -27,11 +27,17 @@ import org.apache.rocketmq.remoting.api.command.RemotingCommand;
import org.apache.rocketmq.remoting.api.command.TrafficType;
import org.apache.rocketmq.remoting.external.ThreadUtils;
import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl;
+import org.assertj.core.api.Fail;
public class BaseTest {
- protected void runInThreads(final Runnable runnable, int threadsNum) {
- ExecutorService executor = Executors.newFixedThreadPool(threadsNum);
- for (int i = 0; i < threadsNum; i++) {
+ protected void scheduleInThreads(final Runnable runnable, int periodMillis) {
+ final ScheduledExecutorService executor = ThreadUtils.newSingleThreadScheduledExecutor("UnitTests", true);
+ executor.scheduleAtFixedRate(runnable, 0, periodMillis, TimeUnit.MILLISECONDS);
+ }
+
+ protected void runInThreads(final Runnable runnable, int concurrentNum) {
+ final ExecutorService executor = ThreadUtils.newFixedThreadPool(concurrentNum, 1000, "UnitTests", true);
+ for (int i = 0; i < concurrentNum; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
@@ -39,8 +45,6 @@ public class BaseTest {
}
});
}
-
- ThreadUtils.shutdownGracefully(executor, 5, TimeUnit.SECONDS);
}
protected void runInThreads(final Runnable runnable, int threadsNum,
@@ -84,4 +88,44 @@ public class BaseTest {
return command;
}
+
+ protected <T> ObjectFuture<T> newObjectFuture(int permits, int timeoutMillis) {
+ return new ObjectFuture<>(permits, timeoutMillis);
+ }
+
+ protected class ObjectFuture<T> {
+ private T object;
+ private Semaphore semaphore;
+ private int permits;
+ private int timeoutMillis;
+
+ public ObjectFuture(int permits, int timeoutMillis) {
+ semaphore = new Semaphore(0);
+ this.permits = permits;
+ this.timeoutMillis = timeoutMillis;
+ }
+
+ public void release() {
+ semaphore.release();
+ }
+
+ public void putObject(T object) {
+ this.object = object;
+ }
+
+ public T getObject() {
+ try {
+ if (!semaphore.tryAcquire(permits, timeoutMillis, TimeUnit.MILLISECONDS)) {
+ Fail.fail("Get permits failed");
+ }
+ } catch (InterruptedException e) {
+ Fail.fail("Get object failed", e);
+ }
+ return this.object;
+ }
+ }
+
+ public class UnitTestException extends RuntimeException {
+
+ }
}
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/ResponseFutureTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/ResponseFutureTest.java
index c8e1f25..1a16e56 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/ResponseFutureTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/ResponseFutureTest.java
@@ -21,6 +21,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.BaseTest;
import org.apache.rocketmq.remoting.api.AsyncHandler;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.exception.RemoteAccessException;
+import org.apache.rocketmq.remoting.api.exception.RemoteRuntimeException;
import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl;
import org.junit.Test;
@@ -56,7 +58,7 @@ public class ResponseFutureTest extends BaseTest {
public void executeAsyncHandler_Failure() {
final RemotingCommand reqCommand = factory.createRequest();
final RemotingCommand resCommand = factory.createResponse(reqCommand);
- final Throwable exception = new RuntimeException("Test Exception");
+ final RemoteRuntimeException exception = new RemoteAccessException("Test Exception");
future = new ResponseFuture(1, 3000, new AsyncHandler() {
@Override
public void onFailure(final RemotingCommand request, final Throwable cause) {
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstractTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstractTest.java
new file mode 100644
index 0000000..5537579
--- /dev/null
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstractTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.channel.DefaultEventLoop;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.remoting.BaseTest;
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.RequestProcessor;
+import org.apache.rocketmq.remoting.api.channel.ChannelEventListener;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.exception.RemoteAccessException;
+import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
+import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
+import org.apache.rocketmq.remoting.impl.netty.handler.Decoder;
+import org.apache.rocketmq.remoting.impl.netty.handler.Encoder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class NettyRemotingAbstractTest extends BaseTest {
+
+ private NettyRemotingAbstract remotingAbstract;
+
+ @Mock
+ private Channel mockedClientChannel;
+
+ private EmbeddedChannel clientChannel;
+
+ private EmbeddedChannel serverChannel;
+
+ private RemotingCommand remotingRequest;
+
+ private short requestCode = 123;
+
+ @Before
+ public void setUp() {
+ remotingAbstract = new NettyRemotingAbstract(new RemotingClientConfig()) {
+ };
+
+ clientChannel = new EmbeddedChannel(new Encoder(), new Decoder(), new SimpleChannelInboundHandler<RemotingCommand>() {
+
+ @Override
+ protected void channelRead0(final ChannelHandlerContext ctx, final RemotingCommand msg) throws Exception {
+ remotingAbstract.processMessageReceived(ctx, msg);
+ }
+ });
+
+ serverChannel = new EmbeddedChannel(new Encoder(), new Decoder(), new SimpleChannelInboundHandler<RemotingCommand>() {
+
+ @Override
+ protected void channelRead0(final ChannelHandlerContext ctx, final RemotingCommand msg) throws Exception {
+ remotingAbstract.processMessageReceived(ctx, msg);
+ }
+ });
+
+ remotingRequest = remotingAbstract.commandFactory().createRequest();
+ remotingRequest.cmdCode(requestCode);
+ remotingRequest.payload("Ping".getBytes());
+
+ // Simulate the tcp stack
+ scheduleInThreads(new Runnable() {
+ @Override
+ public void run() {
+ ByteBuf msg = clientChannel.readOutbound();
+ if (msg != null) {
+ serverChannel.writeInbound(msg);
+ }
+
+ msg = serverChannel.readOutbound();
+
+ if (msg != null) {
+ clientChannel.writeInbound(msg);
+ }
+ }
+ }, 1);
+
+ remotingAbstract.start();
+ }
+
+ @After
+ public void tearDown() {
+ remotingAbstract.stop();
+ }
+
+ @Test
+ public void putNettyEvent_Success() {
+ final Throwable exception = new RuntimeException();
+ final ObjectFuture objectFuture = newObjectFuture(4, 100);
+ remotingAbstract.registerChannelEventListener(new ChannelEventListener() {
+ @Override
+ public void onChannelConnect(final RemotingChannel channel) {
+ if (((NettyChannelImpl) channel).getChannel() == mockedClientChannel) {
+ objectFuture.release();
+ }
+ }
+
+ @Override
+ public void onChannelClose(final RemotingChannel channel) {
+ if (((NettyChannelImpl) channel).getChannel() == mockedClientChannel) {
+ objectFuture.release();
+ }
+ }
+
+ @Override
+ public void onChannelException(final RemotingChannel channel, final Throwable cause) {
+ if (((NettyChannelImpl) channel).getChannel() == mockedClientChannel && exception == cause) {
+ objectFuture.release();
+ }
+ }
+
+ @Override
+ public void onChannelIdle(final RemotingChannel channel) {
+ if (((NettyChannelImpl) channel).getChannel() == mockedClientChannel) {
+ objectFuture.release();
+ }
+ }
+ });
+
+ remotingAbstract.channelEventExecutor.start();
+
+ remotingAbstract.putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CONNECT, mockedClientChannel));
+ remotingAbstract.putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CLOSE, mockedClientChannel));
+ remotingAbstract.putNettyEvent(new NettyChannelEvent(NettyChannelEventType.IDLE, mockedClientChannel));
+ remotingAbstract.putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, mockedClientChannel, exception));
+
+ objectFuture.getObject();
+ }
+
+ @Test
+ public void putNettyEvent_EventDropped() throws InterruptedException {
+ final Semaphore eventCount = new Semaphore(0);
+ final Semaphore droppedEvent = new Semaphore(0);
+
+ remotingAbstract.registerChannelEventListener(new ChannelEventListener() {
+ @Override
+ public void onChannelConnect(final RemotingChannel channel) {
+ eventCount.release();
+ }
+
+ @Override
+ public void onChannelClose(final RemotingChannel channel) {
+ droppedEvent.release();
+ }
+
+ @Override
+ public void onChannelException(final RemotingChannel channel, final Throwable cause) {
+
+ }
+
+ @Override
+ public void onChannelIdle(final RemotingChannel channel) {
+
+ }
+ });
+
+ int maxLimit = 10001;
+
+ for (int i = 0; i < maxLimit; i++) {
+ remotingAbstract.putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CONNECT, mockedClientChannel));
+ }
+
+ // This event will be dropped
+ remotingAbstract.putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CLOSE, mockedClientChannel));
+
+ remotingAbstract.channelEventExecutor.start();
+
+ assertThat(eventCount.tryAcquire(maxLimit, 1000, TimeUnit.MILLISECONDS)).isTrue();
+
+ assertThat(droppedEvent.tryAcquire(1, 10, TimeUnit.MILLISECONDS)).isFalse();
+ }
+
+ @Test
+ public void scanResponseTable_RemoveTimeoutRequest() throws InterruptedException {
+ final ObjectFuture<Throwable> objectFuture = newObjectFuture(1, 10);
+
+ remotingAbstract.invokeAsyncWithInterceptor(new EmbeddedChannel(),
+ remotingAbstract.commandFactory().createRequest(),
+ new AsyncHandler() {
+ @Override
+ public void onFailure(final RemotingCommand request, final Throwable cause) {
+ objectFuture.putObject(cause);
+ objectFuture.release();
+ }
+
+ @Override
+ public void onSuccess(final RemotingCommand response) {
+
+ }
+ }, 10);
+
+ TimeUnit.MILLISECONDS.sleep(15);
+ remotingAbstract.scanResponseTable();
+
+ assertThat(objectFuture.getObject()).isInstanceOf(RemoteTimeoutException.class);
+ }
+
+ @Test
+ public void invokeWithInterceptor_Success() {
+ registerNormalProcessor();
+
+ RemotingCommand response = remotingAbstract.invokeWithInterceptor(clientChannel, remotingRequest, 3000);
+
+ assertThat(new String(response.payload())).isEqualTo("Pong");
+ }
+
+ @Test
+ public void invokeWithInterceptor_Timeout() {
+ registerTimeoutProcessor(20);
+
+ try {
+ RemotingCommand response = remotingAbstract.invokeWithInterceptor(clientChannel, remotingRequest, 10);
+ failBecauseExceptionWasNotThrown(RemoteTimeoutException.class);
+ } catch (Exception e) {
+ assertThat(e).isInstanceOf(RemoteTimeoutException.class);
+ }
+ }
+
+ @Test
+ public void invokeWithInterceptor_AccessException() {
+ ChannelPromise channelPromise = new DefaultChannelPromise(mockedClientChannel, new DefaultEventLoop());
+
+ when(mockedClientChannel.writeAndFlush(any(Object.class))).thenReturn(channelPromise);
+ channelPromise.setFailure(new UnitTestException());
+
+ try {
+ RemotingCommand response = remotingAbstract.invokeWithInterceptor(mockedClientChannel, remotingRequest, 10);
+ failBecauseExceptionWasNotThrown(RemoteAccessException.class);
+ } catch (Exception e) {
+ assertThat(e.getCause()).isInstanceOf(UnitTestException.class);
+ assertThat(e).isInstanceOf(RemoteAccessException.class);
+ }
+ }
+
+ @Test
+ public void invokeAsyncWithInterceptor_Success() {
+ registerNormalProcessor();
+
+ final ObjectFuture<RemotingCommand> objectFuture = newObjectFuture(1, 10);
+
+ remotingAbstract.invokeAsyncWithInterceptor(clientChannel, remotingRequest, new AsyncHandler() {
+ @Override
+ public void onFailure(final RemotingCommand request, final Throwable cause) {
+
+ }
+
+ @Override
+ public void onSuccess(final RemotingCommand response) {
+ objectFuture.putObject(response);
+ objectFuture.release();
+ }
+ }, 3000);
+
+ assertThat(new String(objectFuture.getObject().payload())).isEqualTo("Pong");
+ }
+
+ @Test
+ public void invokeOnewayWithInterceptor_Success() {
+ ObjectFuture<RemotingCommand> objectFuture = newObjectFuture(1, 10);
+ registerOnewayProcessor(objectFuture);
+
+ remotingAbstract.invokeOnewayWithInterceptor(clientChannel, remotingRequest);
+
+ // Receive the response directly
+ assertThat(new String(objectFuture.getObject().payload())).isEqualTo("Pong");
+ }
+
+ @Test
+ public void registerInterceptor() {
+ }
+
+ @Test
+ public void registerRequestProcessor() {
+ }
+
+ @Test
+ public void registerRequestProcessor1() {
+ }
+
+ @Test
+ public void unregisterRequestProcessor() {
+ }
+
+ @Test
+ public void processor() {
+ }
+
+ @Test
+ public void registerChannelEventListener() {
+ }
+
+ private void registerTimeoutProcessor(final int timeoutMillis) {
+ remotingAbstract.registerRequestProcessor(requestCode, new RequestProcessor() {
+ @Override
+ public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) {
+ RemotingCommand response = remotingAbstract.commandFactory().createResponse(request);
+ response.payload("Pong".getBytes());
+ try {
+ TimeUnit.MILLISECONDS.sleep(timeoutMillis);
+ } catch (InterruptedException ignore) {
+ }
+ return response;
+ }
+ });
+ }
+
+ private void registerNormalProcessor() {
+ remotingAbstract.registerRequestProcessor(requestCode, new RequestProcessor() {
+ @Override
+ public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) {
+ RemotingCommand response = remotingAbstract.commandFactory().createResponse(request);
+ response.payload("Pong".getBytes());
+ return response;
+ }
+ });
+ }
+
+ private void registerOnewayProcessor(final ObjectFuture<RemotingCommand> objectFuture) {
+ remotingAbstract.registerRequestProcessor(requestCode, new RequestProcessor() {
+ @Override
+ public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) {
+ RemotingCommand response = remotingAbstract.commandFactory().createResponse(request);
+ response.payload("Pong".getBytes());
+ objectFuture.putObject(response);
+ objectFuture.release();
+ return response;
+ }
+ });
+ }
+}
\ No newline at end of file