You are viewing a plain text version of this content. The canonical version is the original archive page for this message (its hyperlink did not survive the plain-text conversion).
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/07/18 07:58:12 UTC

[incubator-pulsar] branch master updated: augmenting protoschema with info for parsing (#2181)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ef9df9  augmenting protoschema with info for parsing (#2181)
6ef9df9 is described below

commit 6ef9df9cb18b6e28c385702becf1d0de7ba294e2
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed Jul 18 00:58:10 2018 -0700

    augmenting protoschema with info for parsing (#2181)
    
    ### Motivation
    
    Add parsing information (each field's number, name, type, and label) to the Protobuf schema's properties so that Protobuf messages can be parsed more easily.
---
 .../pulsar/client/impl/schema/ProtobufSchema.java  | 75 ++++++++++++++++++----
 .../pulsar/client/schema/ProtobufSchemaTest.java   | 47 +++++++++++++-
 pulsar-client-schema/src/test/proto/Test.proto     |  3 +-
 3 files changed, 112 insertions(+), 13 deletions(-)

diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
index 4f2fba3..34ec46b 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
@@ -18,7 +18,12 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.Descriptors;
 import com.google.protobuf.Parser;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
 import org.apache.avro.protobuf.ProtobufDatumReader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
@@ -27,23 +32,71 @@ import org.apache.pulsar.common.schema.SchemaType;
 
 import java.lang.reflect.InvocationTargetException;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 
 public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> implements Schema<T> {
 
     private SchemaInfo schemaInfo;
     private Parser<T> tParser;
+    public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__";
 
-    private ProtobufSchema(SchemaInfo schemaInfo, Class<T> pojo) {
-        this.schemaInfo = schemaInfo;
+    @Getter
+    @AllArgsConstructor
+    public static class ProtoBufParsingInfo {
+        private final int number;
+        private final String name;
+        private final String type;
+        private final String label;
+        // For future nested fields
+        private final Map <String, Object> definition;
+    }
+
+    private ProtobufSchema(Map<String, String> properties, Class<T> pojo) {
         try {
             T protoMessageInstance = (T) pojo.getMethod("getDefaultInstance").invoke(null);
             tParser = (Parser<T>) protoMessageInstance.getParserForType();
+
+            this.schemaInfo = new SchemaInfo();
+            this.schemaInfo.setName("");
+
+            Map<String, String> allProperties = new HashMap<>();
+            allProperties.putAll(properties);
+            // set protobuf parsing info
+            allProperties.put(PARSING_INFO_PROPERTY, getParsingInfo(protoMessageInstance));
+
+            this.schemaInfo.setProperties(allProperties);
+            this.schemaInfo.setType(SchemaType.PROTOBUF);
+            ProtobufDatumReader datumReader = new ProtobufDatumReader(pojo);
+            org.apache.avro.Schema schema = datumReader.getSchema();
+            this.schemaInfo.setSchema(schema.toString().getBytes());
+
         } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
             throw new IllegalArgumentException(e);
         }
     }
 
+    private String getParsingInfo(T protoMessageInstance) {
+        List<ProtoBufParsingInfo> protoBufParsingInfos = new LinkedList<>();
+        protoMessageInstance.getDescriptorForType().getFields().forEach(new Consumer<Descriptors.FieldDescriptor>() {
+            @Override
+            public void accept(Descriptors.FieldDescriptor fieldDescriptor) {
+                protoBufParsingInfos.add(new ProtoBufParsingInfo(fieldDescriptor.getNumber(),
+                        fieldDescriptor.getName(), fieldDescriptor.getType().name(),
+                        fieldDescriptor.toProto().getLabel().name(), null));
+            }
+        });
+
+        try {
+            return new ObjectMapper().writeValueAsString(protoBufParsingInfos);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     public byte[] encode(T message) {
         return message.toByteArray();
@@ -67,16 +120,16 @@ public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> im
         return of(pojo, Collections.emptyMap());
     }
 
+    public static ProtobufSchema ofGenericClass(Class pojo, Map<String, String> properties) {
+        if (!com.google.protobuf.GeneratedMessageV3.class.isAssignableFrom(pojo)) {
+            throw new IllegalArgumentException(com.google.protobuf.GeneratedMessageV3.class.getName()
+                    + " is not assignable from " + pojo.getName());
+        }
+        return new ProtobufSchema<>(properties, pojo);
+    }
+
     public static <T extends com.google.protobuf.GeneratedMessageV3> ProtobufSchema<T> of(
             Class<T> pojo, Map<String, String> properties){
-
-        SchemaInfo info = new SchemaInfo();
-        info.setName("");
-        info.setProperties(properties);
-        info.setType(SchemaType.PROTOBUF);
-        ProtobufDatumReader<T> datumReader = new ProtobufDatumReader<>(pojo);
-        org.apache.avro.Schema schema = datumReader.getSchema();
-        info.setSchema(schema.toString().getBytes());
-        return new ProtobufSchema<>(info, pojo);
+        return ofGenericClass(pojo, properties);
     }
 }
diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/ProtobufSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/ProtobufSchemaTest.java
index 7ed575a..69c66dc 100644
--- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/ProtobufSchemaTest.java
+++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/ProtobufSchemaTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.schema;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.pulsar.client.impl.schema.ProtobufSchema;
@@ -26,6 +28,8 @@ import org.apache.pulsar.functions.proto.Function;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.util.Collections;
+
 @Slf4j
 public class ProtobufSchemaTest {
 
@@ -39,7 +43,19 @@ public class ProtobufSchemaTest {
             "\"symbols\":[\"SHARED\",\"FAILOVER\"]},\"default\":\"SHARED\"},{\"name\":\"nestedField\"," +
             "\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SubMessage\",\"fields\":[{\"name\":\"foo\"," +
             "\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"bar\"," +
-            "\"type\":\"double\",\"default\":0}]}],\"default\":null}]}";
+            "\"type\":\"double\",\"default\":0}]}],\"default\":null},{\"name\":\"repeatedField\"," +
+            "\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}}]}";
+
+    private static final String EXPECTED_PARSING_INFO = "{\"__PARSING_INFO__\":\"[{\\\"number\\\":1," +
+            "\\\"name\\\":\\\"stringField\\\",\\\"type\\\":\\\"STRING\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\"," +
+            "\\\"definition\\\":null},{\\\"number\\\":2,\\\"name\\\":\\\"doubleField\\\",\\\"type\\\":\\\"DOUBLE\\\"," +
+            "\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":6," +
+            "\\\"name\\\":\\\"intField\\\",\\\"type\\\":\\\"INT32\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\"," +
+            "\\\"definition\\\":null},{\\\"number\\\":4,\\\"name\\\":\\\"testEnum\\\",\\\"type\\\":\\\"ENUM\\\"," +
+            "\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":5," +
+            "\\\"name\\\":\\\"nestedField\\\",\\\"type\\\":\\\"MESSAGE\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\"," +
+            "\\\"definition\\\":null},{\\\"number\\\":10,\\\"name\\\":\\\"repeatedField\\\"," +
+            "\\\"type\\\":\\\"STRING\\\",\\\"label\\\":\\\"LABEL_REPEATED\\\",\\\"definition\\\":null}]\"}";
 
     @Test
     public void testEncodeAndDecode() {
@@ -67,4 +83,33 @@ public class ProtobufSchemaTest {
 
         Assert.assertEquals(schema.toString(), EXPECTED_SCHEMA_JSON);
     }
+
+    @Test
+    public void testGenericOf() {
+        try {
+            ProtobufSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> protobufSchema
+                    = ProtobufSchema.ofGenericClass(org.apache.pulsar.client.schema.proto.Test.TestMessage.class,
+                    Collections.emptyMap());
+        } catch (Exception e) {
+            Assert.fail("Should not construct a ProtobufShema over a non-protobuf-generated class");
+        }
+
+        try {
+            ProtobufSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> protobufSchema
+                    = ProtobufSchema.ofGenericClass(String.class,
+                    Collections.emptyMap());
+            Assert.fail("Should not construct a ProtobufShema over a non-protobuf-generated class");
+        } catch (Exception e) {
+
+        }
+    }
+
+    @Test
+    public void testParsingInfoProperty() throws JsonProcessingException {
+        ProtobufSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> protobufSchema
+                = ProtobufSchema.of(org.apache.pulsar.client.schema.proto.Test.TestMessage.class);
+
+        Assert.assertEquals(new ObjectMapper().writeValueAsString(protobufSchema.getSchemaInfo().getProperties()), EXPECTED_PARSING_INFO);
+
+    }
 }
diff --git a/pulsar-client-schema/src/test/proto/Test.proto b/pulsar-client-schema/src/test/proto/Test.proto
index d640d2c..7d7b1b6 100644
--- a/pulsar-client-schema/src/test/proto/Test.proto
+++ b/pulsar-client-schema/src/test/proto/Test.proto
@@ -35,7 +35,8 @@ message SubMessage {
 message TestMessage {
     string stringField = 1;
     double doubleField = 2;
-    int32 intField = 3;
+    int32 intField = 6;
     TestEnum testEnum = 4;
     SubMessage nestedField = 5;
+    repeated string repeatedField = 10;
 }
\ No newline at end of file