You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/09/02 06:08:39 UTC
[dubbo] branch 3.0 updated: [3.0] Add gen MethodDescriptor valid,
avoid strange situations (#8651)
This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new b8de0a7 [3.0] Add gen MethodDescriptor valid, avoid strange situations (#8651)
b8de0a7 is described below
commit b8de0a7fd505a438b35e0db2a3e660aed33784dd
Author: earthchen <yo...@duobei.com>
AuthorDate: Thu Sep 2 14:08:23 2021 +0800
[3.0] Add gen MethodDescriptor valid, avoid strange situations (#8651)
* Add gen MethodDescriptor valid, avoid strange situations
* fix ut error in grpc
* fix ut error in tri
* Add some comments
* fix ut error in rx and reactor
Co-authored-by: guohao <gu...@gmail.com>
---
dubbo-common/pom.xml | 14 ++
.../apache/dubbo/rpc/model/MethodDescriptor.java | 166 +++++++++++++++++++--
.../apache/dubbo/descriptor/DescriptorService.java | 65 ++++++++
.../dubbo/descriptor/MethodDescriptorTest.java | 157 ++++++++++++++++++-
.../dubbo/rpc/protocol/grpc/GrpcProtocolTest.java | 22 +++
.../protocol/grpc/support/DubboGreeterGrpc.java | 3 +-
.../dubbo/rpc/protocol/tri/TripleProtocolTest.java | 21 ++-
.../dubbo/rpc/protocol/tri/support/IGreeter.java | 11 ++
.../rpc/protocol/tri/support/IGreeterImpl.java | 9 ++
9 files changed, 450 insertions(+), 18 deletions(-)
diff --git a/dubbo-common/pom.xml b/dubbo-common/pom.xml
index 0e8be69..1e2d18a 100644
--- a/dubbo-common/pom.xml
+++ b/dubbo-common/pom.xml
@@ -81,6 +81,20 @@
<artifactId>protobuf-java</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ <version>3.4.9</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.reactivex.rxjava2</groupId>
+ <artifactId>rxjava</artifactId>
+ <version>2.2.21</version>
+ <scope>test</scope>
+ </dependency>
+
+
</dependencies>
</project>
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
index 8ce071a..ccb75b9 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
@@ -25,6 +25,7 @@ import org.apache.dubbo.common.utils.ReflectUtils;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
+import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Stream;
@@ -38,6 +39,13 @@ import static org.apache.dubbo.common.constants.CommonConstants.PROTOBUF_MESSAGE
*
*/
public class MethodDescriptor {
+
+ private static final String GRPC_ASYNC_RETURN_CLASS = "com.google.common.util.concurrent.ListenableFuture";
+ private static final String TRI_ASYNC_RETURN_CLASS = "java.util.concurrent.CompletableFuture";
+ private static final String REACTOR_RETURN_CLASS = "reactor.core.publisher.Mono";
+ private static final String RX_RETURN_CLASS = "io.reactivex.Single";
+ private static final String GRPC_STREAM_CLASS = "io.grpc.stub.StreamObserver";
+
private static final Logger logger = LoggerFactory.getLogger(MethodDescriptor.class);
private final Method method;
// private final boolean isCallBack;
@@ -58,18 +66,22 @@ public class MethodDescriptor {
this.method = method;
this.methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
+ // bidirectional-stream: StreamObserver<Request> foo(StreamObserver<Response>)
if (parameterTypes.length == 1 && isStreamType(parameterTypes[0])) {
this.parameterClasses = new Class<?>[]{
(Class<?>) ((ParameterizedType) method.getGenericReturnType()).getActualTypeArguments()[0]};
this.returnClass = (Class<?>) ((ParameterizedType) method.getGenericParameterTypes()[0])
.getActualTypeArguments()[0];
this.rpcType = RpcType.BIDIRECTIONAL_STREAM;
+ // server-stream: void foo(Request, StreamObserver<Response>)
} else if (parameterTypes.length == 2 && method.getReturnType().equals(Void.TYPE)
&& !isStreamType(parameterTypes[0]) && isStreamType(parameterTypes[1])) {
this.parameterClasses = method.getParameterTypes();
- this.returnClass = (Class<?>) ((ParameterizedType)method.getGenericParameterTypes()[1]).getActualTypeArguments()[0];
+ this.returnClass =
+ (Class<?>) ((ParameterizedType) method.getGenericParameterTypes()[1]).getActualTypeArguments()[0];
this.rpcType = RpcType.SERVER_STREAM;
} else {
+ // unary: Response foo(Request)
this.parameterClasses = method.getParameterTypes();
this.returnClass = method.getReturnType();
this.rpcType = RpcType.UNARY;
@@ -92,7 +104,7 @@ public class MethodDescriptor {
}
private static boolean isStreamType(Class<?> clz) {
- return StreamObserver.class.isAssignableFrom(clz);
+ return StreamObserver.class.isAssignableFrom(clz) || GRPC_STREAM_CLASS.equalsIgnoreCase(clz.getName());
}
public boolean isStream() {
@@ -111,30 +123,156 @@ public class MethodDescriptor {
return rpcType;
}
+ /**
+ * Determine if the request and response instance should be wrapped in Protobuf wrapper object
+ *
+ * @return true if the request and response object is not generated by protobuf
+ */
private boolean needWrap() {
+ // generic call must be wrapped
if (CommonConstants.$INVOKE.equals(methodName) || CommonConstants.$INVOKE_ASYNC.equals(methodName)) {
return true;
- } else if ($ECHO.equals(methodName)) {
+ }
+ // echo must be wrapped
+ if ($ECHO.equals(methodName)) {
return true;
+ }
+ boolean returnClassProtobuf = isProtobufClass(returnClass);
+ // Response foo()
+ if (parameterClasses.length == 0) {
+ return !returnClassProtobuf;
+ }
+ int protobufParameterCount = 0;
+ int javaParameterCount = 0;
+ int streamParameterCount = 0;
+ boolean secondParameterStream = false;
+ // count normal and protobuf param
+ for (int i = 0; i < parameterClasses.length; i++) {
+ Class<?> parameterClass = parameterClasses[i];
+ if (isProtobufClass(parameterClass)) {
+ protobufParameterCount++;
+ } else {
+ if (isStreamType(parameterClass)) {
+ if (i == 1) {
+ secondParameterStream = true;
+ }
+ streamParameterCount++;
+ } else {
+ javaParameterCount++;
+ }
+ }
+ }
+ // more than one stream param
+ if (streamParameterCount > 1) {
+ throw new IllegalStateException("method params error: more than one Stream params. method=" + methodName);
+ }
+ // protobuf only support one param
+ if (protobufParameterCount >= 2) {
+ throw new IllegalStateException("method params error: more than one protobuf params. method=" + methodName);
+ }
+ // server stream support one normal param and one stream param
+ if (streamParameterCount == 1) {
+ if (javaParameterCount + protobufParameterCount > 1) {
+ throw new IllegalStateException("method params error: server stream does not support more than one normal param." +
+ " method=" + methodName);
+ }
+ // server stream: void foo(Request, StreamObserver<Response>)
+ if (!secondParameterStream) {
+ throw new IllegalStateException("method params error: server stream's second param must be StreamObserver." +
+ " method=" + methodName);
+ }
+ }
+ if (isStream()) {
+ if (RpcType.SERVER_STREAM == rpcType) {
+ if (!secondParameterStream) {
+ throw new IllegalStateException("method params error:server stream's second param must be StreamObserver." +
+ " method=" + methodName);
+ }
+ }
+ // param type must be consistent
+ if (returnClassProtobuf) {
+ if (javaParameterCount > 0) {
+ throw new IllegalStateException("method params error: both normal and protobuf param found. method=" + methodName);
+ }
+ } else {
+ if (protobufParameterCount > 0) {
+ throw new IllegalStateException("method params error method=" + methodName);
+ }
+ }
} else {
- if ((rpcType != RpcType.SERVER_STREAM && parameterClasses.length != 1) || parameterClasses[0] == null) {
+ if (streamParameterCount > 0) {
+ throw new IllegalStateException("method params error: unary method should not contain any StreamObserver." +
+ " method=" + methodName);
+ }
+ if (protobufParameterCount > 0 && returnClassProtobuf) {
+ return false;
+ }
+ // handler reactor or rxjava only consider gen by proto
+ if (isMono(returnClass) || isRx(returnClass)) {
+ return false;
+ }
+ if (protobufParameterCount <= 0 && !returnClassProtobuf) {
return true;
}
+ // handle grpc stub only consider gen by proto
+ if (GRPC_ASYNC_RETURN_CLASS.equalsIgnoreCase(returnClass.getName()) && protobufParameterCount == 1) {
+ return false;
+ }
+ // handle dubbo generated method
+ if (TRI_ASYNC_RETURN_CLASS.equalsIgnoreCase(returnClass.getName())) {
+ Class<?> actualReturnClass =
+ (Class<?>) ((ParameterizedType) method.getGenericReturnType()).getActualTypeArguments()[0];
+ boolean actualReturnClassProtobuf = isProtobufClass(actualReturnClass);
+ if (actualReturnClassProtobuf && protobufParameterCount == 1) {
+ return false;
+ }
+ if (!actualReturnClassProtobuf && protobufParameterCount == 0) {
+ return true;
+ }
+ }
+ // todo remove this in future
+ boolean ignore = checkNeedIgnore();
+ if (ignore) {
+ return protobufParameterCount != 1;
+ }
+ throw new IllegalStateException("method params error method=" + methodName);
+ }
+ // java param should be wrapped
+ return javaParameterCount > 0;
+ }
- Class<?> clazz = parameterClasses[0];
- while (clazz != Object.class && clazz != null) {
- Class<?>[] interfaces = clazz.getInterfaces();
- if (interfaces.length > 0) {
- for (Class<?> clazzInterface : interfaces) {
- if (PROTOBUF_MESSAGE_CLASS_NAME.equalsIgnoreCase(clazzInterface.getName())) {
- return false;
- }
+ /**
+ * fixme will produce error on grpc. but is harmless so ignore now
+ */
+ private boolean checkNeedIgnore() {
+ if (Iterator.class.isAssignableFrom(returnClass)) {
+ return true;
+ }
+ return false;
+ }
+
+ private boolean isMono(Class<?> clz) {
+ return REACTOR_RETURN_CLASS.equalsIgnoreCase(clz.getName());
+ }
+
+ private boolean isRx(Class<?> clz) {
+ return RX_RETURN_CLASS.equalsIgnoreCase(clz.getName());
+ }
+
+
+ public boolean isProtobufClass(Class<?> clazz) {
+ while (clazz != Object.class && clazz != null) {
+ Class<?>[] interfaces = clazz.getInterfaces();
+ if (interfaces.length > 0) {
+ for (Class<?> clazzInterface : interfaces) {
+ if (PROTOBUF_MESSAGE_CLASS_NAME.equalsIgnoreCase(clazzInterface.getName())) {
+ return true;
}
}
- clazz = clazz.getSuperclass();
}
- return true;
+ clazz = clazz.getSuperclass();
}
+ return false;
}
public boolean matchParams(String params) {
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/descriptor/DescriptorService.java b/dubbo-common/src/test/java/org/apache/dubbo/descriptor/DescriptorService.java
index 98d2dc7..a0117ce 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/descriptor/DescriptorService.java
+++ b/dubbo-common/src/test/java/org/apache/dubbo/descriptor/DescriptorService.java
@@ -24,6 +24,37 @@ public interface DescriptorService {
void noParameterMethod();
/**
+ * unray return protobuf class
+ *
+ * @return protobuf class
+ */
+ HelloReply noParameterAndReturnProtobufMethod();
+
+ /**
+ * unray return java class
+ *
+ * @return
+ */
+ String noParameterAndReturnJavaClassMethod();
+
+
+ /**
+ * bi stream need wrapper
+ *
+ * @param streamObserver
+ * @return
+ */
+ StreamObserver<String> wrapBidirectionalStream(StreamObserver<String> streamObserver);
+
+ /**
+ * no need wrapper bi stream
+ *
+ * @param streamObserver
+ * @return
+ */
+ StreamObserver<HelloReply> bidirectionalStream(StreamObserver<HelloReply> streamObserver);
+
+ /**
* only for test.
*
* @param reply
@@ -34,4 +65,38 @@ public interface DescriptorService {
void sayHelloServerStream(HelloReply request, StreamObserver<HelloReply> reply);
void sayHelloServerStream2(Object request, StreamObserver<Object> reply);
+
+ /***********************grpc******************************/
+
+ java.util.Iterator<HelloReply> iteratorServerStream(HelloReply request);
+
+ reactor.core.publisher.Mono<HelloReply> reactorMethod(HelloReply reactorRequest);
+
+ reactor.core.publisher.Mono<HelloReply> reactorMethod2(reactor.core.publisher.Mono<HelloReply> reactorRequest);
+
+ io.reactivex.Single<HelloReply> rxJavaMethod(io.reactivex.Single<HelloReply> replySingle);
+
+ /**********************test error*****************/
+ void testMultiProtobufParameters(HelloReply reply1, HelloReply reply2);
+
+ String testDiffParametersAndReturn(HelloReply reply1);
+
+ HelloReply testDiffParametersAndReturn2(String reply1);
+
+ void testErrorServerStream(StreamObserver<HelloReply> reply, HelloReply request);
+
+ void testErrorServerStream2(HelloReply request, HelloReply request2, StreamObserver<HelloReply> reply);
+
+ void testErrorServerStream3(String aa, StreamObserver<HelloReply> reply);
+
+ void testErrorServerStream4(String aa, String bb, StreamObserver<String> reply);
+
+ StreamObserver<HelloReply> testErrorBiStream(HelloReply reply, StreamObserver<HelloReply> observer);
+
+ StreamObserver<HelloReply> testErrorBiStream2(String reply, StreamObserver<HelloReply> observer);
+
+ StreamObserver<String> testErrorBiStream3(StreamObserver<HelloReply> observer);
+ StreamObserver<String> testErrorBiStream4(StreamObserver<HelloReply> observer,String str);
+
+
}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/descriptor/MethodDescriptorTest.java b/dubbo-common/src/test/java/org/apache/dubbo/descriptor/MethodDescriptorTest.java
index ef42c79..0ec485d 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/descriptor/MethodDescriptorTest.java
+++ b/dubbo-common/src/test/java/org/apache/dubbo/descriptor/MethodDescriptorTest.java
@@ -16,16 +16,21 @@
*/
package org.apache.dubbo.descriptor;
+import io.reactivex.Single;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.proto.HelloReply;
import org.apache.dubbo.rpc.model.MethodDescriptor;
-
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
import java.lang.reflect.Method;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class MethodDescriptorTest {
@Test
@@ -37,6 +42,46 @@ public class MethodDescriptorTest {
}
@Test
+ public void testMethodWithNoParametersAndReturnProtobuf() throws Exception {
+ Method method = DescriptorService.class.getMethod("noParameterAndReturnProtobufMethod");
+ MethodDescriptor descriptor = new MethodDescriptor(method);
+ assertEquals("", descriptor.getParamDesc());
+ Assertions.assertEquals(0, descriptor.getParameterClasses().length);
+ assertTrue(descriptor.isUnary());
+ assertFalse(descriptor.isNeedWrap());
+ }
+
+ @Test
+ public void testMethodWithNoParametersAndReturnJava() throws Exception {
+ Method method = DescriptorService.class.getMethod("noParameterAndReturnJavaClassMethod");
+ MethodDescriptor descriptor = new MethodDescriptor(method);
+ assertEquals("", descriptor.getParamDesc());
+ Assertions.assertEquals(0, descriptor.getParameterClasses().length);
+ assertTrue(descriptor.isUnary());
+ assertTrue(descriptor.isNeedWrap());
+ }
+
+ @Test
+ public void testWrapperBiStream() throws Exception {
+ Method method = DescriptorService.class.getMethod("wrapBidirectionalStream", StreamObserver.class);
+ MethodDescriptor descriptor = new MethodDescriptor(method);
+ Assertions.assertEquals(1, descriptor.getParameterClasses().length);
+ assertTrue(descriptor.isStream());
+ assertSame(descriptor.getRpcType(), MethodDescriptor.RpcType.BIDIRECTIONAL_STREAM);
+ assertTrue(descriptor.isNeedWrap());
+ }
+
+ @Test
+ public void testBiStream() throws Exception {
+ Method method = DescriptorService.class.getMethod("bidirectionalStream", StreamObserver.class);
+ MethodDescriptor descriptor = new MethodDescriptor(method);
+ Assertions.assertEquals(1, descriptor.getParameterClasses().length);
+ assertTrue(descriptor.isStream());
+ assertSame(descriptor.getRpcType(), MethodDescriptor.RpcType.BIDIRECTIONAL_STREAM);
+ assertFalse(descriptor.isNeedWrap());
+ }
+
+ @Test
public void testIsStream() throws NoSuchMethodException {
Method method = DescriptorService.class.getMethod("noParameterMethod");
MethodDescriptor descriptor = new MethodDescriptor(method);
@@ -60,7 +105,8 @@ public class MethodDescriptorTest {
@Test
public void testIsServerStream() throws NoSuchMethodException {
- Method method = DescriptorService.class.getMethod("sayHelloServerStream", HelloReply.class, StreamObserver.class);
+ Method method = DescriptorService.class.getMethod("sayHelloServerStream", HelloReply.class,
+ StreamObserver.class);
MethodDescriptor descriptor = new MethodDescriptor(method);
Assertions.assertFalse(descriptor.isUnary());
Assertions.assertFalse(descriptor.isNeedWrap());
@@ -81,4 +127,111 @@ public class MethodDescriptorTest {
descriptor = new MethodDescriptor(method);
Assertions.assertFalse(descriptor.isNeedWrap());
}
+
+ @Test
+ public void testIgnoreMethod() throws NoSuchMethodException {
+ Method method = DescriptorService.class.getMethod("iteratorServerStream", HelloReply.class);
+ MethodDescriptor descriptor = new MethodDescriptor(method);
+ Assertions.assertFalse(descriptor.isNeedWrap());
+
+ Method method2 = DescriptorService.class.getMethod("reactorMethod", HelloReply.class);
+ MethodDescriptor descriptor2 = new MethodDescriptor(method2);
+ Assertions.assertFalse(descriptor2.isNeedWrap());
+
+ Method method3 = DescriptorService.class.getMethod("reactorMethod2", Mono.class);
+ MethodDescriptor descriptor3 = new MethodDescriptor(method3);
+ Assertions.assertFalse(descriptor3.isNeedWrap());
+
+
+ Method method4 = DescriptorService.class.getMethod("rxJavaMethod", Single.class);
+ MethodDescriptor descriptor4 = new MethodDescriptor(method4);
+ Assertions.assertFalse(descriptor4.isNeedWrap());
+ }
+
+
+ @Test
+ public void testMultiProtoParameter() throws Exception {
+ Method method = DescriptorService.class.getMethod("testMultiProtobufParameters", HelloReply.class,
+ HelloReply.class);
+ assertThrows(IllegalStateException.class,
+ () -> {
+ MethodDescriptor descriptor = new MethodDescriptor(method);
+ });
+ }
+
+ @Test
+ public void testDiffParametersAndReturn() throws Exception {
+ Method method = DescriptorService.class.getMethod("testDiffParametersAndReturn", HelloReply.class);
+ assertThrows(IllegalStateException.class,
+ () -> {
+ MethodDescriptor descriptor = new MethodDescriptor(method);
+ });
+
+ Method method2 = DescriptorService.class.getMethod("testDiffParametersAndReturn2", String.class);
+ assertThrows(IllegalStateException.class,
+ () -> {
+ MethodDescriptor descriptor = new MethodDescriptor(method2);
+ });
+ }
+
+ @Test
+ public void testErrorServerStream() throws Exception {
+ Method method = DescriptorService.class.getMethod("testErrorServerStream", StreamObserver.class,
+ HelloReply.class);
+ assertThrows(IllegalStateException.class,
+ () -> {
+ MethodDescriptor descriptor = new MethodDescriptor(method);
+ });
+
+ Method method2 = DescriptorService.class.getMethod("testErrorServerStream2", HelloReply.class, HelloReply
+ .class, StreamObserver.class);
+ assertThrows(IllegalStateException.class,
+ () -> {
+ MethodDescriptor descriptor = new MethodDescriptor(method2);
+ });
+
+ Method method3 = DescriptorService.class.getMethod("testErrorServerStream3", String.class,
+ StreamObserver.class);
+ assertThrows(IllegalStateException.class,
+ () -> {
+ MethodDescriptor descriptor = new MethodDescriptor(method3);
+ });
+
+ Method method4 = DescriptorService.class.getMethod("testErrorServerStream4", String.class, String.class,
+ StreamObserver.class);
+ assertThrows(IllegalStateException.class,
+ () -> {
+ MethodDescriptor descriptor = new MethodDescriptor(method4);
+ });
+ }
+
+ @Test
+ public void testErrorBiStream() throws Exception {
+ Method method = DescriptorService.class.getMethod("testErrorBiStream", HelloReply.class, StreamObserver
+ .class);
+ assertThrows(IllegalStateException.class,
+ () -> {
+ MethodDescriptor descriptor = new MethodDescriptor(method);
+ });
+
+ Method method2 = DescriptorService.class.getMethod("testErrorBiStream2", String.class, StreamObserver.class);
+ assertThrows(IllegalStateException.class,
+ () -> {
+ MethodDescriptor descriptor = new MethodDescriptor(method2);
+ });
+
+ Method method3 = DescriptorService.class.getMethod("testErrorBiStream3", StreamObserver.class);
+ assertThrows(IllegalStateException.class,
+ () -> {
+ MethodDescriptor descriptor = new MethodDescriptor(method3);
+ });
+
+ Method method4 = DescriptorService.class.getMethod("testErrorBiStream4", StreamObserver.class, String.class);
+ assertThrows(IllegalStateException.class,
+ () -> {
+ MethodDescriptor descriptor = new MethodDescriptor(method4);
+ });
+ }
+
+
}
diff --git a/dubbo-rpc/dubbo-rpc-grpc/src/test/java/org/apache/dubbo/rpc/protocol/grpc/GrpcProtocolTest.java b/dubbo-rpc/dubbo-rpc-grpc/src/test/java/org/apache/dubbo/rpc/protocol/grpc/GrpcProtocolTest.java
index ca4d6b1..d5118d9 100644
--- a/dubbo-rpc/dubbo-rpc-grpc/src/test/java/org/apache/dubbo/rpc/protocol/grpc/GrpcProtocolTest.java
+++ b/dubbo-rpc/dubbo-rpc-grpc/src/test/java/org/apache/dubbo/rpc/protocol/grpc/GrpcProtocolTest.java
@@ -17,6 +17,8 @@
package org.apache.dubbo.rpc.protocol.grpc;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.grpc.stub.StreamObserver;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.utils.NetUtils;
@@ -80,6 +82,26 @@ public class GrpcProtocolTest {
HelloReply hello = serviceImpl.sayHello(HelloRequest.newBuilder().setName("World").build());
Assertions.assertEquals("Hello World", hello.getMessage());
+ ListenableFuture<HelloReply> future = serviceImpl.sayHelloAsync(HelloRequest.newBuilder().setName("World").build());
+ Assertions.assertEquals("Hello World", future.get().getMessage());
+
+ serviceImpl.sayHello(HelloRequest.newBuilder().setName("World").build(), new StreamObserver<HelloReply>() {
+
+ @Override
+ public void onNext(HelloReply helloReply) {
+ Assertions.assertEquals("Hello World", helloReply.getMessage());
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void onCompleted() {
+ System.out.println("onCompleted");
+ }
+ });
// resource recycle.
ApplicationModel.getServiceRepository().destroy();
}
diff --git a/dubbo-rpc/dubbo-rpc-grpc/src/test/java/org/apache/dubbo/rpc/protocol/grpc/support/DubboGreeterGrpc.java b/dubbo-rpc/dubbo-rpc-grpc/src/test/java/org/apache/dubbo/rpc/protocol/grpc/support/DubboGreeterGrpc.java
index ac37cc3..6f22e7b 100644
--- a/dubbo-rpc/dubbo-rpc-grpc/src/test/java/org/apache/dubbo/rpc/protocol/grpc/support/DubboGreeterGrpc.java
+++ b/dubbo-rpc/dubbo-rpc-grpc/src/test/java/org/apache/dubbo/rpc/protocol/grpc/support/DubboGreeterGrpc.java
@@ -44,7 +44,8 @@ public final class DubboGreeterGrpc {
protected GreeterGrpc.GreeterFutureStub futureStub;
protected GreeterGrpc.GreeterStub stub;
- public DubboGreeterStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions, URL url, ReferenceConfigBase<?> referenceConfig) {
+ public DubboGreeterStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions, URL url,
+ ReferenceConfigBase<?> referenceConfig) {
this.url = url;
this.referenceConfig = referenceConfig;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocolTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocolTest.java
index 23d6d89..0db41e3 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocolTest.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocolTest.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.ProxyFactory;
@@ -54,8 +55,26 @@ public class TripleProtocolTest {
protocol.export(proxy.getInvoker(serviceImpl, IGreeter.class, url));
serviceImpl = proxy.getProxy(protocol.refer(IGreeter.class, url));
- Thread.sleep(1000);
+ Thread.sleep(1000);
Assertions.assertEquals("hello world", serviceImpl.echo("hello world"));
+ // fixme will throw exception
+ // Assertions.assertEquals("hello world", serviceImpl.echoAsync("hello world").get());
+ serviceImpl.serverStream("hello world", new StreamObserver<String>() {
+ @Override
+ public void onNext(String data) {
+ Assertions.assertEquals("hello world",data);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void onCompleted() {
+ System.out.println("onCompleted");
+ }
+ });
// resource recycle.
ApplicationModel.getServiceRepository().destroy();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/support/IGreeter.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/support/IGreeter.java
index 786dda0..55a5a7c 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/support/IGreeter.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/support/IGreeter.java
@@ -17,10 +17,21 @@
package org.apache.dubbo.rpc.protocol.tri.support;
+import org.apache.dubbo.common.stream.StreamObserver;
+
+import java.util.concurrent.CompletableFuture;
+
public interface IGreeter {
/**
* Use request to respond
*/
String echo(String request);
+ default CompletableFuture<String> echoAsync(String request) {
+ return CompletableFuture.supplyAsync(() -> echo(request));
+ }
+
+ void serverStream(String str, StreamObserver<String> observer);
+
+
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/support/IGreeterImpl.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/support/IGreeterImpl.java
index aa4c114..daeeb0b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/support/IGreeterImpl.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/support/IGreeterImpl.java
@@ -17,10 +17,19 @@
package org.apache.dubbo.rpc.protocol.tri.support;
+import org.apache.dubbo.common.stream.StreamObserver;
+
public class IGreeterImpl implements IGreeter {
@Override
public String echo(String request) {
return request;
}
+
+ @Override
+ public void serverStream(String str, StreamObserver<String> observer) {
+ System.out.println("srt="+str);
+ observer.onNext(str);
+ observer.onCompleted();
+ }
}