You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/10/11 09:45:43 UTC
[dubbo] branch 3.0 updated: [3.0-Triple]Fix compressor flag = 1 when compressor is none and format (#9021)
This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 4b7fec2 [3.0-Triple]Fix compressor flag = 1 when compressor is none and format (#9021)
4b7fec2 is described below
commit 4b7fec2d5a9ca15290eb45691984878bc47def40
Author: GuoHao <gu...@gmail.com>
AuthorDate: Mon Oct 11 17:45:19 2021 +0800
[3.0-Triple]Fix compressor flag = 1 when compressor is none and format (#9021)
* Fix compressor flag = 1 when compressor is none and format
* Fix compressor flag = 1 when compressor is none and format
* Fix server compressor
---
.../protocol/tri/TripleClientRequestHandler.java | 25 ++++++----
.../tri/TripleHttp2ClientResponseHandler.java | 26 ++++++----
.../tri/TripleHttp2FrameServerHandler.java | 58 ++++++++++++----------
3 files changed, 60 insertions(+), 49 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
index 7b28c02..2a01952 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientRequestHandler.java
@@ -79,18 +79,21 @@ public class TripleClientRequestHandler extends ChannelDuplexHandler {
// get compressor
String compressorStr = ConfigurationUtils
- .getGlobalConfiguration(url.getScopeModel()).getString(COMPRESSOR_KEY, DEFAULT_COMPRESSOR);
- Compressor compressor = url.getOrDefaultApplicationModel().getExtensionLoader(Compressor.class).getExtension(compressorStr);
- stream.setCompressor(compressor);
- ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+ .getGlobalConfiguration(url.getScopeModel()).getString(COMPRESSOR_KEY);
+
+ if (null != compressorStr && !compressorStr.equals(DEFAULT_COMPRESSOR)) {
+ Compressor compressor = url.getOrDefaultApplicationModel().getExtensionLoader(Compressor.class).getExtension(compressorStr);
+ stream.setCompressor(compressor);
+ ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+ }
stream.service(consumerModel)
- .connection(Connection.getConnectionFromChannel(ctx.channel()))
- .method(methodDescriptor)
- .methodName(methodDescriptor.getMethodName())
- .request(req)
- .serialize((String) inv.getObjectAttachment(Constants.SERIALIZATION_KEY))
- .subscribe(new ClientTransportObserver(ctx, stream, promise));
+ .connection(Connection.getConnectionFromChannel(ctx.channel()))
+ .method(methodDescriptor)
+ .methodName(methodDescriptor.getMethodName())
+ .request(req)
+ .serialize((String) inv.getObjectAttachment(Constants.SERIALIZATION_KEY))
+ .subscribe(new ClientTransportObserver(ctx, stream, promise));
if (methodDescriptor.isUnary()) {
stream.asStreamObserver().onNext(inv);
@@ -100,7 +103,7 @@ public class TripleClientRequestHandler extends ChannelDuplexHandler {
AppResponse result;
// the stream method params is fixed
if (methodDescriptor.getRpcType() == MethodDescriptor.RpcType.BIDIRECTIONAL_STREAM
- || methodDescriptor.getRpcType() == MethodDescriptor.RpcType.CLIENT_STREAM) {
+ || methodDescriptor.getRpcType() == MethodDescriptor.RpcType.CLIENT_STREAM) {
StreamObserver<Object> obServer = (StreamObserver<Object>) inv.getArguments()[0];
obServer = attachCancelContext(obServer, stream.getCancellationContext());
stream.subscribe(obServer);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
index b41d4e7..930fc0a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
@@ -29,6 +29,8 @@ import io.netty.handler.codec.http2.Http2HeadersFrame;
import io.netty.handler.codec.http2.Http2ResetFrame;
import io.netty.handler.codec.http2.Http2StreamFrame;
+import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
+
public final class TripleHttp2ClientResponseHandler extends SimpleChannelInboundHandler<Http2StreamFrame> {
private static final Logger logger = LoggerFactory.getLogger(TripleHttp2ClientResponseHandler.class);
@@ -43,7 +45,7 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
Http2GoAwayFrame event = (Http2GoAwayFrame) evt;
ctx.close();
logger.debug(
- "Event triggered, event name is: " + event.name() + ", last stream id is: " + event.lastStreamId());
+ "Event triggered, event name is: " + event.name() + ", last stream id is: " + event.lastStreamId());
} else if (evt instanceof Http2ResetFrame) {
onResetRead(ctx, (Http2ResetFrame) evt);
}
@@ -73,15 +75,17 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
CharSequence messageEncoding = headers.get(TripleHeaderEnum.GRPC_ENCODING.getHeader());
if (null != messageEncoding) {
String compressorStr = messageEncoding.toString();
- Compressor compressor = clientStream.getUrl().getOrDefaultApplicationModel()
- .getExtensionLoader(Compressor.class).getExtension(compressorStr);
- if (null == compressor) {
- throw GrpcStatus.fromCode(GrpcStatus.Code.UNIMPLEMENTED)
- .withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr))
- .asException();
- } else {
- clientStream.setCompressor(compressor);
- ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+ if (!compressorStr.equals(DEFAULT_COMPRESSOR)) {
+ Compressor compressor = clientStream.getUrl().getOrDefaultApplicationModel()
+ .getExtensionLoader(Compressor.class).getExtension(compressorStr);
+ if (null == compressor) {
+ throw GrpcStatus.fromCode(GrpcStatus.Code.UNIMPLEMENTED)
+ .withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr))
+ .asException();
+ } else {
+ clientStream.setCompressor(compressor);
+ ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+ }
}
}
final TransportObserver observer = clientStream.asTransportObserver();
@@ -95,7 +99,7 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final AbstractClientStream clientStream = TripleUtil.getClientStream(ctx);
final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withCause(cause);
+ .withCause(cause);
Metadata metadata = new DefaultMetadata();
metadata.put(TripleHeaderEnum.STATUS_KEY.getHeader(), Integer.toString(status.code.code));
metadata.put(TripleHeaderEnum.MESSAGE_KEY.getHeader(), status.toMessage());
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index 64443a5..eb9505d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -47,6 +47,8 @@ import io.netty.util.ReferenceCountUtil;
import java.util.List;
+import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
+
public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(TripleHttp2FrameServerHandler.class);
private final PathResolver PATH_RESOLVER;
@@ -96,7 +98,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
TripleUtil.responseErr(ctx, GrpcStatus.rpcExceptionCodeToGrpc(((RpcException) cause).getCode()));
} else {
TripleUtil.responseErr(ctx, GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withDescription("Provider's error:\n" + cause.getMessage()));
+ .withDescription("Provider's error:\n" + cause.getMessage()));
}
}
@@ -113,9 +115,9 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
private Invoker<?> getInvoker(Http2Headers headers, String serviceName) {
final String version = headers.contains(TripleHeaderEnum.SERVICE_VERSION.getHeader()) ? headers.get(
- TripleHeaderEnum.SERVICE_VERSION.getHeader()).toString() : null;
+ TripleHeaderEnum.SERVICE_VERSION.getHeader()).toString() : null;
final String group = headers.contains(TripleHeaderEnum.SERVICE_GROUP.getHeader()) ? headers.get(TripleHeaderEnum.SERVICE_GROUP.getHeader())
- .toString() : null;
+ .toString() : null;
final String key = URL.buildKey(serviceName, group, version);
Invoker<?> invoker = PATH_RESOLVER.resolve(key);
if (invoker == null) {
@@ -129,38 +131,38 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
if (!HttpMethod.POST.asciiName().contentEquals(headers.method())) {
TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED.code(),
- GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withDescription(String.format("Method '%s' is not supported", headers.method())));
+ GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+ .withDescription(String.format("Method '%s' is not supported", headers.method())));
return;
}
if (headers.path() == null) {
TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
- GrpcStatus.fromCode(Code.UNIMPLEMENTED.code).withDescription("Expected path but is missing"));
+ GrpcStatus.fromCode(Code.UNIMPLEMENTED.code).withDescription("Expected path but is missing"));
return;
}
final String path = headers.path().toString();
if (path.charAt(0) != '/') {
TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
- GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
- .withDescription(String.format("Expected path to start with /: %s", path)));
+ GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
+ .withDescription(String.format("Expected path to start with /: %s", path)));
return;
}
final CharSequence contentType = HttpUtil.getMimeType(headers.get(HttpHeaderNames.CONTENT_TYPE));
if (contentType == null) {
TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
- GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL.code)
- .withDescription("Content-Type is missing from the request"));
+ GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL.code)
+ .withDescription("Content-Type is missing from the request"));
return;
}
final String contentString = contentType.toString();
if (!TripleUtil.supportContentType(contentString)) {
TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
- GrpcStatus.fromCode(Code.INTERNAL.code)
- .withDescription(String.format("Content-Type '%s' is not supported", contentString)));
+ GrpcStatus.fromCode(Code.INTERNAL.code)
+ .withDescription(String.format("Content-Type '%s' is not supported", contentString)));
return;
}
@@ -176,14 +178,14 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
final Invoker<?> invoker = getInvoker(headers, serviceName);
if (invoker == null) {
TripleUtil.responseErr(ctx,
- GrpcStatus.fromCode(Code.UNIMPLEMENTED).withDescription("Service not found:" + serviceName));
+ GrpcStatus.fromCode(Code.UNIMPLEMENTED).withDescription("Service not found:" + serviceName));
return;
}
FrameworkServiceRepository repo = frameworkModel.getServiceRepository();
ProviderModel providerModel = repo.lookupExportedService(invoker.getUrl().getServiceKey());
if (providerModel == null || providerModel.getServiceModel() == null) {
TripleUtil.responseErr(ctx,
- GrpcStatus.fromCode(Code.UNIMPLEMENTED).withDescription("Service not found:" + serviceName));
+ GrpcStatus.fromCode(Code.UNIMPLEMENTED).withDescription("Service not found:" + serviceName));
return;
}
@@ -200,7 +202,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
methodDescriptors = providerModel.getServiceModel().getMethods(methodName);
if (CollectionUtils.isEmpty(methodDescriptors)) {
TripleUtil.responseErr(ctx, GrpcStatus.fromCode(Code.UNIMPLEMENTED)
- .withDescription("Method :" + methodName + " not found of service:" + serviceName));
+ .withDescription("Method :" + methodName + " not found of service:" + serviceName));
return;
}
// In most cases there is only one method
@@ -214,9 +216,9 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
Channel channel = ctx.channel();
// You can add listeners to ChannelPromise here if you want to listen for the result of sending a frame
stream.service(providerModel.getServiceModel())
- .invoker(invoker)
- .methodName(methodName)
- .subscribe(new ServerTransportObserver(ctx));
+ .invoker(invoker)
+ .methodName(methodName)
+ .subscribe(new ServerTransportObserver(ctx));
if (methodDescriptor != null) {
stream.method(methodDescriptor);
} else {
@@ -226,15 +228,17 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
CharSequence messageEncoding = headers.get(TripleHeaderEnum.GRPC_ENCODING.getHeader());
if (null != messageEncoding) {
String compressorStr = messageEncoding.toString();
- Compressor compressor = invoker.getUrl().getOrDefaultApplicationModel().
- getExtensionLoader(Compressor.class).getExtension(compressorStr);
- if (null == compressor) {
- TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
- GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
- .withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr)));
- } else {
- stream.setCompressor(compressor);
- ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+ if (!compressorStr.equals(DEFAULT_COMPRESSOR)) {
+ Compressor compressor = invoker.getUrl().getOrDefaultApplicationModel().
+ getExtensionLoader(Compressor.class).getExtension(compressorStr);
+ if (null == compressor) {
+ TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
+ GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
+ .withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr)));
+ } else {
+ stream.setCompressor(compressor);
+ ctx.channel().attr(TripleUtil.COMPRESSOR_KEY).set(compressor);
+ }
}
}
final TransportObserver observer = stream.asTransportObserver();