You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2018/07/14 02:40:49 UTC
[rocketmq] branch develop updated: Invoke callback at once when
channel is close (#95)
This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 6ae619c Invoke callback at once when channel is close (#95)
6ae619c is described below
commit 6ae619c403175d54550c7f8e0c6d49f78c5f546a
Author: Jaskey <li...@gmail.com>
AuthorDate: Sat Jul 14 10:40:46 2018 +0800
Invoke callback at once when channel is close (#95)
---
.../rocketmq/client/impl/MQClientAPIImplTest.java | 2 +-
.../remoting/netty/NettyRemotingAbstract.java | 51 +++++++++++++++-------
.../remoting/netty/NettyRemotingClient.java | 2 +-
.../rocketmq/remoting/netty/ResponseFuture.java | 20 +++++++--
4 files changed, 53 insertions(+), 22 deletions(-)
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index bf01961..c13e75c 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -162,7 +162,7 @@ public class MQClientAPIImplTest {
public Object answer(InvocationOnMock mock) throws Throwable {
InvokeCallback callback = mock.getArgument(3);
RemotingCommand request = mock.getArgument(1);
- ResponseFuture responseFuture = new ResponseFuture(request.getOpaque(), 3 * 1000, null, null);
+ ResponseFuture responseFuture = new ResponseFuture(null,request.getOpaque(), 3 * 1000, null, null);
responseFuture.setResponseCommand(createSuccessResponse(request));
callback.operationComplete(responseFuture);
return null;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 7c414e9..45ca730 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -364,7 +364,7 @@ public abstract class NettyRemotingAbstract {
final int opaque = request.getOpaque();
try {
- final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
+ final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@@ -407,8 +407,7 @@ public abstract class NettyRemotingAbstract {
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
-
- final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
+ final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, invokeCallback, once);
this.responseTable.put(opaque, responseFuture);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@@ -417,20 +416,8 @@ public abstract class NettyRemotingAbstract {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
- } else {
- responseFuture.setSendRequestOK(false);
- }
-
- responseFuture.putResponse(null);
- responseTable.remove(opaque);
- try {
- executeInvokeCallback(responseFuture);
- } catch (Throwable e) {
- log.warn("excute callback in writeAndFlush addListener, and callback throw", e);
- } finally {
- responseFuture.release();
}
-
+ requestFail(opaque);
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
}
});
@@ -455,6 +442,38 @@ public abstract class NettyRemotingAbstract {
}
}
+ private void requestFail(final int opaque) {
+ ResponseFuture responseFuture = responseTable.remove(opaque);
+ if (responseFuture != null) {
+ responseFuture.setSendRequestOK(false);
+ responseFuture.putResponse(null);
+ try {
+ executeInvokeCallback(responseFuture);
+ } catch (Throwable e) {
+ log.warn("execute callback in requestFail, and callback throw", e);
+ } finally {
+ responseFuture.release();
+ }
+ }
+ }
+
+ /**
+ * Mark the pending requests of the specified channel as failed and invoke their callbacks immediately.
+ * @param channel the channel which is already closed
+ */
+ protected void failFast(final Channel channel) {
+ Iterator<Entry<Integer, ResponseFuture>> it = responseTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<Integer, ResponseFuture> entry = it.next();
+ if (entry.getValue().getProcessChannel() == channel) {
+ Integer opaque = entry.getKey();
+ if (opaque != null) {
+ requestFail(opaque);
+ }
+ }
+ }
+ }
+
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
request.markOnewayRPC();
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 7cdfb80..241f2b0 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -660,7 +660,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
closeChannel(ctx.channel());
super.close(ctx, promise);
-
+ NettyRemotingClient.this.failFast(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
index 1157c45..5f4c8c6 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.remoting.netty;
+import io.netty.channel.Channel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,6 +26,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class ResponseFuture {
private final int opaque;
+ private final Channel processChannel;
private final long timeoutMillis;
private final InvokeCallback invokeCallback;
private final long beginTimestamp = System.currentTimeMillis();
@@ -37,9 +39,10 @@ public class ResponseFuture {
private volatile boolean sendRequestOK = true;
private volatile Throwable cause;
- public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback,
+ public ResponseFuture(Channel channel, int opaque, long timeoutMillis, InvokeCallback invokeCallback,
SemaphoreReleaseOnlyOnce once) {
this.opaque = opaque;
+ this.processChannel = channel;
this.timeoutMillis = timeoutMillis;
this.invokeCallback = invokeCallback;
this.once = once;
@@ -114,11 +117,20 @@ public class ResponseFuture {
return opaque;
}
+ public Channel getProcessChannel() {
+ return processChannel;
+ }
+
@Override
public String toString() {
- return "ResponseFuture [responseCommand=" + responseCommand + ", sendRequestOK=" + sendRequestOK
- + ", cause=" + cause + ", opaque=" + opaque + ", timeoutMillis=" + timeoutMillis
- + ", invokeCallback=" + invokeCallback + ", beginTimestamp=" + beginTimestamp
+ return "ResponseFuture [responseCommand=" + responseCommand
+ + ", sendRequestOK=" + sendRequestOK
+ + ", cause=" + cause
+ + ", opaque=" + opaque
+ + ", processChannel=" + processChannel
+ + ", timeoutMillis=" + timeoutMillis
+ + ", invokeCallback=" + invokeCallback
+ + ", beginTimestamp=" + beginTimestamp
+ ", countDownLatch=" + countDownLatch + "]";
}
}