You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/09/14 12:13:30 UTC

[dubbo] branch 3.0 updated: [3.0-Triple] use promise catch server send big header exception (#8774)

This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 7757f28  [3.0-Triple] use promise catch server send big header exception (#8774)
7757f28 is described below

commit 7757f28118b023128dd9ee557d75c7f759be6212
Author: earthchen <yo...@duobei.com>
AuthorDate: Tue Sep 14 07:13:16 2021 -0500

    [3.0-Triple] use promise catch server send big header exception (#8774)
    
    * use promise catch server send big header exception
    
    * remove duplicate set server stream key
---
 .../rpc/protocol/tri/AbstractServerStream.java     |  3 +-
 .../dubbo/rpc/protocol/tri/AbstractStream.java     |  6 ----
 .../apache/dubbo/rpc/protocol/tri/GrpcStatus.java  |  5 +--
 .../rpc/protocol/tri/ServerTransportObserver.java  | 12 ++++++--
 .../tri/TripleHttp2FrameServerHandler.java         | 36 ++++++++++++++--------
 .../rpc/protocol/tri/TripleHttp2Protocol.java      | 15 ++++-----
 .../dubbo/rpc/protocol/tri/TripleInvoker.java      |  3 +-
 .../dubbo/rpc/protocol/tri/UnaryClientStream.java  |  7 +++--
 .../dubbo/rpc/protocol/tri/UnaryServerStream.java  |  3 +-
 9 files changed, 54 insertions(+), 36 deletions(-)

diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index 7905405..36ebd88 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -17,7 +17,6 @@
 
 package org.apache.dubbo.rpc.protocol.tri;
 
-import com.google.protobuf.Message;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.extension.ExtensionLoader;
@@ -33,6 +32,8 @@ import org.apache.dubbo.rpc.model.ScopeModelUtil;
 import org.apache.dubbo.rpc.model.ServiceDescriptor;
 import org.apache.dubbo.triple.TripleWrapper;
 
+import com.google.protobuf.Message;
+
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index 4265de7..822d655 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -295,15 +295,9 @@ public abstract class AbstractStream implements Stream {
         try {
             if (v instanceof String) {
                 String str = (String) v;
-                if (TripleUtil.overEachHeaderListSize(str)) {
-                    return;
-                }
                 metadata.put(key, str);
             } else if (v instanceof byte[]) {
                 String str = TripleUtil.encodeBase64ASCII((byte[]) v);
-                if (TripleUtil.overEachHeaderListSize(str)) {
-                    return;
-                }
                 metadata.put(key + "-bin", str);
             }
         } catch (Throwable t) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
index 0df9771..c21ec32 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
@@ -16,12 +16,13 @@
  */
 package org.apache.dubbo.rpc.protocol.tri;
 
-import io.netty.handler.codec.http.QueryStringDecoder;
-import io.netty.handler.codec.http.QueryStringEncoder;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.rpc.RpcException;
 
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.codec.http.QueryStringEncoder;
+
 import static org.apache.dubbo.rpc.RpcException.FORBIDDEN_EXCEPTION;
 import static org.apache.dubbo.rpc.RpcException.LIMIT_EXCEEDED_EXCEPTION;
 import static org.apache.dubbo.rpc.RpcException.METHOD_NOT_FOUND;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
index fabea1c..a15e5e0 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.rpc.protocol.tri;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
 import io.netty.handler.codec.http2.DefaultHttp2Headers;
 import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
@@ -27,10 +28,12 @@ import static io.netty.handler.codec.http.HttpResponseStatus.OK;
 
 public class ServerTransportObserver implements TransportObserver {
     private final ChannelHandlerContext ctx;
+    private final ChannelPromise promise;
     private boolean headerSent = false;
 
-    public ServerTransportObserver(ChannelHandlerContext ctx) {
+    public ServerTransportObserver(ChannelHandlerContext ctx, ChannelPromise promise) {
         this.ctx = ctx;
+        this.promise = promise;
     }
 
     @Override
@@ -44,7 +47,12 @@ public class ServerTransportObserver implements TransportObserver {
             headers.status(OK.codeAsText());
             headers.set(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), TripleConstant.CONTENT_PROTO);
         }
-        ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream));
+        ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
+            .addListener(future -> {
+                if (!future.isSuccess()) {
+                    promise.tryFailure(future.cause());
+                }
+            });
     }
 
     @Override
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index df3b05e..1ad2b80 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -16,17 +16,6 @@
  */
 package org.apache.dubbo.rpc.protocol.tri;
 
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpMethod;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.HttpUtil;
-import io.netty.handler.codec.http2.Http2DataFrame;
-import io.netty.handler.codec.http2.Http2Frame;
-import io.netty.handler.codec.http2.Http2Headers;
-import io.netty.handler.codec.http2.Http2HeadersFrame;
-import io.netty.util.ReferenceCountUtil;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.logger.Logger;
@@ -41,6 +30,20 @@ import org.apache.dubbo.rpc.model.ProviderModel;
 import org.apache.dubbo.rpc.protocol.tri.GrpcStatus.Code;
 import org.apache.dubbo.rpc.service.ServiceDescriptorInternalCache;
 
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http2.Http2DataFrame;
+import io.netty.handler.codec.http2.Http2Frame;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2HeadersFrame;
+import io.netty.util.ReferenceCountUtil;
+
 import java.util.List;
 
 import static org.apache.dubbo.rpc.protocol.tri.GrpcStatus.rpcExceptionCodeToGrpc;
@@ -198,10 +201,17 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
         } else {
             stream = AbstractServerStream.unary(invoker.getUrl());
         }
+        Channel channel = ctx.channel();
+        ChannelPromise promise = channel.newPromise();
+        promise.addListener(future -> {
+            if (!future.isSuccess()) {
+                exceptionCaught(ctx, future.cause());
+            }
+        });
         stream.service(providerModel.getServiceModel())
             .invoker(invoker)
             .methodName(methodName)
-            .subscribe(new ServerTransportObserver(ctx));
+            .subscribe(new ServerTransportObserver(ctx,promise));
         if (methodDescriptor != null) {
             stream.method(methodDescriptor);
         } else {
@@ -214,7 +224,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
             observer.onComplete();
         }
 
-        ctx.channel().attr(TripleUtil.SERVER_STREAM_KEY).set(stream);
+        channel.attr(TripleUtil.SERVER_STREAM_KEY).set(stream);
     }
 
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index 79eba43..625136b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -16,6 +16,14 @@
  */
 package org.apache.dubbo.rpc.protocol.tri;
 
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.Configuration;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.remoting.api.Http2WireProtocol;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.model.ScopeModelAware;
+
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.SimpleChannelInboundHandler;
@@ -24,13 +32,6 @@ import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
 import io.netty.handler.codec.http2.Http2MultiplexHandler;
 import io.netty.handler.codec.http2.Http2Settings;
 import io.netty.handler.ssl.SslContext;
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.config.Configuration;
-import org.apache.dubbo.common.config.ConfigurationUtils;
-import org.apache.dubbo.common.extension.Activate;
-import org.apache.dubbo.remoting.api.Http2WireProtocol;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.model.ScopeModelAware;
 
 import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_ENABLE_PUSH_KEY;
 import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_HEADER_TABLE_SIZE_KEY;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index fdf9005..da25ae0 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -16,7 +16,6 @@
  */
 package org.apache.dubbo.rpc.protocol.tri;
 
-import io.netty.channel.ChannelFuture;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.Version;
 import org.apache.dubbo.common.extension.ExtensionLoader;
@@ -42,6 +41,8 @@ import org.apache.dubbo.rpc.TimeoutCountDown;
 import org.apache.dubbo.rpc.protocol.AbstractInvoker;
 import org.apache.dubbo.rpc.support.RpcUtils;
 
+import io.netty.channel.ChannelFuture;
+
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
index 415bdc5..6ae3b59 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
@@ -17,9 +17,6 @@
 
 package org.apache.dubbo.rpc.protocol.tri;
 
-import com.google.protobuf.Any;
-import com.google.rpc.DebugInfo;
-import com.google.rpc.Status;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.remoting.exchange.Response;
@@ -27,6 +24,10 @@ import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
 import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.RpcException;
 
+import com.google.protobuf.Any;
+import com.google.rpc.DebugInfo;
+import com.google.rpc.Status;
+
 import java.util.List;
 import java.util.Map;
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
index 27005a6..2140463 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
@@ -17,7 +17,6 @@
 
 package org.apache.dubbo.rpc.protocol.tri;
 
-import io.netty.handler.codec.http.HttpHeaderNames;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.remoting.TimeoutException;
@@ -27,6 +26,8 @@ import org.apache.dubbo.rpc.RpcContext;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.RpcInvocation;
 
+import io.netty.handler.codec.http.HttpHeaderNames;
+
 import java.util.Map;
 import java.util.concurrent.CompletionStage;
 import java.util.function.BiConsumer;