You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/02/24 14:37:57 UTC
[dubbo] branch 3.0 updated: Triple bugfix (#7263)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new adcd2df Triple bugfix (#7263)
adcd2df is described below
commit adcd2df4ee5e4c3eb1889646f092582e4ff10099
Author: GuoHao <gu...@gmail.com>
AuthorDate: Wed Feb 24 22:37:07 2021 +0800
Triple bugfix (#7263)
* binary header value
* Remove channel context
* generic invoke
* memory leak in tri server handler
---
.../dubbo/rpc/protocol/tri/AbstractStream.java | 2 +-
.../dubbo/rpc/protocol/tri/ServerStream.java | 30 ++++++++++++----------
.../dubbo/rpc/protocol/tri/TripleConstant.java | 1 -
.../rpc/protocol/tri/TripleHttp2Protocol.java | 9 ++++++-
4 files changed, 25 insertions(+), 17 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index ae4d9c7..8782cd0 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -161,7 +161,7 @@ public abstract class AbstractStream implements Stream {
if (v instanceof String) {
trailers.addObject(key, v);
} else if (v instanceof byte[]) {
- trailers.addObject(key + "-bin", TripleUtil.encodeBase64((byte[]) v));
+ trailers.add(key + "-bin", TripleUtil.encodeBase64ASCII((byte[]) v));
}
} else {
if (v instanceof String || serializeType == null) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index c46bac3..b924f46 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -213,17 +213,22 @@ public class ServerStream extends AbstractStream implements Stream {
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
ServiceRepository repo = ApplicationModel.getServiceRepository();
final List<MethodDescriptor> methods = serviceDescriptor.getMethods(methodName);
- if (methods == null || methods.isEmpty()) {
- responseErr(ctx, GrpcStatus.fromCode(Code.UNIMPLEMENTED)
- .withDescription("Method not found:" + methodName + " of service:" + serviceDescriptor.getServiceName()));
- return null;
- }
- if (methods.size() == 1) {
- this.methodDescriptor = methods.get(0);
- setNeedWrap(TripleUtil.needWrapper(this.methodDescriptor.getParameterClasses()));
- } else {
- // can not determine which one to invoke when same protobuf method name is used, force wrap it
+ if (CommonConstants.$INVOKE.equals(methodName) || CommonConstants.$INVOKE_ASYNC.equals(methodName)) {
+ this.methodDescriptor = repo.lookupMethod(GenericService.class.getName(), methodName);
setNeedWrap(true);
+ } else {
+ if (methods == null || methods.isEmpty()) {
+ responseErr(ctx, GrpcStatus.fromCode(Code.UNIMPLEMENTED)
+ .withDescription("Method not found:" + methodName + " of service:" + serviceDescriptor.getServiceName()));
+ return null;
+ }
+ if (methods.size() == 1) {
+ this.methodDescriptor = methods.get(0);
+ setNeedWrap(TripleUtil.needWrapper(this.methodDescriptor.getParameterClasses()));
+ } else {
+ // can not determine which one to invoke when same protobuf method name is used, force wrap it
+ setNeedWrap(true);
+ }
}
if (isNeedWrap()) {
loadFromURL(getUrl());
@@ -236,9 +241,7 @@ public class ServerStream extends AbstractStream implements Stream {
if (isNeedWrap()) {
final TripleWrapper.TripleRequestWrapper req = TripleUtil.unpack(getData(), TripleWrapper.TripleRequestWrapper.class);
setSerializeType(req.getSerializeType());
- if (CommonConstants.$INVOKE.equals(methodName) || CommonConstants.$INVOKE_ASYNC.equals(methodName)) {
- this.methodDescriptor = repo.lookupMethod(GenericService.class.getName(), methodName);
- } else {
+ if (this.methodDescriptor == null) {
String[] paramTypes = req.getArgTypesList().toArray(new String[req.getArgsCount()]);
for (MethodDescriptor method : methods) {
if (Arrays.equals(method.getCompatibleParamSignatures(), paramTypes)) {
@@ -269,7 +272,6 @@ public class ServerStream extends AbstractStream implements Stream {
inv.setParameterTypes(methodDescriptor.getParameterClasses());
inv.setReturnTypes(methodDescriptor.getReturnTypes());
final Map<String, Object> attachments = parseHeadersToMap(getHeaders());
- attachments.put(TripleConstant.TRI_CHANNEL_CTX_KEY, ctx);
inv.setObjectAttachments(attachments);
return inv;
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
index bf7af26..23ce833 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
@@ -17,7 +17,6 @@
package org.apache.dubbo.rpc.protocol.tri;
public interface TripleConstant {
- String TRI_CHANNEL_CTX_KEY = "tri-ctx-channel";
String STATUS_KEY = "grpc-status";
String MESSAGE_KEY = "grpc-message";
String CONTENT_TYPE_KEY = "content-type";
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index ebc4288..8d8766d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -20,8 +20,10 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.remoting.api.Http2WireProtocol;
+import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
+import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
@@ -47,7 +49,12 @@ public class TripleHttp2Protocol extends Http2WireProtocol {
.frameLogger(SERVER_LOGGER)
.build();
final Http2MultiplexHandler handler = new Http2MultiplexHandler(new TripleServerInitializer());
- pipeline.addLast(codec, new TripleServerConnectionHandler(), handler);
+ pipeline.addLast(codec, new TripleServerConnectionHandler(), handler, new SimpleChannelInboundHandler<Object>() {
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
+ // empty
+ }
+ });
}
@Override