You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/09/02 06:08:39 UTC

[dubbo] branch 3.0 updated: [3.0] Add gen MethodDescriptor valid, avoid strange situations (#8651)

This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new b8de0a7  [3.0] Add  gen MethodDescriptor valid, avoid strange situations (#8651)
b8de0a7 is described below

commit b8de0a7fd505a438b35e0db2a3e660aed33784dd
Author: earthchen <yo...@duobei.com>
AuthorDate: Thu Sep 2 14:08:23 2021 +0800

    [3.0] Add  gen MethodDescriptor valid, avoid strange situations (#8651)
    
    * Add  gen MethodDescriptor valid, avoid strange situations
    
    * fix ut error in grpc
    
    * fix ut error in tri
    
    * Add some comments
    
    * fix ut error in rx and reactor
    
    Co-authored-by: guohao <gu...@gmail.com>
---
 dubbo-common/pom.xml                               |  14 ++
 .../apache/dubbo/rpc/model/MethodDescriptor.java   | 166 +++++++++++++++++++--
 .../apache/dubbo/descriptor/DescriptorService.java |  65 ++++++++
 .../dubbo/descriptor/MethodDescriptorTest.java     | 157 ++++++++++++++++++-
 .../dubbo/rpc/protocol/grpc/GrpcProtocolTest.java  |  22 +++
 .../protocol/grpc/support/DubboGreeterGrpc.java    |   3 +-
 .../dubbo/rpc/protocol/tri/TripleProtocolTest.java |  21 ++-
 .../dubbo/rpc/protocol/tri/support/IGreeter.java   |  11 ++
 .../rpc/protocol/tri/support/IGreeterImpl.java     |   9 ++
 9 files changed, 450 insertions(+), 18 deletions(-)

diff --git a/dubbo-common/pom.xml b/dubbo-common/pom.xml
index 0e8be69..1e2d18a 100644
--- a/dubbo-common/pom.xml
+++ b/dubbo-common/pom.xml
@@ -81,6 +81,20 @@
             <artifactId>protobuf-java</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+            <version>3.4.9</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.reactivex.rxjava2</groupId>
+            <artifactId>rxjava</artifactId>
+            <version>2.2.21</version>
+            <scope>test</scope>
+        </dependency>
+
+
     </dependencies>
 
 </project>
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
index 8ce071a..ccb75b9 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
@@ -25,6 +25,7 @@ import org.apache.dubbo.common.utils.ReflectUtils;
 import java.lang.reflect.Method;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
+import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Stream;
@@ -38,6 +39,13 @@ import static org.apache.dubbo.common.constants.CommonConstants.PROTOBUF_MESSAGE
  *
  */
 public class MethodDescriptor {
+
+    private static final String GRPC_ASYNC_RETURN_CLASS = "com.google.common.util.concurrent.ListenableFuture";
+    private static final String TRI_ASYNC_RETURN_CLASS = "java.util.concurrent.CompletableFuture";
+    private static final String REACTOR_RETURN_CLASS = "reactor.core.publisher.Mono";
+    private static final String RX_RETURN_CLASS = "io.reactivex.Single";
+    private static final String GRPC_STREAM_CLASS = "io.grpc.stub.StreamObserver";
+
     private static final Logger logger = LoggerFactory.getLogger(MethodDescriptor.class);
     private final Method method;
     //    private final boolean isCallBack;
@@ -58,18 +66,22 @@ public class MethodDescriptor {
         this.method = method;
         this.methodName = method.getName();
         Class<?>[] parameterTypes = method.getParameterTypes();
+        // bidirectional-stream: StreamObserver<Request> foo(StreamObserver<Response>)
         if (parameterTypes.length == 1 && isStreamType(parameterTypes[0])) {
             this.parameterClasses = new Class<?>[]{
                     (Class<?>) ((ParameterizedType) method.getGenericReturnType()).getActualTypeArguments()[0]};
             this.returnClass = (Class<?>) ((ParameterizedType) method.getGenericParameterTypes()[0])
                     .getActualTypeArguments()[0];
             this.rpcType = RpcType.BIDIRECTIONAL_STREAM;
+            // server-stream: void foo(Request, StreamObserver<Response>)
         } else if (parameterTypes.length == 2 && method.getReturnType().equals(Void.TYPE)
                 && !isStreamType(parameterTypes[0]) && isStreamType(parameterTypes[1])) {
             this.parameterClasses = method.getParameterTypes();
-            this.returnClass = (Class<?>) ((ParameterizedType)method.getGenericParameterTypes()[1]).getActualTypeArguments()[0];
+            this.returnClass =
+                    (Class<?>) ((ParameterizedType) method.getGenericParameterTypes()[1]).getActualTypeArguments()[0];
             this.rpcType = RpcType.SERVER_STREAM;
         } else {
+            // unary: Response foo(Request)
             this.parameterClasses = method.getParameterTypes();
             this.returnClass = method.getReturnType();
             this.rpcType = RpcType.UNARY;
@@ -92,7 +104,7 @@ public class MethodDescriptor {
     }
 
     private static boolean isStreamType(Class<?> clz) {
-        return StreamObserver.class.isAssignableFrom(clz);
+        return StreamObserver.class.isAssignableFrom(clz) || GRPC_STREAM_CLASS.equalsIgnoreCase(clz.getName());
     }
 
     public boolean isStream() {
@@ -111,30 +123,156 @@ public class MethodDescriptor {
         return rpcType;
     }
 
+    /**
+     * Determine if the request and response instance should be wrapped in Protobuf wrapper object
+     *
+     * @return true if the request and response object is not generated by protobuf
+     */
     private boolean needWrap() {
+        // generic call must be wrapped
         if (CommonConstants.$INVOKE.equals(methodName) || CommonConstants.$INVOKE_ASYNC.equals(methodName)) {
             return true;
-        } else if ($ECHO.equals(methodName)) {
+        }
+        // echo must be wrapped
+        if ($ECHO.equals(methodName)) {
             return true;
+        }
+        boolean returnClassProtobuf = isProtobufClass(returnClass);
+        // Response foo()
+        if (parameterClasses.length == 0) {
+            return !returnClassProtobuf;
+        }
+        int protobufParameterCount = 0;
+        int javaParameterCount = 0;
+        int streamParameterCount = 0;
+        boolean secondParameterStream = false;
+        // count normal and protobuf param
+        for (int i = 0; i < parameterClasses.length; i++) {
+            Class<?> parameterClass = parameterClasses[i];
+            if (isProtobufClass(parameterClass)) {
+                protobufParameterCount++;
+            } else {
+                if (isStreamType(parameterClass)) {
+                    if (i == 1) {
+                        secondParameterStream = true;
+                    }
+                    streamParameterCount++;
+                } else {
+                    javaParameterCount++;
+                }
+            }
+        }
+        // more than one stream param
+        if (streamParameterCount > 1) {
+            throw new IllegalStateException("method params error: more than one Stream params. method=" + methodName);
+        }
+        // protobuf only supports one param
+        if (protobufParameterCount >= 2) {
+            throw new IllegalStateException("method params error: more than one protobuf params. method=" + methodName);
+        }
+        // server stream supports one normal param and one stream param
+        if (streamParameterCount == 1) {
+            if (javaParameterCount + protobufParameterCount > 1) {
+                throw new IllegalStateException("method params error: server stream does not support more than one normal param." +
+                        " method=" + methodName);
+            }
+            // server stream: void foo(Request, StreamObserver<Response>)
+            if (!secondParameterStream) {
+                throw new IllegalStateException("method params error: server stream's second param must be StreamObserver." +
+                        " method=" + methodName);
+            }
+        }
+        if (isStream()) {
+            if (RpcType.SERVER_STREAM == rpcType) {
+                if (!secondParameterStream) {
+                    throw new IllegalStateException("method params error:server stream's second param must be StreamObserver." +
+                            " method=" + methodName);
+                }
+            }
+            // param type must be consistent
+            if (returnClassProtobuf) {
+                if (javaParameterCount > 0) {
+                    throw new IllegalStateException("method params error: both normal and protobuf param found. method=" + methodName);
+                }
+            } else {
+                if (protobufParameterCount > 0) {
+                    throw new IllegalStateException("method params error method=" + methodName);
+                }
+            }
         } else {
-            if ((rpcType != RpcType.SERVER_STREAM && parameterClasses.length != 1) || parameterClasses[0] == null) {
+            if (streamParameterCount > 0) {
+                throw new IllegalStateException("method params error: unary method should not contain any StreamObserver." +
+                        " method=" + methodName);
+            }
+            if (protobufParameterCount > 0 && returnClassProtobuf) {
+                return false;
+            }
+            // handle reactor or rxjava: only consider code generated by proto
+            if (isMono(returnClass) || isRx(returnClass)) {
+                return false;
+            }
+            if (protobufParameterCount <= 0 && !returnClassProtobuf) {
                 return true;
             }
+            // handle grpc stub: only consider code generated by proto
+            if (GRPC_ASYNC_RETURN_CLASS.equalsIgnoreCase(returnClass.getName()) && protobufParameterCount == 1) {
+                return false;
+            }
+            // handle dubbo generated method
+            if (TRI_ASYNC_RETURN_CLASS.equalsIgnoreCase(returnClass.getName())) {
+                Class<?> actualReturnClass =
+                        (Class<?>) ((ParameterizedType) method.getGenericReturnType()).getActualTypeArguments()[0];
+                boolean actualReturnClassProtobuf = isProtobufClass(actualReturnClass);
+                if (actualReturnClassProtobuf && protobufParameterCount == 1) {
+                    return false;
+                }
+                if (!actualReturnClassProtobuf && protobufParameterCount == 0) {
+                    return true;
+                }
+            }
+            // todo remove this in future
+            boolean ignore = checkNeedIgnore();
+            if (ignore) {
+                return protobufParameterCount != 1;
+            }
+            throw new IllegalStateException("method params error method=" + methodName);
+        }
+        // java param should be wrapped
+        return javaParameterCount > 0;
+    }
 
-            Class<?> clazz = parameterClasses[0];
-            while (clazz != Object.class && clazz != null) {
-                Class<?>[] interfaces = clazz.getInterfaces();
-                if (interfaces.length > 0) {
-                    for (Class<?> clazzInterface : interfaces) {
-                        if (PROTOBUF_MESSAGE_CLASS_NAME.equalsIgnoreCase(clazzInterface.getName())) {
-                            return false;
-                        }
+    /**
+     * fixme: will produce an error on grpc, but it is harmless, so ignore it for now
+     */
+    private boolean checkNeedIgnore() {
+        if (Iterator.class.isAssignableFrom(returnClass)) {
+            return true;
+        }
+        return false;
+    }
+
+    private boolean isMono(Class<?> clz) {
+        return REACTOR_RETURN_CLASS.equalsIgnoreCase(clz.getName());
+    }
+
+    private boolean isRx(Class<?> clz) {
+        return RX_RETURN_CLASS.equalsIgnoreCase(clz.getName());
+    }
+
+
+    public boolean isProtobufClass(Class<?> clazz) {
+        while (clazz != Object.class && clazz != null) {
+            Class<?>[] interfaces = clazz.getInterfaces();
+            if (interfaces.length > 0) {
+                for (Class<?> clazzInterface : interfaces) {
+                    if (PROTOBUF_MESSAGE_CLASS_NAME.equalsIgnoreCase(clazzInterface.getName())) {
+                        return true;
                     }
                 }
-                clazz = clazz.getSuperclass();
             }
-            return true;
+            clazz = clazz.getSuperclass();
         }
+        return false;
     }
 
     public boolean matchParams(String params) {
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/descriptor/DescriptorService.java b/dubbo-common/src/test/java/org/apache/dubbo/descriptor/DescriptorService.java
index 98d2dc7..a0117ce 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/descriptor/DescriptorService.java
+++ b/dubbo-common/src/test/java/org/apache/dubbo/descriptor/DescriptorService.java
@@ -24,6 +24,37 @@ public interface DescriptorService {
     void noParameterMethod();
 
     /**
+     * unary return protobuf class
+     *
+     * @return protobuf class
+     */
+    HelloReply noParameterAndReturnProtobufMethod();
+
+    /**
+     * unary return java class
+     *
+     * @return
+     */
+    String noParameterAndReturnJavaClassMethod();
+
+
+    /**
+     * bi stream need wrapper
+     *
+     * @param streamObserver
+     * @return
+     */
+    StreamObserver<String> wrapBidirectionalStream(StreamObserver<String> streamObserver);
+
+    /**
+     * no need wrapper bi stream
+     *
+     * @param streamObserver
+     * @return
+     */
+    StreamObserver<HelloReply> bidirectionalStream(StreamObserver<HelloReply> streamObserver);
+
+    /**
      * only for test.
      *
      * @param reply
@@ -34,4 +65,38 @@ public interface DescriptorService {
     void sayHelloServerStream(HelloReply request, StreamObserver<HelloReply> reply);
 
     void sayHelloServerStream2(Object request, StreamObserver<Object> reply);
+
+    /***********************grpc******************************/
+
+    java.util.Iterator<HelloReply> iteratorServerStream(HelloReply request);
+
+    reactor.core.publisher.Mono<HelloReply> reactorMethod(HelloReply reactorRequest);
+
+    reactor.core.publisher.Mono<HelloReply> reactorMethod2(reactor.core.publisher.Mono<HelloReply> reactorRequest);
+
+   io.reactivex.Single<HelloReply> rxJavaMethod(io.reactivex.Single<HelloReply> replySingle);
+
+    /**********************test error*****************/
+    void testMultiProtobufParameters(HelloReply reply1, HelloReply reply2);
+
+    String testDiffParametersAndReturn(HelloReply reply1);
+
+    HelloReply testDiffParametersAndReturn2(String reply1);
+
+    void testErrorServerStream(StreamObserver<HelloReply> reply, HelloReply request);
+
+    void testErrorServerStream2(HelloReply request, HelloReply request2, StreamObserver<HelloReply> reply);
+
+    void testErrorServerStream3(String aa, StreamObserver<HelloReply> reply);
+
+    void testErrorServerStream4(String aa, String bb, StreamObserver<String> reply);
+
+    StreamObserver<HelloReply> testErrorBiStream(HelloReply reply, StreamObserver<HelloReply> observer);
+
+    StreamObserver<HelloReply> testErrorBiStream2(String reply, StreamObserver<HelloReply> observer);
+
+    StreamObserver<String> testErrorBiStream3(StreamObserver<HelloReply> observer);
+    StreamObserver<String> testErrorBiStream4(StreamObserver<HelloReply> observer,String str);
+
+
 }
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/descriptor/MethodDescriptorTest.java b/dubbo-common/src/test/java/org/apache/dubbo/descriptor/MethodDescriptorTest.java
index ef42c79..0ec485d 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/descriptor/MethodDescriptorTest.java
+++ b/dubbo-common/src/test/java/org/apache/dubbo/descriptor/MethodDescriptorTest.java
@@ -16,16 +16,21 @@
  */
 package org.apache.dubbo.descriptor;
 
+import io.reactivex.Single;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.proto.HelloReply;
 import org.apache.dubbo.rpc.model.MethodDescriptor;
-
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
 
 import java.lang.reflect.Method;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class MethodDescriptorTest {
     @Test
@@ -37,6 +42,46 @@ public class MethodDescriptorTest {
     }
 
     @Test
+    public void testMethodWithNoParametersAndReturnProtobuf() throws Exception {
+        Method method = DescriptorService.class.getMethod("noParameterAndReturnProtobufMethod");
+        MethodDescriptor descriptor = new MethodDescriptor(method);
+        assertEquals("", descriptor.getParamDesc());
+        Assertions.assertEquals(0, descriptor.getParameterClasses().length);
+        assertTrue(descriptor.isUnary());
+        assertFalse(descriptor.isNeedWrap());
+    }
+
+    @Test
+    public void testMethodWithNoParametersAndReturnJava() throws Exception {
+        Method method = DescriptorService.class.getMethod("noParameterAndReturnJavaClassMethod");
+        MethodDescriptor descriptor = new MethodDescriptor(method);
+        assertEquals("", descriptor.getParamDesc());
+        Assertions.assertEquals(0, descriptor.getParameterClasses().length);
+        assertTrue(descriptor.isUnary());
+        assertTrue(descriptor.isNeedWrap());
+    }
+
+    @Test
+    public void testWrapperBiStream() throws Exception {
+        Method method = DescriptorService.class.getMethod("wrapBidirectionalStream", StreamObserver.class);
+        MethodDescriptor descriptor = new MethodDescriptor(method);
+        Assertions.assertEquals(1, descriptor.getParameterClasses().length);
+        assertTrue(descriptor.isStream());
+        assertSame(descriptor.getRpcType(), MethodDescriptor.RpcType.BIDIRECTIONAL_STREAM);
+        assertTrue(descriptor.isNeedWrap());
+    }
+
+    @Test
+    public void testBiStream() throws Exception {
+        Method method = DescriptorService.class.getMethod("bidirectionalStream", StreamObserver.class);
+        MethodDescriptor descriptor = new MethodDescriptor(method);
+        Assertions.assertEquals(1, descriptor.getParameterClasses().length);
+        assertTrue(descriptor.isStream());
+        assertSame(descriptor.getRpcType(), MethodDescriptor.RpcType.BIDIRECTIONAL_STREAM);
+        assertFalse(descriptor.isNeedWrap());
+    }
+
+    @Test
     public void testIsStream() throws NoSuchMethodException {
         Method method = DescriptorService.class.getMethod("noParameterMethod");
         MethodDescriptor descriptor = new MethodDescriptor(method);
@@ -60,7 +105,8 @@ public class MethodDescriptorTest {
 
     @Test
     public void testIsServerStream() throws NoSuchMethodException {
-        Method method = DescriptorService.class.getMethod("sayHelloServerStream", HelloReply.class, StreamObserver.class);
+        Method method = DescriptorService.class.getMethod("sayHelloServerStream", HelloReply.class,
+            StreamObserver.class);
         MethodDescriptor descriptor = new MethodDescriptor(method);
         Assertions.assertFalse(descriptor.isUnary());
         Assertions.assertFalse(descriptor.isNeedWrap());
@@ -81,4 +127,111 @@ public class MethodDescriptorTest {
         descriptor = new MethodDescriptor(method);
         Assertions.assertFalse(descriptor.isNeedWrap());
     }
+
+    @Test
+    public void testIgnoreMethod() throws NoSuchMethodException {
+        Method method = DescriptorService.class.getMethod("iteratorServerStream", HelloReply.class);
+        MethodDescriptor descriptor = new MethodDescriptor(method);
+        Assertions.assertFalse(descriptor.isNeedWrap());
+
+        Method method2 = DescriptorService.class.getMethod("reactorMethod", HelloReply.class);
+        MethodDescriptor descriptor2 = new MethodDescriptor(method2);
+        Assertions.assertFalse(descriptor2.isNeedWrap());
+
+        Method method3 = DescriptorService.class.getMethod("reactorMethod2", Mono.class);
+        MethodDescriptor  descriptor3 = new MethodDescriptor(method3);
+        Assertions.assertFalse(descriptor3.isNeedWrap());
+
+
+        Method method4 = DescriptorService.class.getMethod("rxJavaMethod", Single.class);
+        MethodDescriptor  descriptor4 = new MethodDescriptor(method4);
+        Assertions.assertFalse(descriptor4.isNeedWrap());
+    }
+
+
+    @Test
+    public void testMultiProtoParameter() throws Exception {
+        Method method = DescriptorService.class.getMethod("testMultiProtobufParameters", HelloReply.class,
+            HelloReply.class);
+        assertThrows(IllegalStateException.class,
+            () -> {
+                MethodDescriptor descriptor = new MethodDescriptor(method);
+            });
+    }
+
+    @Test
+    public void testDiffParametersAndReturn() throws Exception {
+        Method method = DescriptorService.class.getMethod("testDiffParametersAndReturn", HelloReply.class);
+        assertThrows(IllegalStateException.class,
+            () -> {
+                MethodDescriptor descriptor = new MethodDescriptor(method);
+            });
+
+        Method method2 = DescriptorService.class.getMethod("testDiffParametersAndReturn2", String.class);
+        assertThrows(IllegalStateException.class,
+            () -> {
+                MethodDescriptor descriptor = new MethodDescriptor(method2);
+            });
+    }
+
+    @Test
+    public void testErrorServerStream() throws Exception {
+        Method method = DescriptorService.class.getMethod("testErrorServerStream", StreamObserver.class,
+            HelloReply.class);
+        assertThrows(IllegalStateException.class,
+            () -> {
+                MethodDescriptor descriptor = new MethodDescriptor(method);
+            });
+
+        Method method2 = DescriptorService.class.getMethod("testErrorServerStream2", HelloReply.class, HelloReply
+            .class, StreamObserver.class);
+        assertThrows(IllegalStateException.class,
+            () -> {
+                MethodDescriptor descriptor = new MethodDescriptor(method2);
+            });
+
+        Method method3 = DescriptorService.class.getMethod("testErrorServerStream3", String.class,
+            StreamObserver.class);
+        assertThrows(IllegalStateException.class,
+            () -> {
+                MethodDescriptor descriptor = new MethodDescriptor(method3);
+            });
+
+        Method method4 = DescriptorService.class.getMethod("testErrorServerStream4", String.class, String.class,
+            StreamObserver.class);
+        assertThrows(IllegalStateException.class,
+            () -> {
+                MethodDescriptor descriptor = new MethodDescriptor(method4);
+            });
+    }
+
+    @Test
+    public void testErrorBiStream() throws Exception {
+        Method method = DescriptorService.class.getMethod("testErrorBiStream", HelloReply.class, StreamObserver
+            .class);
+        assertThrows(IllegalStateException.class,
+            () -> {
+                MethodDescriptor descriptor = new MethodDescriptor(method);
+            });
+
+        Method method2 = DescriptorService.class.getMethod("testErrorBiStream2", String.class, StreamObserver.class);
+        assertThrows(IllegalStateException.class,
+            () -> {
+                MethodDescriptor descriptor = new MethodDescriptor(method2);
+            });
+
+        Method method3 = DescriptorService.class.getMethod("testErrorBiStream3", StreamObserver.class);
+        assertThrows(IllegalStateException.class,
+            () -> {
+                MethodDescriptor descriptor = new MethodDescriptor(method3);
+            });
+
+        Method method4 = DescriptorService.class.getMethod("testErrorBiStream4", StreamObserver.class, String.class);
+        assertThrows(IllegalStateException.class,
+            () -> {
+                MethodDescriptor descriptor = new MethodDescriptor(method4);
+            });
+    }
+
+
 }
diff --git a/dubbo-rpc/dubbo-rpc-grpc/src/test/java/org/apache/dubbo/rpc/protocol/grpc/GrpcProtocolTest.java b/dubbo-rpc/dubbo-rpc-grpc/src/test/java/org/apache/dubbo/rpc/protocol/grpc/GrpcProtocolTest.java
index ca4d6b1..d5118d9 100644
--- a/dubbo-rpc/dubbo-rpc-grpc/src/test/java/org/apache/dubbo/rpc/protocol/grpc/GrpcProtocolTest.java
+++ b/dubbo-rpc/dubbo-rpc-grpc/src/test/java/org/apache/dubbo/rpc/protocol/grpc/GrpcProtocolTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.dubbo.rpc.protocol.grpc;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import io.grpc.stub.StreamObserver;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.utils.NetUtils;
@@ -80,6 +82,26 @@ public class GrpcProtocolTest {
         HelloReply hello = serviceImpl.sayHello(HelloRequest.newBuilder().setName("World").build());
         Assertions.assertEquals("Hello World", hello.getMessage());
 
+        ListenableFuture<HelloReply> future = serviceImpl.sayHelloAsync(HelloRequest.newBuilder().setName("World").build());
+        Assertions.assertEquals("Hello World", future.get().getMessage());
+
+        serviceImpl.sayHello(HelloRequest.newBuilder().setName("World").build(), new StreamObserver<HelloReply>() {
+
+            @Override
+            public void onNext(HelloReply helloReply) {
+                Assertions.assertEquals("Hello World", helloReply.getMessage());
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                throwable.printStackTrace();
+            }
+
+            @Override
+            public void onCompleted() {
+                System.out.println("onCompleted");
+            }
+        });
         // resource recycle.
         ApplicationModel.getServiceRepository().destroy();
     }
diff --git a/dubbo-rpc/dubbo-rpc-grpc/src/test/java/org/apache/dubbo/rpc/protocol/grpc/support/DubboGreeterGrpc.java b/dubbo-rpc/dubbo-rpc-grpc/src/test/java/org/apache/dubbo/rpc/protocol/grpc/support/DubboGreeterGrpc.java
index ac37cc3..6f22e7b 100644
--- a/dubbo-rpc/dubbo-rpc-grpc/src/test/java/org/apache/dubbo/rpc/protocol/grpc/support/DubboGreeterGrpc.java
+++ b/dubbo-rpc/dubbo-rpc-grpc/src/test/java/org/apache/dubbo/rpc/protocol/grpc/support/DubboGreeterGrpc.java
@@ -44,7 +44,8 @@ public final class DubboGreeterGrpc {
         protected GreeterGrpc.GreeterFutureStub futureStub;
         protected GreeterGrpc.GreeterStub stub;
 
-        public DubboGreeterStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions, URL url, ReferenceConfigBase<?> referenceConfig) {
+        public DubboGreeterStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions, URL url,
+                                ReferenceConfigBase<?> referenceConfig) {
             this.url = url;
             this.referenceConfig = referenceConfig;
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocolTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocolTest.java
index 23d6d89..0db41e3 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocolTest.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocolTest.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.common.utils.NetUtils;
 import org.apache.dubbo.rpc.Protocol;
 import org.apache.dubbo.rpc.ProxyFactory;
@@ -54,8 +55,26 @@ public class TripleProtocolTest {
 
         protocol.export(proxy.getInvoker(serviceImpl, IGreeter.class, url));
         serviceImpl = proxy.getProxy(protocol.refer(IGreeter.class, url));
-         Thread.sleep(1000);
+        Thread.sleep(1000);
         Assertions.assertEquals("hello world", serviceImpl.echo("hello world"));
+        // fixme will throw exception
+        // Assertions.assertEquals("hello world", serviceImpl.echoAsync("hello world").get());
+        serviceImpl.serverStream("hello world", new StreamObserver<String>() {
+            @Override
+            public void onNext(String data) {
+                Assertions.assertEquals("hello world",data);
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                throwable.printStackTrace();
+            }
+
+            @Override
+            public void onCompleted() {
+                System.out.println("onCompleted");
+            }
+        });
 
         // resource recycle.
         ApplicationModel.getServiceRepository().destroy();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/support/IGreeter.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/support/IGreeter.java
index 786dda0..55a5a7c 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/support/IGreeter.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/support/IGreeter.java
@@ -17,10 +17,21 @@
 
 package org.apache.dubbo.rpc.protocol.tri.support;
 
+import org.apache.dubbo.common.stream.StreamObserver;
+
+import java.util.concurrent.CompletableFuture;
+
 public interface IGreeter {
     /**
      * Use request to respond
      */
     String echo(String request);
 
+    default CompletableFuture<String> echoAsync(String request) {
+        return CompletableFuture.supplyAsync(() -> echo(request));
+    }
+
+    void serverStream(String str, StreamObserver<String> observer);
+
+
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/support/IGreeterImpl.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/support/IGreeterImpl.java
index aa4c114..daeeb0b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/support/IGreeterImpl.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/support/IGreeterImpl.java
@@ -17,10 +17,19 @@
 
 package org.apache.dubbo.rpc.protocol.tri.support;
 
+import org.apache.dubbo.common.stream.StreamObserver;
+
 public class IGreeterImpl implements IGreeter {
 
     @Override
     public String echo(String request) {
         return request;
     }
+
+    @Override
+    public void serverStream(String str, StreamObserver<String> observer) {
+        System.out.println("srt="+str);
+        observer.onNext(str);
+        observer.onCompleted();
+    }
 }