You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/08/26 12:04:13 UTC
[dubbo] branch 3.0 updated: [3.0-tri-compiler] dubbo compiler
support stream (#8566)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 67bd4d1 [3.0-tri-compiler] dubbo compiler support stream (#8566)
67bd4d1 is described below
commit 67bd4d13e802b9bf8b3ade234416612ea21eea8b
Author: earthchen <yo...@duobei.com>
AuthorDate: Thu Aug 26 07:03:30 2021 -0500
[3.0-tri-compiler] dubbo compiler support stream (#8566)
---
compiler/pom.xml | 2 +-
.../main/resources/Dubbo3InterfaceStub.mustache | 44 ++++++++++++---
compiler/src/main/resources/Dubbo3Stub.mustache | 5 +-
compiler/src/main/resources/DubboGrpcStub.mustache | 3 -
compiler/src/main/resources/DubboStub.mustache | 3 -
.../main/resources/ReactorDubboGrpcStub.mustache | 3 -
.../src/main/resources/RxDubboGrpcStub.mustache | 3 -
...Serialization.java => SingleProtobufUtils.java} | 64 +++++++++++++++++++---
.../apache/dubbo/rpc/protocol/tri/TripleUtil.java | 6 +-
9 files changed, 94 insertions(+), 39 deletions(-)
diff --git a/compiler/pom.xml b/compiler/pom.xml
index f556e42..7f07fc7 100644
--- a/compiler/pom.xml
+++ b/compiler/pom.xml
@@ -26,7 +26,7 @@
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-compiler</artifactId>
- <version>0.0.2</version>
+ <version>0.0.3-SNAPSHOT</version>
<packaging>jar</packaging>
diff --git a/compiler/src/main/resources/Dubbo3InterfaceStub.mustache b/compiler/src/main/resources/Dubbo3InterfaceStub.mustache
index 206971c..824fff7 100644
--- a/compiler/src/main/resources/Dubbo3InterfaceStub.mustache
+++ b/compiler/src/main/resources/Dubbo3InterfaceStub.mustache
@@ -1,5 +1,5 @@
{{#packageName}}
- package {{packageName}};
+package {{packageName}};
{{/packageName}}
import java.util.concurrent.CompletableFuture;
@@ -8,21 +8,47 @@ import java.util.concurrent.atomic.AtomicBoolean;
{{#deprecated}}
@java.lang.Deprecated
{{/deprecated}}
-@javax.annotation.Generated(
-value = "by Dubbo generator",
-comments = "Source: {{protoName}}")
public interface {{interfaceClassName}} {
-static final String JAVA_SERVICE_NAME = "{{packageName}}.{{serviceName}}";
-static final String SERVICE_NAME = "{{commonPackageName}}.{{serviceName}}";
+
+ static final String JAVA_SERVICE_NAME = "{{packageName}}.{{serviceName}}";
+ static final String SERVICE_NAME = "{{commonPackageName}}.{{serviceName}}";
// FIXME, initialize Dubbo3 stub when interface loaded, thinking of new ways doing this.
static final boolean inited = {{className}}.init();
-{{#methods}}
+{{#unaryMethods}}
+ {{#javaDoc}}
+ {{{javaDoc}}}
+ {{/javaDoc}}
+ {{#deprecated}}
+ @java.lang.Deprecated
+ {{/deprecated}}
{{outputType}} {{methodName}}({{inputType}} request);
- CompletableFuture<{{outputType}}> {{methodName}}Async({{inputType}} request);
+ default CompletableFuture<{{outputType}}> {{methodName}}Async({{inputType}} request){
+ return CompletableFuture.supplyAsync(() -> {{methodName}}(request));
+ }
+
+{{/unaryMethods}}
+
+{{#serverStreamingMethods}}
+ {{#javaDoc}}
+ {{{javaDoc}}}
+ {{/javaDoc}}
+ {{#deprecated}}
+ @java.lang.Deprecated
+ {{/deprecated}}
+ void {{methodName}}({{inputType}} request, org.apache.dubbo.common.stream.StreamObserver<{{outputType}}> responseObserver);
+{{/serverStreamingMethods}}
-{{/methods}}
+{{#biStreamingMethods}}
+ {{#javaDoc}}
+ {{{javaDoc}}}
+ {{/javaDoc}}
+ {{#deprecated}}
+ @java.lang.Deprecated
+ {{/deprecated}}
+ org.apache.dubbo.common.stream.StreamObserver<{{inputType}}> {{methodName}}(org.apache.dubbo.common.stream.StreamObserver<{{outputType}}> responseObserver);
+{{/biStreamingMethods}}
}
diff --git a/compiler/src/main/resources/Dubbo3Stub.mustache b/compiler/src/main/resources/Dubbo3Stub.mustache
index 53be61c..5071e96 100644
--- a/compiler/src/main/resources/Dubbo3Stub.mustache
+++ b/compiler/src/main/resources/Dubbo3Stub.mustache
@@ -8,16 +8,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
{{#deprecated}}
@java.lang.Deprecated
{{/deprecated}}
-@javax.annotation.Generated(
-value = "by Dubbo generator",
-comments = "Source: {{protoName}}")
public final class {{className}} {
private static final AtomicBoolean registered = new AtomicBoolean();
public static boolean init() {
if (registered.compareAndSet(false, true)) {
{{#methodTypes}}
- org.apache.dubbo.common.serialize.protobuf.support.ProtobufUtils.marshaller(
+ org.apache.dubbo.rpc.protocol.tri.SingleProtobufUtils.marshaller(
{{.}}.getDefaultInstance());
{{/methodTypes}}
}
diff --git a/compiler/src/main/resources/DubboGrpcStub.mustache b/compiler/src/main/resources/DubboGrpcStub.mustache
index ad43494..703ce55 100644
--- a/compiler/src/main/resources/DubboGrpcStub.mustache
+++ b/compiler/src/main/resources/DubboGrpcStub.mustache
@@ -21,9 +21,6 @@ import static io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall;
{{#deprecated}}
@java.lang.Deprecated
{{/deprecated}}
-@javax.annotation.Generated(
-value = "by DubboGrpc generator",
-comments = "Source: {{protoName}}")
public final class {{className}} {
private {{className}}() {}
diff --git a/compiler/src/main/resources/DubboStub.mustache b/compiler/src/main/resources/DubboStub.mustache
index 06bd7b4..da6adf7 100644
--- a/compiler/src/main/resources/DubboStub.mustache
+++ b/compiler/src/main/resources/DubboStub.mustache
@@ -8,9 +8,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
{{#deprecated}}
@java.lang.Deprecated
{{/deprecated}}
-@javax.annotation.Generated(
-value = "by Dubbo generator",
-comments = "Source: {{protoName}}")
public final class {{className}} {
private static final AtomicBoolean registered = new AtomicBoolean();
diff --git a/compiler/src/main/resources/ReactorDubboGrpcStub.mustache b/compiler/src/main/resources/ReactorDubboGrpcStub.mustache
index 2cf1471..d8d3fba 100644
--- a/compiler/src/main/resources/ReactorDubboGrpcStub.mustache
+++ b/compiler/src/main/resources/ReactorDubboGrpcStub.mustache
@@ -20,9 +20,6 @@ import static io.grpc.stub.ServerCalls.asyncBidiStreamingCall;
{{#deprecated}}
@java.lang.Deprecated
{{/deprecated}}
-@javax.annotation.Generated(
-value = "by ReactorDubboGrpc generator",
-comments = "Source: {{protoName}}")
public final class {{className}} {
private {{className}}() {}
diff --git a/compiler/src/main/resources/RxDubboGrpcStub.mustache b/compiler/src/main/resources/RxDubboGrpcStub.mustache
index 22c06b0..6f81ce8 100644
--- a/compiler/src/main/resources/RxDubboGrpcStub.mustache
+++ b/compiler/src/main/resources/RxDubboGrpcStub.mustache
@@ -20,9 +20,6 @@ import static io.grpc.stub.ServerCalls.asyncBidiStreamingCall;
{{#deprecated}}
@java.lang.Deprecated
{{/deprecated}}
-@javax.annotation.Generated(
-value = "by RxDubboGrpc generator",
-comments = "Source: {{protoName}}")
public final class {{className}} {
private {{className}}() {}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/SingleProtobufSerialization.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/SingleProtobufUtils.java
similarity index 54%
rename from dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/SingleProtobufSerialization.java
rename to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/SingleProtobufUtils.java
index 75b2ac3..f9e94ea 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/SingleProtobufSerialization.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/SingleProtobufUtils.java
@@ -16,11 +16,21 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
+import com.google.protobuf.BoolValue;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.DoubleValue;
+import com.google.protobuf.Empty;
+import com.google.protobuf.EnumValue;
import com.google.protobuf.ExtensionRegistryLite;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.Int64Value;
import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.ListValue;
import com.google.protobuf.Message;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
+import com.google.protobuf.StringValue;
import java.io.IOException;
import java.io.InputStream;
@@ -29,11 +39,36 @@ import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-public class SingleProtobufSerialization {
+public class SingleProtobufUtils {
private static final ConcurrentHashMap<Class<?>, Message> instCache = new ConcurrentHashMap<>();
private static final ExtensionRegistryLite globalRegistry =
- ExtensionRegistryLite.getEmptyRegistry();
- private final ConcurrentMap<Class<?>, SingleMessageMarshaller<?>> marshallers = new ConcurrentHashMap<>();
+ ExtensionRegistryLite.getEmptyRegistry();
+ private static final ConcurrentMap<Class<?>, SingleMessageMarshaller<?>> marshallers = new ConcurrentHashMap<>();
+
+ static boolean isSupported(Class<?> clazz) {
+ if (clazz == null) {
+ return false;
+ }
+ return MessageLite.class.isAssignableFrom(clazz);
+ }
+
+ static {
+ // Built-in types need to be registered in advance
+ marshaller(Empty.getDefaultInstance());
+ marshaller(BoolValue.getDefaultInstance());
+ marshaller(Int32Value.getDefaultInstance());
+ marshaller(Int64Value.getDefaultInstance());
+ marshaller(FloatValue.getDefaultInstance());
+ marshaller(DoubleValue.getDefaultInstance());
+ marshaller(BytesValue.getDefaultInstance());
+ marshaller(StringValue.getDefaultInstance());
+ marshaller(EnumValue.getDefaultInstance());
+ marshaller(ListValue.getDefaultInstance());
+ }
+
+ public static <T extends MessageLite> void marshaller(T defaultInstance) {
+ marshallers.put(defaultInstance.getClass(), new SingleMessageMarshaller<>(defaultInstance));
+ }
@SuppressWarnings("all")
public static Message defaultInst(Class<?> clz) {
@@ -56,29 +91,40 @@ public class SingleProtobufSerialization {
return (Parser<T>) defaultInst.getParserForType();
}
- public Object deserialize(InputStream in, Class<?> clz) throws IOException {
+
+ public static <T> T deserialize(InputStream in, Class<T> clz) throws IOException {
+ if (!isSupported(clz)) {
+ throw new IllegalArgumentException("This serialization only support google protobuf messages, but the " +
+ "actual input type is :" + clz.getName());
+ }
try {
- return getMarshaller(clz).parse(in);
+ return (T) getMarshaller(clz).parse(in);
} catch (InvalidProtocolBufferException e) {
throw new IOException(e);
}
}
- public void serialize(Object obj, OutputStream os) throws IOException {
+ public static void serialize(Object obj, OutputStream os) throws IOException {
final MessageLite msg = (MessageLite) obj;
msg.writeTo(os);
}
- private SingleMessageMarshaller<?> getMarshaller(Class<?> clz) {
+ private static SingleMessageMarshaller<?> getMarshaller(Class<?> clz) {
return marshallers.computeIfAbsent(clz, k -> new SingleMessageMarshaller(k));
}
public static final class SingleMessageMarshaller<T extends MessageLite> {
private final Parser<T> parser;
+ private final T defaultInstance;
SingleMessageMarshaller(Class<T> clz) {
- final T inst = (T) defaultInst(clz);
- this.parser = (Parser<T>) inst.getParserForType();
+ this.defaultInstance = (T) defaultInst(clz);
+ this.parser = (Parser<T>) defaultInstance.getParserForType();
+ }
+
+ SingleMessageMarshaller(T defaultInstance) {
+ this.defaultInstance = defaultInstance;
+ this.parser = (Parser<T>) defaultInstance.getParserForType();
}
public T parse(InputStream stream) throws InvalidProtocolBufferException {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
index 2f231d9..149de74 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
@@ -60,8 +60,6 @@ public class TripleUtil {
public static final String LANGUAGE = "java";
-
- private static final SingleProtobufSerialization pbSerialization = new SingleProtobufSerialization();
private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder().withoutPadding();
@@ -255,7 +253,7 @@ public class TripleUtil {
public static <T> T unpack(InputStream is, Class<T> clz) {
try {
- final T req = (T) pbSerialization.deserialize(is, clz);
+ final T req = SingleProtobufUtils.deserialize(is, clz);
is.close();
return req;
} catch (IOException e) {
@@ -278,7 +276,7 @@ public class TripleUtil {
public static byte[] pack(Object obj) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
- pbSerialization.serialize(obj, baos);
+ SingleProtobufUtils.serialize(obj, baos);
} catch (IOException e) {
throw new RuntimeException("Failed to pack protobuf object", e);
}