You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/09/08 08:19:26 UTC

[dubbo] branch 3.0 updated: [3.0-Triple] fix tri stream method can not find methodDescriptor (#8705)

This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 8068505  [3.0-Triple] fix tri stream method can not find methodDescriptor (#8705)
8068505 is described below

commit 806850515bdd45da6f53980b1b35300377ea072e
Author: earthchen <yo...@duobei.com>
AuthorDate: Wed Sep 8 16:19:17 2021 +0800

    [3.0-Triple] fix tri stream method can not find methodDescriptor (#8705)
    
    * fix tri stream method can not find methodDescriptor
    
    * change md method map for tri
    
    * remove hack in getMethod
    
    * put md into rpc invocation
    
    * remove unused code
    
    * change tri get MethodDescriptor way
    
    * remove unused code
    
    * change better method name
---
 .../apache/dubbo/rpc/model/MethodDescriptor.java   | 16 +++++++
 .../apache/dubbo/rpc/model/ServiceDescriptor.java  | 15 +++---
 .../java/org/apache/dubbo/rpc/RpcInvocation.java   |  2 +
 .../rpc/protocol/tri/AbstractServerStream.java     |  3 +-
 .../rpc/protocol/tri/TripleClientHandler.java      | 54 ++++++++++++++--------
 .../tri/TripleHttp2FrameServerHandler.java         | 21 +++++++--
 6 files changed, 82 insertions(+), 29 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
index ccb75b9..02fd95f 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
@@ -62,10 +62,17 @@ public class MethodDescriptor {
     private final RpcType rpcType;
     private final ConcurrentMap<String, Object> attributeMap = new ConcurrentHashMap<>();
 
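+    // Used only by the tri protocol: the method's declared parameter and return types,
+    // kept unchanged so that signatures containing StreamObserver can be matched.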
+    private final Class<?>[] realParameterClasses;
+    private final Class<?> realReturnClass;
+
     public MethodDescriptor(Method method) {
         this.method = method;
         this.methodName = method.getName();
         Class<?>[] parameterTypes = method.getParameterTypes();
+        realParameterClasses = parameterTypes;
+        realReturnClass = method.getReturnType();
         // bidirectional-stream: StreamObserver<Request> foo(StreamObserver<Response>)
         if (parameterTypes.length == 1 && isStreamType(parameterTypes[0])) {
             this.parameterClasses = new Class<?>[]{
@@ -319,6 +326,15 @@ public class MethodDescriptor {
         return this.attributeMap.get(key);
     }
 
+
+    public Class<?>[] getRealParameterClasses() {
+        return realParameterClasses;
+    }
+
+    public Class<?> getRealReturnClass() {
+        return realReturnClass;
+    }
+
     public enum RpcType {
         UNARY, SERVER_STREAM, CLIENT_STREAM, BIDIRECTIONAL_STREAM
     }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceDescriptor.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceDescriptor.java
index d4d6a90..c063ec5 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceDescriptor.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceDescriptor.java
@@ -29,8 +29,7 @@ import java.util.Objects;
 import java.util.Set;
 
 /**
- * ServiceModel and ServiceMetadata are to some extend duplicated with each other.
- * We should merge them in the future.
+ * ServiceModel and ServiceMetadata are to some extent duplicated with each other. We should merge them in the future.
  */
 public class ServiceDescriptor {
     private final String serviceName;
@@ -50,8 +49,10 @@ public class ServiceDescriptor {
         for (Method method : methodsToExport) {
             method.setAccessible(true);
 
+            MethodDescriptor methodDescriptor = new MethodDescriptor(method);
+
             List<MethodDescriptor> methodModels = methods.computeIfAbsent(method.getName(), (k) -> new ArrayList<>(1));
-            methodModels.add(new MethodDescriptor(method));
+            methodModels.add(methodDescriptor);
         }
 
         methods.forEach((methodName, methodList) -> {
@@ -102,8 +103,7 @@ public class ServiceDescriptor {
     public MethodDescriptor getMethod(String methodName, Class<?>[] paramTypes) {
         List<MethodDescriptor> methodModels = methods.get(methodName);
         if (CollectionUtils.isNotEmpty(methodModels)) {
-            for (int i = 0; i < methodModels.size(); i++) {
-                MethodDescriptor descriptor = methodModels.get(i);
+            for (MethodDescriptor descriptor : methodModels) {
                 if (Arrays.equals(paramTypes, descriptor.getParameterClasses())) {
                     return descriptor;
                 }
@@ -125,7 +125,10 @@ public class ServiceDescriptor {
             return false;
         }
         ServiceDescriptor that = (ServiceDescriptor) o;
-        return Objects.equals(serviceName, that.serviceName) && Objects.equals(serviceInterfaceClass, that.serviceInterfaceClass) && Objects.equals(methods, that.methods) && Objects.equals(descToMethods, that.descToMethods);
+        return Objects.equals(serviceName, that.serviceName)
+            && Objects.equals(serviceInterfaceClass, that.serviceInterfaceClass)
+            && Objects.equals(methods, that.methods)
+            && Objects.equals(descToMethods, that.descToMethods);
     }
 
     @Override
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java
index 5a6960f..86345f6 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java
@@ -57,7 +57,9 @@ public class RpcInvocation implements Invocation, Serializable {
     private String protocolServiceKey;
 
     private ServiceModel serviceModel;
+
     private String methodName;
+
     private String serviceName;
 
     private transient Class<?>[] parameterTypes;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index 7d282d9..2d4fd41 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -154,8 +154,9 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
                 }
                 if (getMethodDescriptor() == null) {
                     final String[] paramTypes = wrapper.getArgTypesList().toArray(new String[wrapper.getArgsCount()]);
-
+                    // In wrapper mode the method may be overloaded, so there can be several candidate descriptors
                     for (MethodDescriptor descriptor : getMethodDescriptors()) {
+                        // The parameter type signatures are compared as arrays
                         if (Arrays.equals(descriptor.getCompatibleParamSignatures(), paramTypes)) {
                             method(descriptor);
                             break;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
index e33dfe7..a09c7f0 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
@@ -16,9 +16,16 @@
  */
 package org.apache.dubbo.rpc.protocol.tri;
 
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.Http2GoAwayFrame;
+import io.netty.handler.codec.http2.Http2SettingsFrame;
+import io.netty.util.ReferenceCountUtil;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.remoting.api.Connection;
@@ -32,12 +39,8 @@ import org.apache.dubbo.rpc.model.ConsumerModel;
 import org.apache.dubbo.rpc.model.FrameworkModel;
 import org.apache.dubbo.rpc.model.MethodDescriptor;
 
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-import io.netty.handler.codec.http2.Http2GoAwayFrame;
-import io.netty.handler.codec.http2.Http2SettingsFrame;
-import io.netty.util.ReferenceCountUtil;
+import java.util.Arrays;
+import java.util.List;
 
 public class TripleClientHandler extends ChannelDuplexHandler {
 
@@ -74,12 +77,8 @@ public class TripleClientHandler extends ChannelDuplexHandler {
         final URL url = inv.getInvoker().getUrl();
         ConsumerModel consumerModel = (ConsumerModel) url.getServiceModel();
 
-        MethodDescriptor methodDescriptor = consumerModel.getServiceModel().getMethod(inv.getMethodName(), inv.getParameterTypes());
-        String serviceKey = url.getServiceKey();
-        // If it is InstanceAddressURL, the serviceKey may not be obtained.
-        if (null == serviceKey) {
-            serviceKey = inv.getTargetServiceUniqueName();
-        }
+        MethodDescriptor methodDescriptor = getTriMethodDescriptor(consumerModel,inv);
+
         ClassLoadUtil.switchContextLoader(consumerModel.getServiceInterfaceClass().getClassLoader());
         AbstractClientStream stream;
         if (methodDescriptor.isUnary()) {
@@ -92,12 +91,12 @@ public class TripleClientHandler extends ChannelDuplexHandler {
             ctx.channel().attr(TripleConstant.SSL_ATTRIBUTE_KEY).set(Boolean.parseBoolean(ssl));
         }
         stream.service(consumerModel)
-                .connection(Connection.getConnectionFromChannel(ctx.channel()))
-                .method(methodDescriptor)
-                .methodName(methodDescriptor.getMethodName())
-                .request(req)
-                .serialize((String) inv.getObjectAttachment(Constants.SERIALIZATION_KEY))
-                .subscribe(new ClientTransportObserver(ctx, stream, promise));
+            .connection(Connection.getConnectionFromChannel(ctx.channel()))
+            .method(methodDescriptor)
+            .methodName(methodDescriptor.getMethodName())
+            .request(req)
+            .serialize((String) inv.getObjectAttachment(Constants.SERIALIZATION_KEY))
+            .subscribe(new ClientTransportObserver(ctx, stream, promise));
 
         if (methodDescriptor.isUnary()) {
             stream.asStreamObserver().onNext(inv);
@@ -105,8 +104,9 @@ public class TripleClientHandler extends ChannelDuplexHandler {
         } else {
             Response response = new Response(req.getId(), req.getVersion());
             AppResponse result;
+            // The parameter list of a stream method is fixed
             if (methodDescriptor.getRpcType() == MethodDescriptor.RpcType.BIDIRECTIONAL_STREAM
-                    || methodDescriptor.getRpcType() == MethodDescriptor.RpcType.CLIENT_STREAM) {
+                || methodDescriptor.getRpcType() == MethodDescriptor.RpcType.CLIENT_STREAM) {
                 final StreamObserver<Object> streamObserver = (StreamObserver<Object>) inv.getArguments()[0];
                 stream.subscribe(streamObserver);
                 result = new AppResponse(stream.asStreamObserver());
@@ -121,4 +121,20 @@ public class TripleClientHandler extends ChannelDuplexHandler {
             DefaultFuture2.received(stream.getConnection(), response);
         }
     }
+
+    /**
+     * Get the MethodDescriptor specific to the tri protocol
+     */
+    private MethodDescriptor getTriMethodDescriptor(ConsumerModel consumerModel, RpcInvocation inv) {
+        List<MethodDescriptor> methodDescriptors = consumerModel.getServiceModel().getMethods(inv.getMethodName());
+        if (CollectionUtils.isEmpty(methodDescriptors)) {
+            throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
+        }
+        for (MethodDescriptor methodDescriptor : methodDescriptors) {
+            if (Arrays.equals(inv.getParameterTypes(), methodDescriptor.getRealParameterClasses())) {
+                return methodDescriptor;
+            }
+        }
+        throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
+    }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index 21ccbe1..7b9365e 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.model.FrameworkModel;
@@ -174,17 +175,20 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
         MethodDescriptor methodDescriptor = null;
         List<MethodDescriptor> methodDescriptors = null;
 
-        if (CommonConstants.$INVOKE.equals(methodName) || CommonConstants.$INVOKE_ASYNC.equals(methodName)) {
+        if (isGeneric(methodName)) {
+            // There should be one and only one
             methodDescriptor = ServiceDescriptorInternalCache.genericService().getMethods(methodName).get(0);
-        } else if (CommonConstants.$ECHO.equals(methodName)) {
+        } else if (isEcho(methodName)) {
+            // There should be one and only one
             methodDescriptor = ServiceDescriptorInternalCache.echoService().getMethods(methodName).get(0);
         } else {
             methodDescriptors = providerModel.getServiceModel().getMethods(methodName);
-            if (methodDescriptors == null || methodDescriptors.isEmpty()) {
+            if (CollectionUtils.isEmpty(methodDescriptors)) {
                 responseErr(ctx, GrpcStatus.fromCode(Code.UNIMPLEMENTED)
                     .withDescription("Method :" + methodName + " not found of service:" + serviceName));
                 return;
             }
+            // In most cases there is only one method
             if (methodDescriptors.size() == 1) {
                 methodDescriptor = methodDescriptors.get(0);
             }
@@ -202,6 +206,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
         if (methodDescriptor != null) {
             stream.method(methodDescriptor);
         } else {
+            // Otherwise the matching method has to be determined later from the request body
             stream.methods(methodDescriptors);
         }
         final TransportObserver observer = stream.asTransportObserver();
@@ -212,4 +217,14 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
 
         ctx.channel().attr(TripleUtil.SERVER_STREAM_KEY).set(stream);
     }
+
+
+    private boolean isEcho(String methodName) {
+        return CommonConstants.$ECHO.equals(methodName);
+    }
+
+    private boolean isGeneric(String methodName) {
+        return CommonConstants.$INVOKE.equals(methodName) || CommonConstants.$INVOKE_ASYNC.equals(methodName);
+    }
+
 }