You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/08/26 12:04:13 UTC

[dubbo] branch 3.0 updated: [3.0-tri-compiler] dubbo compiler support stream (#8566)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 67bd4d1  [3.0-tri-compiler] dubbo compiler support stream (#8566)
67bd4d1 is described below

commit 67bd4d13e802b9bf8b3ade234416612ea21eea8b
Author: earthchen <yo...@duobei.com>
AuthorDate: Thu Aug 26 07:03:30 2021 -0500

    [3.0-tri-compiler] dubbo compiler support stream (#8566)
---
 compiler/pom.xml                                   |  2 +-
 .../main/resources/Dubbo3InterfaceStub.mustache    | 44 ++++++++++++---
 compiler/src/main/resources/Dubbo3Stub.mustache    |  5 +-
 compiler/src/main/resources/DubboGrpcStub.mustache |  3 -
 compiler/src/main/resources/DubboStub.mustache     |  3 -
 .../main/resources/ReactorDubboGrpcStub.mustache   |  3 -
 .../src/main/resources/RxDubboGrpcStub.mustache    |  3 -
 ...Serialization.java => SingleProtobufUtils.java} | 64 +++++++++++++++++++---
 .../apache/dubbo/rpc/protocol/tri/TripleUtil.java  |  6 +-
 9 files changed, 94 insertions(+), 39 deletions(-)

diff --git a/compiler/pom.xml b/compiler/pom.xml
index f556e42..7f07fc7 100644
--- a/compiler/pom.xml
+++ b/compiler/pom.xml
@@ -26,7 +26,7 @@
 
     <groupId>org.apache.dubbo</groupId>
     <artifactId>dubbo-compiler</artifactId>
-    <version>0.0.2</version>
+    <version>0.0.3-SNAPSHOT</version>
 
     <packaging>jar</packaging>
 
diff --git a/compiler/src/main/resources/Dubbo3InterfaceStub.mustache b/compiler/src/main/resources/Dubbo3InterfaceStub.mustache
index 206971c..824fff7 100644
--- a/compiler/src/main/resources/Dubbo3InterfaceStub.mustache
+++ b/compiler/src/main/resources/Dubbo3InterfaceStub.mustache
@@ -1,5 +1,5 @@
 {{#packageName}}
-    package {{packageName}};
+package {{packageName}};
 {{/packageName}}
 
 import java.util.concurrent.CompletableFuture;
@@ -8,21 +8,47 @@ import java.util.concurrent.atomic.AtomicBoolean;
 {{#deprecated}}
     @java.lang.Deprecated
 {{/deprecated}}
-@javax.annotation.Generated(
-value = "by Dubbo generator",
-comments = "Source: {{protoName}}")
 public interface {{interfaceClassName}} {
-static final String JAVA_SERVICE_NAME = "{{packageName}}.{{serviceName}}";
-static final String SERVICE_NAME = "{{commonPackageName}}.{{serviceName}}";
+
+    static final String JAVA_SERVICE_NAME = "{{packageName}}.{{serviceName}}";
+    static final String SERVICE_NAME = "{{commonPackageName}}.{{serviceName}}";
 
     // FIXME, initialize Dubbo3 stub when interface loaded, thinking of new ways doing this.
     static final boolean inited = {{className}}.init();
 
-{{#methods}}
+{{#unaryMethods}}
+    {{#javaDoc}}
+        {{{javaDoc}}}
+    {{/javaDoc}}
+    {{#deprecated}}
+        @java.lang.Deprecated
+    {{/deprecated}}
     {{outputType}} {{methodName}}({{inputType}} request);
 
-    CompletableFuture<{{outputType}}> {{methodName}}Async({{inputType}} request);
+    default CompletableFuture<{{outputType}}> {{methodName}}Async({{inputType}} request){
+        return CompletableFuture.supplyAsync(() -> {{methodName}}(request));
+    }
+
+{{/unaryMethods}}
+
+{{#serverStreamingMethods}}
+    {{#javaDoc}}
+        {{{javaDoc}}}
+    {{/javaDoc}}
+    {{#deprecated}}
+        @java.lang.Deprecated
+    {{/deprecated}}
+    void {{methodName}}({{inputType}} request, org.apache.dubbo.common.stream.StreamObserver<{{outputType}}> responseObserver);
+{{/serverStreamingMethods}}
 
-{{/methods}}
+{{#biStreamingMethods}}
+    {{#javaDoc}}
+        {{{javaDoc}}}
+    {{/javaDoc}}
+    {{#deprecated}}
+        @java.lang.Deprecated
+    {{/deprecated}}
+    org.apache.dubbo.common.stream.StreamObserver<{{inputType}}> {{methodName}}(org.apache.dubbo.common.stream.StreamObserver<{{outputType}}> responseObserver);
+{{/biStreamingMethods}}
 
 }
diff --git a/compiler/src/main/resources/Dubbo3Stub.mustache b/compiler/src/main/resources/Dubbo3Stub.mustache
index 53be61c..5071e96 100644
--- a/compiler/src/main/resources/Dubbo3Stub.mustache
+++ b/compiler/src/main/resources/Dubbo3Stub.mustache
@@ -8,16 +8,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
 {{#deprecated}}
     @java.lang.Deprecated
 {{/deprecated}}
-@javax.annotation.Generated(
-value = "by Dubbo generator",
-comments = "Source: {{protoName}}")
 public final class {{className}} {
 private static final AtomicBoolean registered = new AtomicBoolean();
 
 public static boolean init() {
     if (registered.compareAndSet(false, true)) {
         {{#methodTypes}}
-            org.apache.dubbo.common.serialize.protobuf.support.ProtobufUtils.marshaller(
+            org.apache.dubbo.rpc.protocol.tri.SingleProtobufUtils.marshaller(
             {{.}}.getDefaultInstance());
         {{/methodTypes}}
     }
diff --git a/compiler/src/main/resources/DubboGrpcStub.mustache b/compiler/src/main/resources/DubboGrpcStub.mustache
index ad43494..703ce55 100644
--- a/compiler/src/main/resources/DubboGrpcStub.mustache
+++ b/compiler/src/main/resources/DubboGrpcStub.mustache
@@ -21,9 +21,6 @@ import static io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall;
 {{#deprecated}}
     @java.lang.Deprecated
 {{/deprecated}}
-@javax.annotation.Generated(
-value = "by DubboGrpc generator",
-comments = "Source: {{protoName}}")
 public final class {{className}} {
 private {{className}}() {}
 
diff --git a/compiler/src/main/resources/DubboStub.mustache b/compiler/src/main/resources/DubboStub.mustache
index 06bd7b4..da6adf7 100644
--- a/compiler/src/main/resources/DubboStub.mustache
+++ b/compiler/src/main/resources/DubboStub.mustache
@@ -8,9 +8,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 {{#deprecated}}
     @java.lang.Deprecated
 {{/deprecated}}
-@javax.annotation.Generated(
-value = "by Dubbo generator",
-comments = "Source: {{protoName}}")
 public final class {{className}} {
 private static final AtomicBoolean registered = new AtomicBoolean();
 
diff --git a/compiler/src/main/resources/ReactorDubboGrpcStub.mustache b/compiler/src/main/resources/ReactorDubboGrpcStub.mustache
index 2cf1471..d8d3fba 100644
--- a/compiler/src/main/resources/ReactorDubboGrpcStub.mustache
+++ b/compiler/src/main/resources/ReactorDubboGrpcStub.mustache
@@ -20,9 +20,6 @@ import static io.grpc.stub.ServerCalls.asyncBidiStreamingCall;
 {{#deprecated}}
     @java.lang.Deprecated
 {{/deprecated}}
-@javax.annotation.Generated(
-value = "by ReactorDubboGrpc generator",
-comments = "Source: {{protoName}}")
 public final class {{className}} {
 private {{className}}() {}
 
diff --git a/compiler/src/main/resources/RxDubboGrpcStub.mustache b/compiler/src/main/resources/RxDubboGrpcStub.mustache
index 22c06b0..6f81ce8 100644
--- a/compiler/src/main/resources/RxDubboGrpcStub.mustache
+++ b/compiler/src/main/resources/RxDubboGrpcStub.mustache
@@ -20,9 +20,6 @@ import static io.grpc.stub.ServerCalls.asyncBidiStreamingCall;
 {{#deprecated}}
     @java.lang.Deprecated
 {{/deprecated}}
-@javax.annotation.Generated(
-value = "by RxDubboGrpc generator",
-comments = "Source: {{protoName}}")
 public final class {{className}} {
 private {{className}}() {}
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/SingleProtobufSerialization.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/SingleProtobufUtils.java
similarity index 54%
rename from dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/SingleProtobufSerialization.java
rename to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/SingleProtobufUtils.java
index 75b2ac3..f9e94ea 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/SingleProtobufSerialization.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/SingleProtobufUtils.java
@@ -16,11 +16,21 @@
  */
 package org.apache.dubbo.rpc.protocol.tri;
 
+import com.google.protobuf.BoolValue;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.DoubleValue;
+import com.google.protobuf.Empty;
+import com.google.protobuf.EnumValue;
 import com.google.protobuf.ExtensionRegistryLite;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.Int64Value;
 import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.ListValue;
 import com.google.protobuf.Message;
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
+import com.google.protobuf.StringValue;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -29,11 +39,36 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-public class SingleProtobufSerialization {
+public class SingleProtobufUtils {
     private static final ConcurrentHashMap<Class<?>, Message> instCache = new ConcurrentHashMap<>();
     private static final ExtensionRegistryLite globalRegistry =
-            ExtensionRegistryLite.getEmptyRegistry();
-    private final ConcurrentMap<Class<?>, SingleMessageMarshaller<?>> marshallers = new ConcurrentHashMap<>();
+        ExtensionRegistryLite.getEmptyRegistry();
+    private static final ConcurrentMap<Class<?>, SingleMessageMarshaller<?>> marshallers = new ConcurrentHashMap<>();
+
+    static boolean isSupported(Class<?> clazz) {
+        if (clazz == null) {
+            return false;
+        }
+        return MessageLite.class.isAssignableFrom(clazz);
+    }
+
+    static {
+        // Built-in types need to be registered in advance
+        marshaller(Empty.getDefaultInstance());
+        marshaller(BoolValue.getDefaultInstance());
+        marshaller(Int32Value.getDefaultInstance());
+        marshaller(Int64Value.getDefaultInstance());
+        marshaller(FloatValue.getDefaultInstance());
+        marshaller(DoubleValue.getDefaultInstance());
+        marshaller(BytesValue.getDefaultInstance());
+        marshaller(StringValue.getDefaultInstance());
+        marshaller(EnumValue.getDefaultInstance());
+        marshaller(ListValue.getDefaultInstance());
+    }
+
+    public static <T extends MessageLite> void marshaller(T defaultInstance) {
+        marshallers.put(defaultInstance.getClass(), new SingleMessageMarshaller<>(defaultInstance));
+    }
 
     @SuppressWarnings("all")
     public static Message defaultInst(Class<?> clz) {
@@ -56,29 +91,40 @@ public class SingleProtobufSerialization {
         return (Parser<T>) defaultInst.getParserForType();
     }
 
-    public Object deserialize(InputStream in, Class<?> clz) throws IOException {
+
+    public static <T> T deserialize(InputStream in, Class<T> clz) throws IOException {
+        if (!isSupported(clz)) {
+            throw new IllegalArgumentException("This serialization only support google protobuf messages, but the " +
+                "actual input type is :" + clz.getName());
+        }
         try {
-            return getMarshaller(clz).parse(in);
+            return (T) getMarshaller(clz).parse(in);
         } catch (InvalidProtocolBufferException e) {
             throw new IOException(e);
         }
     }
 
-    public void serialize(Object obj, OutputStream os) throws IOException {
+    public static void serialize(Object obj, OutputStream os) throws IOException {
         final MessageLite msg = (MessageLite) obj;
         msg.writeTo(os);
     }
 
-    private SingleMessageMarshaller<?> getMarshaller(Class<?> clz) {
+    private static SingleMessageMarshaller<?> getMarshaller(Class<?> clz) {
         return marshallers.computeIfAbsent(clz, k -> new SingleMessageMarshaller(k));
     }
 
     public static final class SingleMessageMarshaller<T extends MessageLite> {
         private final Parser<T> parser;
+        private final T defaultInstance;
 
         SingleMessageMarshaller(Class<T> clz) {
-            final T inst = (T) defaultInst(clz);
-            this.parser = (Parser<T>) inst.getParserForType();
+            this.defaultInstance = (T) defaultInst(clz);
+            this.parser = (Parser<T>) defaultInstance.getParserForType();
+        }
+
+        SingleMessageMarshaller(T defaultInstance) {
+            this.defaultInstance = defaultInstance;
+            this.parser = (Parser<T>) defaultInstance.getParserForType();
         }
 
         public T parse(InputStream stream) throws InvalidProtocolBufferException {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
index 2f231d9..149de74 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
@@ -60,8 +60,6 @@ public class TripleUtil {
 
     public static final String LANGUAGE = "java";
 
-
-    private static final SingleProtobufSerialization pbSerialization = new SingleProtobufSerialization();
     private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
     private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder().withoutPadding();
 
@@ -255,7 +253,7 @@ public class TripleUtil {
 
     public static <T> T unpack(InputStream is, Class<T> clz) {
         try {
-            final T req = (T) pbSerialization.deserialize(is, clz);
+            final T req = SingleProtobufUtils.deserialize(is, clz);
             is.close();
             return req;
         } catch (IOException e) {
@@ -278,7 +276,7 @@ public class TripleUtil {
     public static byte[] pack(Object obj) {
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
         try {
-            pbSerialization.serialize(obj, baos);
+            SingleProtobufUtils.serialize(obj, baos);
         } catch (IOException e) {
             throw new RuntimeException("Failed to pack protobuf object", e);
         }