You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/10/11 09:45:43 UTC

[dubbo] branch 3.0 updated: [3.0-Triple]Fix compressor flag = 1 when compressor is none and format (#9021)

This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 4b7fec2  [3.0-Triple]Fix compressor flag = 1 when compressor is none and format (#9021)
4b7fec2 is described below

commit 4b7fec2d5a9ca15290eb45691984878bc47def40
Author: GuoHao <gu...@gmail.com>
AuthorDate: Mon Oct 11 17:45:19 2021 +0800

    [3.0-Triple]Fix compressor flag = 1 when compressor is none and format (#9021)
    
    * Fix compressor flag = 1 when compressor is none and format
    
    * Fix compressor flag = 1 when compressor is none and format
    
    * Fix server compressor
---
 .../protocol/tri/TripleClientRequestHandler.java   | 25 ++++++----
 .../tri/TripleHttp2ClientResponseHandler.java      | 26 ++++++----
 .../tri/TripleHttp2FrameServerHandler.java         | 58 ++++++++++++----------
 3 files changed, 60 insertions(+), 49 deletions(-)

diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
index 7b28c02..2a01952 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
@@ -79,18 +79,21 @@ public class TripleClientRequestHandler extends ChannelDuplexHandler {
 
         // get compressor
         String compressorStr = ConfigurationUtils
-            .getGlobalConfiguration(url.getScopeModel()).getString(COMPRESSOR_KEY, DEFAULT_COMPRESSOR);
-        Compressor compressor = url.getOrDefaultApplicationModel().getExtensionLoader(Compressor.class).getExtension(compressorStr);
-        stream.setCompressor(compressor);
-        ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+            .getGlobalConfiguration(url.getScopeModel()).getString(COMPRESSOR_KEY);
+
+        if (null != compressorStr && !compressorStr.equals(DEFAULT_COMPRESSOR)) {
+            Compressor compressor = url.getOrDefaultApplicationModel().getExtensionLoader(Compressor.class).getExtension(compressorStr);
+            stream.setCompressor(compressor);
+            ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+        }
 
         stream.service(consumerModel)
-                .connection(Connection.getConnectionFromChannel(ctx.channel()))
-                .method(methodDescriptor)
-                .methodName(methodDescriptor.getMethodName())
-                .request(req)
-                .serialize((String) inv.getObjectAttachment(Constants.SERIALIZATION_KEY))
-                .subscribe(new ClientTransportObserver(ctx, stream, promise));
+            .connection(Connection.getConnectionFromChannel(ctx.channel()))
+            .method(methodDescriptor)
+            .methodName(methodDescriptor.getMethodName())
+            .request(req)
+            .serialize((String) inv.getObjectAttachment(Constants.SERIALIZATION_KEY))
+            .subscribe(new ClientTransportObserver(ctx, stream, promise));
 
         if (methodDescriptor.isUnary()) {
             stream.asStreamObserver().onNext(inv);
@@ -100,7 +103,7 @@ public class TripleClientRequestHandler extends ChannelDuplexHandler {
             AppResponse result;
             // the stream method params is fixed
             if (methodDescriptor.getRpcType() == MethodDescriptor.RpcType.BIDIRECTIONAL_STREAM
-                    || methodDescriptor.getRpcType() == MethodDescriptor.RpcType.CLIENT_STREAM) {
+                || methodDescriptor.getRpcType() == MethodDescriptor.RpcType.CLIENT_STREAM) {
                 StreamObserver<Object> obServer = (StreamObserver<Object>) inv.getArguments()[0];
                 obServer = attachCancelContext(obServer, stream.getCancellationContext());
                 stream.subscribe(obServer);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
index b41d4e7..930fc0a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
@@ -29,6 +29,8 @@ import io.netty.handler.codec.http2.Http2HeadersFrame;
 import io.netty.handler.codec.http2.Http2ResetFrame;
 import io.netty.handler.codec.http2.Http2StreamFrame;
 
+import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
+
 public final class TripleHttp2ClientResponseHandler extends SimpleChannelInboundHandler<Http2StreamFrame> {
     private static final Logger logger = LoggerFactory.getLogger(TripleHttp2ClientResponseHandler.class);
 
@@ -43,7 +45,7 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
             Http2GoAwayFrame event = (Http2GoAwayFrame) evt;
             ctx.close();
             logger.debug(
-                    "Event triggered, event name is: " + event.name() + ", last stream id is: " + event.lastStreamId());
+                "Event triggered, event name is: " + event.name() + ", last stream id is: " + event.lastStreamId());
         } else if (evt instanceof Http2ResetFrame) {
             onResetRead(ctx, (Http2ResetFrame) evt);
         }
@@ -73,15 +75,17 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
         CharSequence messageEncoding = headers.get(TripleHeaderEnum.GRPC_ENCODING.getHeader());
         if (null != messageEncoding) {
             String compressorStr = messageEncoding.toString();
-            Compressor compressor = clientStream.getUrl().getOrDefaultApplicationModel()
-                .getExtensionLoader(Compressor.class).getExtension(compressorStr);
-            if (null == compressor) {
-                throw GrpcStatus.fromCode(GrpcStatus.Code.UNIMPLEMENTED)
-                    .withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr))
-                    .asException();
-            } else {
-                clientStream.setCompressor(compressor);
-                ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+            if (!compressorStr.equals(DEFAULT_COMPRESSOR)) {
+                Compressor compressor = clientStream.getUrl().getOrDefaultApplicationModel()
+                    .getExtensionLoader(Compressor.class).getExtension(compressorStr);
+                if (null == compressor) {
+                    throw GrpcStatus.fromCode(GrpcStatus.Code.UNIMPLEMENTED)
+                        .withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr))
+                        .asException();
+                } else {
+                    clientStream.setCompressor(compressor);
+                    ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+                }
             }
         }
         final TransportObserver observer = clientStream.asTransportObserver();
@@ -95,7 +99,7 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         final AbstractClientStream clientStream = TripleUtil.getClientStream(ctx);
         final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                .withCause(cause);
+            .withCause(cause);
         Metadata metadata = new DefaultMetadata();
         metadata.put(TripleHeaderEnum.STATUS_KEY.getHeader(), Integer.toString(status.code.code));
         metadata.put(TripleHeaderEnum.MESSAGE_KEY.getHeader(), status.toMessage());
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index 64443a5..eb9505d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -47,6 +47,8 @@ import io.netty.util.ReferenceCountUtil;
 
 import java.util.List;
 
+import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
+
 public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
     private static final Logger LOGGER = LoggerFactory.getLogger(TripleHttp2FrameServerHandler.class);
     private final PathResolver PATH_RESOLVER;
@@ -96,7 +98,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
             TripleUtil.responseErr(ctx, GrpcStatus.rpcExceptionCodeToGrpc(((RpcException) cause).getCode()));
         } else {
             TripleUtil.responseErr(ctx, GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                    .withDescription("Provider's error:\n" + cause.getMessage()));
+                .withDescription("Provider's error:\n" + cause.getMessage()));
         }
     }
 
@@ -113,9 +115,9 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
 
     private Invoker<?> getInvoker(Http2Headers headers, String serviceName) {
         final String version = headers.contains(TripleHeaderEnum.SERVICE_VERSION.getHeader()) ? headers.get(
-                TripleHeaderEnum.SERVICE_VERSION.getHeader()).toString() : null;
+            TripleHeaderEnum.SERVICE_VERSION.getHeader()).toString() : null;
         final String group = headers.contains(TripleHeaderEnum.SERVICE_GROUP.getHeader()) ? headers.get(TripleHeaderEnum.SERVICE_GROUP.getHeader())
-                .toString() : null;
+            .toString() : null;
         final String key = URL.buildKey(serviceName, group, version);
         Invoker<?> invoker = PATH_RESOLVER.resolve(key);
         if (invoker == null) {
@@ -129,38 +131,38 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
 
         if (!HttpMethod.POST.asciiName().contentEquals(headers.method())) {
             TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED.code(),
-                    GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                            .withDescription(String.format("Method '%s' is not supported", headers.method())));
+                GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+                    .withDescription(String.format("Method '%s' is not supported", headers.method())));
             return;
         }
 
         if (headers.path() == null) {
             TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
-                    GrpcStatus.fromCode(Code.UNIMPLEMENTED.code).withDescription("Expected path but is missing"));
+                GrpcStatus.fromCode(Code.UNIMPLEMENTED.code).withDescription("Expected path but is missing"));
             return;
         }
 
         final String path = headers.path().toString();
         if (path.charAt(0) != '/') {
             TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
-                    GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
-                            .withDescription(String.format("Expected path to start with /: %s", path)));
+                GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
+                    .withDescription(String.format("Expected path to start with /: %s", path)));
             return;
         }
 
         final CharSequence contentType = HttpUtil.getMimeType(headers.get(HttpHeaderNames.CONTENT_TYPE));
         if (contentType == null) {
             TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
-                    GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL.code)
-                            .withDescription("Content-Type is missing from the request"));
+                GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL.code)
+                    .withDescription("Content-Type is missing from the request"));
             return;
         }
 
         final String contentString = contentType.toString();
         if (!TripleUtil.supportContentType(contentString)) {
             TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
-                    GrpcStatus.fromCode(Code.INTERNAL.code)
-                            .withDescription(String.format("Content-Type '%s' is not supported", contentString)));
+                GrpcStatus.fromCode(Code.INTERNAL.code)
+                    .withDescription(String.format("Content-Type '%s' is not supported", contentString)));
             return;
         }
 
@@ -176,14 +178,14 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
         final Invoker<?> invoker = getInvoker(headers, serviceName);
         if (invoker == null) {
             TripleUtil.responseErr(ctx,
-                    GrpcStatus.fromCode(Code.UNIMPLEMENTED).withDescription("Service not found:" + serviceName));
+                GrpcStatus.fromCode(Code.UNIMPLEMENTED).withDescription("Service not found:" + serviceName));
             return;
         }
         FrameworkServiceRepository repo = frameworkModel.getServiceRepository();
         ProviderModel providerModel = repo.lookupExportedService(invoker.getUrl().getServiceKey());
         if (providerModel == null || providerModel.getServiceModel() == null) {
             TripleUtil.responseErr(ctx,
-                    GrpcStatus.fromCode(Code.UNIMPLEMENTED).withDescription("Service not found:" + serviceName));
+                GrpcStatus.fromCode(Code.UNIMPLEMENTED).withDescription("Service not found:" + serviceName));
             return;
         }
 
@@ -200,7 +202,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
             methodDescriptors = providerModel.getServiceModel().getMethods(methodName);
             if (CollectionUtils.isEmpty(methodDescriptors)) {
                 TripleUtil.responseErr(ctx, GrpcStatus.fromCode(Code.UNIMPLEMENTED)
-                        .withDescription("Method :" + methodName + " not found of service:" + serviceName));
+                    .withDescription("Method :" + methodName + " not found of service:" + serviceName));
                 return;
             }
             // In most cases there is only one method
@@ -214,9 +216,9 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
         Channel channel = ctx.channel();
         // You can add listeners to ChannelPromise here if you want to listen for the result of sending a frame
         stream.service(providerModel.getServiceModel())
-                .invoker(invoker)
-                .methodName(methodName)
-                .subscribe(new ServerTransportObserver(ctx));
+            .invoker(invoker)
+            .methodName(methodName)
+            .subscribe(new ServerTransportObserver(ctx));
         if (methodDescriptor != null) {
             stream.method(methodDescriptor);
         } else {
@@ -226,15 +228,17 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
         CharSequence messageEncoding = headers.get(TripleHeaderEnum.GRPC_ENCODING.getHeader());
         if (null != messageEncoding) {
             String compressorStr = messageEncoding.toString();
-            Compressor compressor = invoker.getUrl().getOrDefaultApplicationModel().
-                getExtensionLoader(Compressor.class).getExtension(compressorStr);
-            if (null == compressor) {
-                TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
-                    GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
-                        .withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr)));
-            } else {
-                stream.setCompressor(compressor);
-                ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+            if (!compressorStr.equals(DEFAULT_COMPRESSOR)) {
+                Compressor compressor = invoker.getUrl().getOrDefaultApplicationModel().
+                    getExtensionLoader(Compressor.class).getExtension(compressorStr);
+                if (null == compressor) {
+                    TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
+                        GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
+                            .withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr)));
+                } else {
+                    stream.setCompressor(compressor);
+                    ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+                }
             }
         }
         final TransportObserver observer = stream.asTransportObserver();