You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2018/07/14 02:40:49 UTC

[rocketmq] branch develop updated: Invoke callback at once when channel is close (#95)

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6ae619c  Invoke callback at once when channel is close (#95)
6ae619c is described below

commit 6ae619c403175d54550c7f8e0c6d49f78c5f546a
Author: Jaskey <li...@gmail.com>
AuthorDate: Sat Jul 14 10:40:46 2018 +0800

    Invoke callback at once when channel is close (#95)
---
 .../rocketmq/client/impl/MQClientAPIImplTest.java  |  2 +-
 .../remoting/netty/NettyRemotingAbstract.java      | 51 +++++++++++++++-------
 .../remoting/netty/NettyRemotingClient.java        |  2 +-
 .../rocketmq/remoting/netty/ResponseFuture.java    | 20 +++++++--
 4 files changed, 53 insertions(+), 22 deletions(-)

diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index bf01961..c13e75c 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -162,7 +162,7 @@ public class MQClientAPIImplTest {
             public Object answer(InvocationOnMock mock) throws Throwable {
                 InvokeCallback callback = mock.getArgument(3);
                 RemotingCommand request = mock.getArgument(1);
-                ResponseFuture responseFuture = new ResponseFuture(request.getOpaque(), 3 * 1000, null, null);
+                ResponseFuture responseFuture = new ResponseFuture(null,request.getOpaque(), 3 * 1000, null, null);
                 responseFuture.setResponseCommand(createSuccessResponse(request));
                 callback.operationComplete(responseFuture);
                 return null;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 7c414e9..45ca730 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -364,7 +364,7 @@ public abstract class NettyRemotingAbstract {
         final int opaque = request.getOpaque();
 
         try {
-            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
+            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
             this.responseTable.put(opaque, responseFuture);
             final SocketAddress addr = channel.remoteAddress();
             channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@@ -407,8 +407,7 @@ public abstract class NettyRemotingAbstract {
         boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
         if (acquired) {
             final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
-
-            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
+            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, invokeCallback, once);
             this.responseTable.put(opaque, responseFuture);
             try {
                 channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@@ -417,20 +416,8 @@ public abstract class NettyRemotingAbstract {
                         if (f.isSuccess()) {
                             responseFuture.setSendRequestOK(true);
                             return;
-                        } else {
-                            responseFuture.setSendRequestOK(false);
-                        }
-
-                        responseFuture.putResponse(null);
-                        responseTable.remove(opaque);
-                        try {
-                            executeInvokeCallback(responseFuture);
-                        } catch (Throwable e) {
-                            log.warn("excute callback in writeAndFlush addListener, and callback throw", e);
-                        } finally {
-                            responseFuture.release();
                         }
-
+                        requestFail(opaque);
                         log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                     }
                 });
@@ -455,6 +442,38 @@ public abstract class NettyRemotingAbstract {
         }
     }
 
+    private void requestFail(final int opaque) {
+        ResponseFuture responseFuture = responseTable.remove(opaque);
+        if (responseFuture != null) {
+            responseFuture.setSendRequestOK(false);
+            responseFuture.putResponse(null);
+            try {
+                executeInvokeCallback(responseFuture);
+            } catch (Throwable e) {
+                log.warn("execute callback in requestFail, and callback throw", e);
+            } finally {
+                responseFuture.release();
+            }
+        }
+    }
+
+    /**
+     * Mark the requests of the specified channel as failed and invoke their callbacks immediately.
+     * @param channel the channel which has already been closed
+     */
+    protected void failFast(final Channel channel) {
+        Iterator<Entry<Integer, ResponseFuture>> it = responseTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<Integer, ResponseFuture> entry = it.next();
+            if (entry.getValue().getProcessChannel() == channel) {
+                Integer opaque = entry.getKey();
+                if (opaque != null) {
+                    requestFail(opaque);
+                }
+            }
+        }
+    }
+
     public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
         throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
         request.markOnewayRPC();
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 7cdfb80..241f2b0 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -660,7 +660,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
             log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
             closeChannel(ctx.channel());
             super.close(ctx, promise);
-
+            NettyRemotingClient.this.failFast(ctx.channel());
             if (NettyRemotingClient.this.channelEventListener != null) {
                 NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
             }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
index 1157c45..5f4c8c6 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.remoting.netty;
 
+import io.netty.channel.Channel;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,6 +26,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public class ResponseFuture {
     private final int opaque;
+    private final Channel processChannel;
     private final long timeoutMillis;
     private final InvokeCallback invokeCallback;
     private final long beginTimestamp = System.currentTimeMillis();
@@ -37,9 +39,10 @@ public class ResponseFuture {
     private volatile boolean sendRequestOK = true;
     private volatile Throwable cause;
 
-    public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback,
+    public ResponseFuture(Channel channel, int opaque, long timeoutMillis, InvokeCallback invokeCallback,
         SemaphoreReleaseOnlyOnce once) {
         this.opaque = opaque;
+        this.processChannel = channel;
         this.timeoutMillis = timeoutMillis;
         this.invokeCallback = invokeCallback;
         this.once = once;
@@ -114,11 +117,20 @@ public class ResponseFuture {
         return opaque;
     }
 
+    public Channel getProcessChannel() {
+        return processChannel;
+    }
+
     @Override
     public String toString() {
-        return "ResponseFuture [responseCommand=" + responseCommand + ", sendRequestOK=" + sendRequestOK
-            + ", cause=" + cause + ", opaque=" + opaque + ", timeoutMillis=" + timeoutMillis
-            + ", invokeCallback=" + invokeCallback + ", beginTimestamp=" + beginTimestamp
+        return "ResponseFuture [responseCommand=" + responseCommand
+            + ", sendRequestOK=" + sendRequestOK
+            + ", cause=" + cause
+            + ", opaque=" + opaque
+            + ", processChannel=" + processChannel
+            + ", timeoutMillis=" + timeoutMillis
+            + ", invokeCallback=" + invokeCallback
+            + ", beginTimestamp=" + beginTimestamp
             + ", countDownLatch=" + countDownLatch + "]";
     }
 }