You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/09/14 12:13:30 UTC
[dubbo] branch 3.0 updated: [3.0-Triple] use promise catch server
send big header exception (#8774)
This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 7757f28 [3.0-Triple] use promise catch server send big header exception (#8774)
7757f28 is described below
commit 7757f28118b023128dd9ee557d75c7f759be6212
Author: earthchen <yo...@duobei.com>
AuthorDate: Tue Sep 14 07:13:16 2021 -0500
[3.0-Triple] use promise catch server send big header exception (#8774)
* use promise catch server send big header exception
* remove duplicate set server stream key
---
.../rpc/protocol/tri/AbstractServerStream.java | 3 +-
.../dubbo/rpc/protocol/tri/AbstractStream.java | 6 ----
.../apache/dubbo/rpc/protocol/tri/GrpcStatus.java | 5 +--
.../rpc/protocol/tri/ServerTransportObserver.java | 12 ++++++--
.../tri/TripleHttp2FrameServerHandler.java | 36 ++++++++++++++--------
.../rpc/protocol/tri/TripleHttp2Protocol.java | 15 ++++-----
.../dubbo/rpc/protocol/tri/TripleInvoker.java | 3 +-
.../dubbo/rpc/protocol/tri/UnaryClientStream.java | 7 +++--
.../dubbo/rpc/protocol/tri/UnaryServerStream.java | 3 +-
9 files changed, 54 insertions(+), 36 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index 7905405..36ebd88 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -17,7 +17,6 @@
package org.apache.dubbo.rpc.protocol.tri;
-import com.google.protobuf.Message;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.ExtensionLoader;
@@ -33,6 +32,8 @@ import org.apache.dubbo.rpc.model.ScopeModelUtil;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.triple.TripleWrapper;
+import com.google.protobuf.Message;
+
import java.util.Arrays;
import java.util.List;
import java.util.Map;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index 4265de7..822d655 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -295,15 +295,9 @@ public abstract class AbstractStream implements Stream {
try {
if (v instanceof String) {
String str = (String) v;
- if (TripleUtil.overEachHeaderListSize(str)) {
- return;
- }
metadata.put(key, str);
} else if (v instanceof byte[]) {
String str = TripleUtil.encodeBase64ASCII((byte[]) v);
- if (TripleUtil.overEachHeaderListSize(str)) {
- return;
- }
metadata.put(key + "-bin", str);
}
} catch (Throwable t) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
index 0df9771..c21ec32 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
@@ -16,12 +16,13 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
-import io.netty.handler.codec.http.QueryStringDecoder;
-import io.netty.handler.codec.http.QueryStringEncoder;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.rpc.RpcException;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.codec.http.QueryStringEncoder;
+
import static org.apache.dubbo.rpc.RpcException.FORBIDDEN_EXCEPTION;
import static org.apache.dubbo.rpc.RpcException.LIMIT_EXCEEDED_EXCEPTION;
import static org.apache.dubbo.rpc.RpcException.METHOD_NOT_FOUND;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
index fabea1c..a15e5e0 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.rpc.protocol.tri;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
@@ -27,10 +28,12 @@ import static io.netty.handler.codec.http.HttpResponseStatus.OK;
public class ServerTransportObserver implements TransportObserver {
private final ChannelHandlerContext ctx;
+ private final ChannelPromise promise;
private boolean headerSent = false;
- public ServerTransportObserver(ChannelHandlerContext ctx) {
+ public ServerTransportObserver(ChannelHandlerContext ctx, ChannelPromise promise) {
this.ctx = ctx;
+ this.promise = promise;
}
@Override
@@ -44,7 +47,12 @@ public class ServerTransportObserver implements TransportObserver {
headers.status(OK.codeAsText());
headers.set(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), TripleConstant.CONTENT_PROTO);
}
- ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream));
+ ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
+ .addListener(future -> {
+ if (!future.isSuccess()) {
+ promise.tryFailure(future.cause());
+ }
+ });
}
@Override
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index df3b05e..1ad2b80 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -16,17 +16,6 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpMethod;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.HttpUtil;
-import io.netty.handler.codec.http2.Http2DataFrame;
-import io.netty.handler.codec.http2.Http2Frame;
-import io.netty.handler.codec.http2.Http2Headers;
-import io.netty.handler.codec.http2.Http2HeadersFrame;
-import io.netty.util.ReferenceCountUtil;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.logger.Logger;
@@ -41,6 +30,20 @@ import org.apache.dubbo.rpc.model.ProviderModel;
import org.apache.dubbo.rpc.protocol.tri.GrpcStatus.Code;
import org.apache.dubbo.rpc.service.ServiceDescriptorInternalCache;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http2.Http2DataFrame;
+import io.netty.handler.codec.http2.Http2Frame;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2HeadersFrame;
+import io.netty.util.ReferenceCountUtil;
+
import java.util.List;
import static org.apache.dubbo.rpc.protocol.tri.GrpcStatus.rpcExceptionCodeToGrpc;
@@ -198,10 +201,17 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
} else {
stream = AbstractServerStream.unary(invoker.getUrl());
}
+ Channel channel = ctx.channel();
+ ChannelPromise promise = channel.newPromise();
+ promise.addListener(future -> {
+ if (!future.isSuccess()) {
+ exceptionCaught(ctx, future.cause());
+ }
+ });
stream.service(providerModel.getServiceModel())
.invoker(invoker)
.methodName(methodName)
- .subscribe(new ServerTransportObserver(ctx));
+ .subscribe(new ServerTransportObserver(ctx,promise));
if (methodDescriptor != null) {
stream.method(methodDescriptor);
} else {
@@ -214,7 +224,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
observer.onComplete();
}
- ctx.channel().attr(TripleUtil.SERVER_STREAM_KEY).set(stream);
+ channel.attr(TripleUtil.SERVER_STREAM_KEY).set(stream);
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index 79eba43..625136b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -16,6 +16,14 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.Configuration;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.remoting.api.Http2WireProtocol;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.model.ScopeModelAware;
+
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
@@ -24,13 +32,6 @@ import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.ssl.SslContext;
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.config.Configuration;
-import org.apache.dubbo.common.config.ConfigurationUtils;
-import org.apache.dubbo.common.extension.Activate;
-import org.apache.dubbo.remoting.api.Http2WireProtocol;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.model.ScopeModelAware;
import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_ENABLE_PUSH_KEY;
import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_HEADER_TABLE_SIZE_KEY;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index fdf9005..da25ae0 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -16,7 +16,6 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
-import io.netty.channel.ChannelFuture;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.extension.ExtensionLoader;
@@ -42,6 +41,8 @@ import org.apache.dubbo.rpc.TimeoutCountDown;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
import org.apache.dubbo.rpc.support.RpcUtils;
+import io.netty.channel.ChannelFuture;
+
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
index 415bdc5..6ae3b59 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
@@ -17,9 +17,6 @@
package org.apache.dubbo.rpc.protocol.tri;
-import com.google.protobuf.Any;
-import com.google.rpc.DebugInfo;
-import com.google.rpc.Status;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.remoting.exchange.Response;
@@ -27,6 +24,10 @@ import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.RpcException;
+import com.google.protobuf.Any;
+import com.google.rpc.DebugInfo;
+import com.google.rpc.Status;
+
import java.util.List;
import java.util.Map;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
index 27005a6..2140463 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
@@ -17,7 +17,6 @@
package org.apache.dubbo.rpc.protocol.tri;
-import io.netty.handler.codec.http.HttpHeaderNames;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.remoting.TimeoutException;
@@ -27,6 +26,8 @@ import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
+import io.netty.handler.codec.http.HttpHeaderNames;
+
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;