You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/09/08 08:19:26 UTC
[dubbo] branch 3.0 updated: [3.0-Triple] fix tri stream method can
not find methodDescriptor (#8705)
This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 8068505 [3.0-Triple] fix tri stream method can not find methodDescriptor (#8705)
8068505 is described below
commit 806850515bdd45da6f53980b1b35300377ea072e
Author: earthchen <yo...@duobei.com>
AuthorDate: Wed Sep 8 16:19:17 2021 +0800
[3.0-Triple] fix tri stream method can not find methodDescriptor (#8705)
* fix tri stream method can not find methodDescriptor
* change md method map for tri
* remove hack in getMethod
* put md into rpc invocation
* remove unused code
* change tri get MethodDescriptor way
* remove unused code
* change better method name
---
.../apache/dubbo/rpc/model/MethodDescriptor.java | 16 +++++++
.../apache/dubbo/rpc/model/ServiceDescriptor.java | 15 +++---
.../java/org/apache/dubbo/rpc/RpcInvocation.java | 2 +
.../rpc/protocol/tri/AbstractServerStream.java | 3 +-
.../rpc/protocol/tri/TripleClientHandler.java | 54 ++++++++++++++--------
.../tri/TripleHttp2FrameServerHandler.java | 21 +++++++--
6 files changed, 82 insertions(+), 29 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
index ccb75b9..02fd95f 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
@@ -62,10 +62,17 @@ public class MethodDescriptor {
private final RpcType rpcType;
private final ConcurrentMap<String, Object> attributeMap = new ConcurrentHashMap<>();
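+ // Only used by the tri protocol: the parameter and return types exactly as declared on the Method,
+ // kept so that methods taking or returning a StreamObserver can still be matched by their real signature.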
+ private final Class<?>[] realParameterClasses;
+ private final Class<?> realReturnClass;
+
public MethodDescriptor(Method method) {
this.method = method;
this.methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
+ realParameterClasses = parameterTypes;
+ realReturnClass = method.getReturnType();
// bidirectional-stream: StreamObserver<Request> foo(StreamObserver<Response>)
if (parameterTypes.length == 1 && isStreamType(parameterTypes[0])) {
this.parameterClasses = new Class<?>[]{
@@ -319,6 +326,15 @@ public class MethodDescriptor {
return this.attributeMap.get(key);
}
+
+ public Class<?>[] getRealParameterClasses() {
+ return realParameterClasses;
+ }
+
+ public Class<?> getRealReturnClass() {
+ return realReturnClass;
+ }
+
public enum RpcType {
UNARY, SERVER_STREAM, CLIENT_STREAM, BIDIRECTIONAL_STREAM
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceDescriptor.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceDescriptor.java
index d4d6a90..c063ec5 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceDescriptor.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceDescriptor.java
@@ -29,8 +29,7 @@ import java.util.Objects;
import java.util.Set;
/**
- * ServiceModel and ServiceMetadata are to some extend duplicated with each other.
- * We should merge them in the future.
+ * ServiceModel and ServiceMetadata to some extent duplicate each other. We should merge them in the future.
*/
public class ServiceDescriptor {
private final String serviceName;
@@ -50,8 +49,10 @@ public class ServiceDescriptor {
for (Method method : methodsToExport) {
method.setAccessible(true);
+ MethodDescriptor methodDescriptor = new MethodDescriptor(method);
+
List<MethodDescriptor> methodModels = methods.computeIfAbsent(method.getName(), (k) -> new ArrayList<>(1));
- methodModels.add(new MethodDescriptor(method));
+ methodModels.add(methodDescriptor);
}
methods.forEach((methodName, methodList) -> {
@@ -102,8 +103,7 @@ public class ServiceDescriptor {
public MethodDescriptor getMethod(String methodName, Class<?>[] paramTypes) {
List<MethodDescriptor> methodModels = methods.get(methodName);
if (CollectionUtils.isNotEmpty(methodModels)) {
- for (int i = 0; i < methodModels.size(); i++) {
- MethodDescriptor descriptor = methodModels.get(i);
+ for (MethodDescriptor descriptor : methodModels) {
if (Arrays.equals(paramTypes, descriptor.getParameterClasses())) {
return descriptor;
}
@@ -125,7 +125,10 @@ public class ServiceDescriptor {
return false;
}
ServiceDescriptor that = (ServiceDescriptor) o;
- return Objects.equals(serviceName, that.serviceName) && Objects.equals(serviceInterfaceClass, that.serviceInterfaceClass) && Objects.equals(methods, that.methods) && Objects.equals(descToMethods, that.descToMethods);
+ return Objects.equals(serviceName, that.serviceName)
+ && Objects.equals(serviceInterfaceClass, that.serviceInterfaceClass)
+ && Objects.equals(methods, that.methods)
+ && Objects.equals(descToMethods, that.descToMethods);
}
@Override
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java
index 5a6960f..86345f6 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java
@@ -57,7 +57,9 @@ public class RpcInvocation implements Invocation, Serializable {
private String protocolServiceKey;
private ServiceModel serviceModel;
+
private String methodName;
+
private String serviceName;
private transient Class<?>[] parameterTypes;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index 7d282d9..2d4fd41 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -154,8 +154,9 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
}
if (getMethodDescriptor() == null) {
final String[] paramTypes = wrapper.getArgTypesList().toArray(new String[wrapper.getArgsCount()]);
-
+ // in wrapper mode the method may be overloaded, so there may be a list of candidate descriptors
for (MethodDescriptor descriptor : getMethodDescriptors()) {
+ // the parameter type signatures are compared as whole arrays
if (Arrays.equals(descriptor.getCompatibleParamSignatures(), paramTypes)) {
method(descriptor);
break;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
index e33dfe7..a09c7f0 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
@@ -16,9 +16,16 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.Http2GoAwayFrame;
+import io.netty.handler.codec.http2.Http2SettingsFrame;
+import io.netty.util.ReferenceCountUtil;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.api.Connection;
@@ -32,12 +39,8 @@ import org.apache.dubbo.rpc.model.ConsumerModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.MethodDescriptor;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-import io.netty.handler.codec.http2.Http2GoAwayFrame;
-import io.netty.handler.codec.http2.Http2SettingsFrame;
-import io.netty.util.ReferenceCountUtil;
+import java.util.Arrays;
+import java.util.List;
public class TripleClientHandler extends ChannelDuplexHandler {
@@ -74,12 +77,8 @@ public class TripleClientHandler extends ChannelDuplexHandler {
final URL url = inv.getInvoker().getUrl();
ConsumerModel consumerModel = (ConsumerModel) url.getServiceModel();
- MethodDescriptor methodDescriptor = consumerModel.getServiceModel().getMethod(inv.getMethodName(), inv.getParameterTypes());
- String serviceKey = url.getServiceKey();
- // If it is InstanceAddressURL, the serviceKey may not be obtained.
- if (null == serviceKey) {
- serviceKey = inv.getTargetServiceUniqueName();
- }
+ MethodDescriptor methodDescriptor = getTriMethodDescriptor(consumerModel,inv);
+
ClassLoadUtil.switchContextLoader(consumerModel.getServiceInterfaceClass().getClassLoader());
AbstractClientStream stream;
if (methodDescriptor.isUnary()) {
@@ -92,12 +91,12 @@ public class TripleClientHandler extends ChannelDuplexHandler {
ctx.channel().attr(TripleConstant.SSL_ATTRIBUTE_KEY).set(Boolean.parseBoolean(ssl));
}
stream.service(consumerModel)
- .connection(Connection.getConnectionFromChannel(ctx.channel()))
- .method(methodDescriptor)
- .methodName(methodDescriptor.getMethodName())
- .request(req)
- .serialize((String) inv.getObjectAttachment(Constants.SERIALIZATION_KEY))
- .subscribe(new ClientTransportObserver(ctx, stream, promise));
+ .connection(Connection.getConnectionFromChannel(ctx.channel()))
+ .method(methodDescriptor)
+ .methodName(methodDescriptor.getMethodName())
+ .request(req)
+ .serialize((String) inv.getObjectAttachment(Constants.SERIALIZATION_KEY))
+ .subscribe(new ClientTransportObserver(ctx, stream, promise));
if (methodDescriptor.isUnary()) {
stream.asStreamObserver().onNext(inv);
@@ -105,8 +104,9 @@ public class TripleClientHandler extends ChannelDuplexHandler {
} else {
Response response = new Response(req.getId(), req.getVersion());
AppResponse result;
+ // the parameters of a stream method are fixed
if (methodDescriptor.getRpcType() == MethodDescriptor.RpcType.BIDIRECTIONAL_STREAM
- || methodDescriptor.getRpcType() == MethodDescriptor.RpcType.CLIENT_STREAM) {
+ || methodDescriptor.getRpcType() == MethodDescriptor.RpcType.CLIENT_STREAM) {
final StreamObserver<Object> streamObserver = (StreamObserver<Object>) inv.getArguments()[0];
stream.subscribe(streamObserver);
result = new AppResponse(stream.asStreamObserver());
@@ -121,4 +121,20 @@ public class TripleClientHandler extends ChannelDuplexHandler {
DefaultFuture2.received(stream.getConnection(), response);
}
}
+
+ /**
+ * Resolves the tri protocol MethodDescriptor whose real parameter classes equal the invocation's parameter types; throws IllegalStateException if the method name is unknown or no overload matches.
+ */
+ private MethodDescriptor getTriMethodDescriptor(ConsumerModel consumerModel, RpcInvocation inv) {
+ List<MethodDescriptor> methodDescriptors = consumerModel.getServiceModel().getMethods(inv.getMethodName());
+ if (CollectionUtils.isEmpty(methodDescriptors)) {
+ throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
+ }
+ for (MethodDescriptor methodDescriptor : methodDescriptors) {
+ if (Arrays.equals(inv.getParameterTypes(), methodDescriptor.getRealParameterClasses())) {
+ return methodDescriptor;
+ }
+ }
+ throw new IllegalStateException("no methodDescriptor matches method=" + inv.getMethodName() + " parameterTypes=" + Arrays.toString(inv.getParameterTypes()));
+ }
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index 21ccbe1..7b9365e 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.model.FrameworkModel;
@@ -174,17 +175,20 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
MethodDescriptor methodDescriptor = null;
List<MethodDescriptor> methodDescriptors = null;
- if (CommonConstants.$INVOKE.equals(methodName) || CommonConstants.$INVOKE_ASYNC.equals(methodName)) {
+ if (isGeneric(methodName)) {
+ // There should be one and only one
methodDescriptor = ServiceDescriptorInternalCache.genericService().getMethods(methodName).get(0);
- } else if (CommonConstants.$ECHO.equals(methodName)) {
+ } else if (isEcho(methodName)) {
+ // There should be one and only one
methodDescriptor = ServiceDescriptorInternalCache.echoService().getMethods(methodName).get(0);
} else {
methodDescriptors = providerModel.getServiceModel().getMethods(methodName);
- if (methodDescriptors == null || methodDescriptors.isEmpty()) {
+ if (CollectionUtils.isEmpty(methodDescriptors)) {
responseErr(ctx, GrpcStatus.fromCode(Code.UNIMPLEMENTED)
.withDescription("Method :" + methodName + " not found of service:" + serviceName));
return;
}
+ // In most cases there is only one method
if (methodDescriptors.size() == 1) {
methodDescriptor = methodDescriptors.get(0);
}
@@ -202,6 +206,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
if (methodDescriptor != null) {
stream.method(methodDescriptor);
} else {
+ // Several overloads exist: the matching method is resolved later from the argument types in the request body
stream.methods(methodDescriptors);
}
final TransportObserver observer = stream.asTransportObserver();
@@ -212,4 +217,14 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
ctx.channel().attr(TripleUtil.SERVER_STREAM_KEY).set(stream);
}
+
+
+ private boolean isEcho(String methodName) {
+ return CommonConstants.$ECHO.equals(methodName);
+ }
+
+ private boolean isGeneric(String methodName) {
+ return CommonConstants.$INVOKE.equals(methodName) || CommonConstants.$INVOKE_ASYNC.equals(methodName);
+ }
+
}