You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/09/27 03:15:58 UTC
[dubbo] branch 3.0 updated: optimize triple server send frame error handler (#8922)
This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 6a79caf optimize triple server send frame error handler (#8922)
6a79caf is described below
commit 6a79caffe59396acea7ebacecbc4388917065512
Author: earthchen <yo...@duobei.com>
AuthorDate: Mon Sep 27 11:15:43 2021 +0800
optimize triple server send frame error handler (#8922)
---
.../dubbo/rpc/protocol/tri/ServerTransportObserver.java | 11 +++++------
.../rpc/protocol/tri/TripleHttp2FrameServerHandler.java | 14 ++------------
2 files changed, 7 insertions(+), 18 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
index fdb5085..552ce7e 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
@@ -22,7 +22,6 @@ import org.apache.dubbo.common.logger.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
@@ -32,16 +31,15 @@ import io.netty.handler.codec.http2.Http2Error;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
public class ServerTransportObserver implements TransportObserver {
+
private static final Logger LOGGER = LoggerFactory.getLogger(ServerTransportObserver.class);
private final ChannelHandlerContext ctx;
- private final ChannelPromise promise;
private boolean headerSent = false;
private boolean resetSent = false;
- public ServerTransportObserver(ChannelHandlerContext ctx, ChannelPromise promise) {
+ public ServerTransportObserver(ChannelHandlerContext ctx) {
this.ctx = ctx;
- this.promise = promise;
}
@Override
@@ -58,10 +56,11 @@ public class ServerTransportObserver implements TransportObserver {
headers.status(OK.codeAsText());
headers.set(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), TripleConstant.CONTENT_PROTO);
}
+ // If endStream is true, the channel will be closed, so you cannot listen for errors and continue sending any frame
ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
.addListener(future -> {
if (!future.isSuccess()) {
- LOGGER.warn("write header error", future.cause());
+ LOGGER.warn("send header error endStream=" + endStream, future.cause());
}
});
}
@@ -89,7 +88,7 @@ public class ServerTransportObserver implements TransportObserver {
ctx.writeAndFlush(new DefaultHttp2DataFrame(buf, false))
.addListener(future -> {
if (!future.isSuccess()) {
- LOGGER.warn("write data error", future.cause());
+ LOGGER.warn("send data error endStream=" + endStream, future.cause());
}
});
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index 34ee52f..e3d0421 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -33,7 +33,6 @@ import org.apache.dubbo.rpc.service.ServiceDescriptorInternalCache;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
@@ -216,16 +215,11 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
stream = AbstractServerStream.unary(invoker.getUrl());
}
Channel channel = ctx.channel();
- ChannelPromise promise = channel.newPromise();
- promise.addListener(future -> {
- if (!future.isSuccess()) {
- exceptionCaught(ctx, future.cause());
- }
- });
+ // You can add listeners to ChannelPromise here if you want to listen for the result of sending a frame
stream.service(providerModel.getServiceModel())
.invoker(invoker)
.methodName(methodName)
- .subscribe(new ServerTransportObserver(ctx, promise));
+ .subscribe(new ServerTransportObserver(ctx));
if (methodDescriptor != null) {
stream.method(methodDescriptor);
} else {
@@ -250,8 +244,4 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
return CommonConstants.$INVOKE.equals(methodName) || CommonConstants.$INVOKE_ASYNC.equals(methodName);
}
- @Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
- super.write(ctx, msg, promise);
- }
}