You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/09/23 07:13:23 UTC
[dubbo] branch 3.0 updated: close stream when timeout (#8888)
This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 7894430 close stream when timeout (#8888)
7894430 is described below
commit 789443008d4b62bb59a55638cf14549c4dde92d7
Author: earthchen <yo...@duobei.com>
AuthorDate: Thu Sep 23 15:13:08 2021 +0800
close stream when timeout (#8888)
---
.../remoting/exchange/support/DefaultFuture2.java | 24 +++++++++++++++++++++-
.../rpc/protocol/tri/TripleClientHandler.java | 14 +++++++------
2 files changed, 31 insertions(+), 7 deletions(-)
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture2.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture2.java
index 45702f9..8268093 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture2.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture2.java
@@ -32,7 +32,9 @@ import org.apache.dubbo.remoting.exchange.Response;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -58,6 +60,8 @@ public class DefaultFuture2 extends CompletableFuture<Object> {
private volatile long sent;
private Timeout timeoutCheckTask;
+ private List<Runnable> timeoutListeners = new ArrayList<>();
+
private ExecutorService executor;
private DefaultFuture2(Connection client2, Request request, int timeout) {
@@ -68,6 +72,19 @@ public class DefaultFuture2 extends CompletableFuture<Object> {
FUTURES.put(request.getId(), this);
}
+ public void addTimeoutListener(Runnable runnable) {
+ timeoutListeners.add(runnable);
+ }
+
+ public static void addTimeoutListener(long id, Runnable runnable) {
+ DefaultFuture2 defaultFuture2 = FUTURES.get(id);
+ defaultFuture2.addTimeoutListener(runnable);
+ }
+
+ public List<Runnable> getTimeoutListeners() {
+ return timeoutListeners;
+ }
+
/**
* check time out of the future
*/
@@ -236,7 +253,12 @@ public class DefaultFuture2 extends CompletableFuture<Object> {
}
if (future.getExecutor() != null) {
- future.getExecutor().execute(() -> notifyTimeout(future));
+ future.getExecutor().execute(() -> {
+ notifyTimeout(future);
+ for (Runnable timeoutListener : future.getTimeoutListeners()) {
+ timeoutListener.run();
+ }
+ });
} else {
notifyTimeout(future);
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
index a8ccb01..1fb4bf0 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
@@ -16,12 +16,6 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-import io.netty.handler.codec.http2.Http2GoAwayFrame;
-import io.netty.handler.codec.http2.Http2SettingsFrame;
-import io.netty.util.ReferenceCountUtil;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.stream.StreamObserver;
@@ -39,6 +33,13 @@ import org.apache.dubbo.rpc.model.ConsumerModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.MethodDescriptor;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.Http2GoAwayFrame;
+import io.netty.handler.codec.http2.Http2SettingsFrame;
+import io.netty.util.ReferenceCountUtil;
+
import java.util.Arrays;
import java.util.List;
@@ -73,6 +74,7 @@ public class TripleClientHandler extends ChannelDuplexHandler {
}
private void writeRequest(ChannelHandlerContext ctx, final Request req, final ChannelPromise promise) {
+ DefaultFuture2.addTimeoutListener(req.getId(), ctx::close);
final RpcInvocation inv = (RpcInvocation) req.getData();
final URL url = inv.getInvoker().getUrl();
ConsumerModel consumerModel = inv.getServiceModel() != null ? (ConsumerModel) inv.getServiceModel() : (ConsumerModel) url.getServiceModel();