You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/09/23 07:13:23 UTC

[dubbo] branch 3.0 updated: close stream when timeout (#8888)

This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 7894430  close stream when timeout (#8888)
7894430 is described below

commit 789443008d4b62bb59a55638cf14549c4dde92d7
Author: earthchen <yo...@duobei.com>
AuthorDate: Thu Sep 23 15:13:08 2021 +0800

    close stream when timeout (#8888)
---
 .../remoting/exchange/support/DefaultFuture2.java  | 24 +++++++++++++++++++++-
 .../rpc/protocol/tri/TripleClientHandler.java      | 14 +++++++------
 2 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture2.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture2.java
index 45702f9..8268093 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture2.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture2.java
@@ -32,7 +32,9 @@ import org.apache.dubbo.remoting.exchange.Response;
 
 import java.net.InetSocketAddress;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -58,6 +60,8 @@ public class DefaultFuture2 extends CompletableFuture<Object> {
     private volatile long sent;
     private Timeout timeoutCheckTask;
 
+    private List<Runnable> timeoutListeners = new ArrayList<>();
+
     private ExecutorService executor;
 
     private DefaultFuture2(Connection client2, Request request, int timeout) {
@@ -68,6 +72,19 @@ public class DefaultFuture2 extends CompletableFuture<Object> {
         FUTURES.put(request.getId(), this);
     }
 
+    public void addTimeoutListener(Runnable runnable) {
+        timeoutListeners.add(runnable);
+    }
+
+    public static void addTimeoutListener(long id, Runnable runnable) {
+        DefaultFuture2 defaultFuture2 = FUTURES.get(id); // null when no future is registered under this id
+        if (defaultFuture2 != null) { defaultFuture2.addTimeoutListener(runnable); } // guard avoids an NPE on a missing id
+    }
+
+    public List<Runnable> getTimeoutListeners() {
+        return timeoutListeners;
+    }
+
     /**
      * check time out of the future
      */
@@ -236,7 +253,12 @@ public class DefaultFuture2 extends CompletableFuture<Object> {
             }
 
             if (future.getExecutor() != null) {
-                future.getExecutor().execute(() -> notifyTimeout(future));
+                future.getExecutor().execute(() -> {
+                    notifyTimeout(future);
+                    for (Runnable timeoutListener : future.getTimeoutListeners()) {
+                        timeoutListener.run();
+                    }
+                });
             } else {
                 notifyTimeout(future);
             }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
index a8ccb01..1fb4bf0 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
@@ -16,12 +16,6 @@
  */
 package org.apache.dubbo.rpc.protocol.tri;
 
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-import io.netty.handler.codec.http2.Http2GoAwayFrame;
-import io.netty.handler.codec.http2.Http2SettingsFrame;
-import io.netty.util.ReferenceCountUtil;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.stream.StreamObserver;
@@ -39,6 +33,13 @@ import org.apache.dubbo.rpc.model.ConsumerModel;
 import org.apache.dubbo.rpc.model.FrameworkModel;
 import org.apache.dubbo.rpc.model.MethodDescriptor;
 
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.Http2GoAwayFrame;
+import io.netty.handler.codec.http2.Http2SettingsFrame;
+import io.netty.util.ReferenceCountUtil;
+
 import java.util.Arrays;
 import java.util.List;
 
@@ -73,6 +74,7 @@ public class TripleClientHandler extends ChannelDuplexHandler {
     }
 
     private void writeRequest(ChannelHandlerContext ctx, final Request req, final ChannelPromise promise) {
+        DefaultFuture2.addTimeoutListener(req.getId(), ctx::close);
         final RpcInvocation inv = (RpcInvocation) req.getData();
         final URL url = inv.getInvoker().getUrl();
         ConsumerModel consumerModel = inv.getServiceModel() != null ? (ConsumerModel) inv.getServiceModel() : (ConsumerModel) url.getServiceModel();