You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/07/18 07:58:12 UTC
[incubator-pulsar] branch master updated: augmenting protoschema
with info for parsing (#2181)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6ef9df9 augmenting protoschema with info for parsing (#2181)
6ef9df9 is described below
commit 6ef9df9cb18b6e28c385702becf1d0de7ba294e2
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed Jul 18 00:58:10 2018 -0700
augmenting protoschema with info for parsing (#2181)
### Motivation
Add per-field parsing information (field number, name, type, and label) to the Protobuf schema properties so that Protobuf messages can be parsed more easily.
---
.../pulsar/client/impl/schema/ProtobufSchema.java | 75 ++++++++++++++++++----
.../pulsar/client/schema/ProtobufSchemaTest.java | 47 +++++++++++++-
pulsar-client-schema/src/test/proto/Test.proto | 3 +-
3 files changed, 112 insertions(+), 13 deletions(-)
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
index 4f2fba3..34ec46b 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
@@ -18,7 +18,12 @@
*/
package org.apache.pulsar.client.impl.schema;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.Descriptors;
import com.google.protobuf.Parser;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
import org.apache.avro.protobuf.ProtobufDatumReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
@@ -27,23 +32,71 @@ import org.apache.pulsar.common.schema.SchemaType;
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> implements Schema<T> {
+ // Built in the constructor: empty name, PROTOBUF type, Avro-derived schema bytes and properties.
private SchemaInfo schemaInfo;
+ // Parser obtained from the message class's default instance.
private Parser<T> tParser;
+ /** Schema property key under which the JSON-encoded list of per-field parsing info is stored. */
+ public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__";
- private ProtobufSchema(SchemaInfo schemaInfo, Class<T> pojo) {
- this.schemaInfo = schemaInfo;
+    /**
+     * Parsing info for a single top-level field of a protobuf message. A list of these is
+     * serialized to JSON and stored under {@link ProtobufSchema#PARSING_INFO_PROPERTY}.
+     */
+    @AllArgsConstructor
+    @Getter
+    public static class ProtoBufParsingInfo {
+        /** Field number as declared in the .proto definition. */
+        private final int number;
+        /** Field name. */
+        private final String name;
+        /** Protobuf field type name, e.g. {@code STRING}, {@code ENUM} or {@code MESSAGE}. */
+        private final String type;
+        /** Field label name, e.g. {@code LABEL_OPTIONAL} or {@code LABEL_REPEATED}. */
+        private final String label;
+        /** Reserved for describing nested fields in the future; currently always {@code null}. */
+        private final Map<String, Object> definition;
+    }
+
+    /**
+     * Creates a schema for the given protobuf-generated message class.
+     *
+     * @param properties user-supplied schema properties; copied, never mutated
+     * @param pojo the protobuf-generated message class
+     * @throws IllegalArgumentException if reflectively looking up, accessing or invoking
+     *         {@code getDefaultInstance()} on {@code pojo} fails
+     */
+    @SuppressWarnings("unchecked") // reflective default-instance and parser casts cannot be checked
+    private ProtobufSchema(Map<String, String> properties, Class<T> pojo) {
        try {
            T protoMessageInstance = (T) pojo.getMethod("getDefaultInstance").invoke(null);
            tParser = (Parser<T>) protoMessageInstance.getParserForType();
+
+            this.schemaInfo = new SchemaInfo();
+            this.schemaInfo.setName("");
+
+            // Copy the caller's properties (which may be immutable, e.g. Collections.emptyMap())
+            // before adding the protobuf parsing info.
+            Map<String, String> allProperties = new HashMap<>(properties);
+            allProperties.put(PARSING_INFO_PROPERTY, getParsingInfo(protoMessageInstance));
+
+            this.schemaInfo.setProperties(allProperties);
+            this.schemaInfo.setType(SchemaType.PROTOBUF);
+            ProtobufDatumReader<T> datumReader = new ProtobufDatumReader<>(pojo);
+            org.apache.avro.Schema schema = datumReader.getSchema();
+            // Encode explicitly as UTF-8 instead of relying on the platform default charset.
+            this.schemaInfo.setSchema(schema.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
+
        } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }
+    // ObjectMapper is thread-safe once configured and relatively costly to create, so share one
+    // instance instead of allocating a new mapper for every schema.
+    private static final ObjectMapper PARSING_INFO_MAPPER = new ObjectMapper();
+
+    /**
+     * Serializes parsing info for every top-level field of the message type into a JSON array.
+     *
+     * <p>Each entry records the field number, name, protobuf type name and label name. Nested
+     * message definitions are not expanded yet: {@code definition} is always {@code null}.
+     *
+     * @param protoMessageInstance an instance of the message type (the constructor passes the
+     *        default instance)
+     * @return a JSON array with one entry per field, in the order returned by the descriptor
+     * @throws IllegalStateException if the parsing info cannot be serialized to JSON
+     */
+    private String getParsingInfo(T protoMessageInstance) {
+        List<ProtoBufParsingInfo> protoBufParsingInfos = new LinkedList<>();
+        for (Descriptors.FieldDescriptor fieldDescriptor
+                : protoMessageInstance.getDescriptorForType().getFields()) {
+            protoBufParsingInfos.add(new ProtoBufParsingInfo(
+                    fieldDescriptor.getNumber(),
+                    fieldDescriptor.getName(),
+                    fieldDescriptor.getType().name(),
+                    fieldDescriptor.toProto().getLabel().name(),
+                    null));
+        }
+
+        try {
+            return PARSING_INFO_MAPPER.writeValueAsString(protoBufParsingInfos);
+        } catch (JsonProcessingException e) {
+            throw new IllegalStateException("Failed to serialize protobuf parsing info", e);
+        }
+    }
+
@Override
public byte[] encode(T message) {
return message.toByteArray();
@@ -67,16 +120,16 @@ public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> im
return of(pojo, Collections.emptyMap());
}
+    /**
+     * Creates a protobuf schema for a message class that is only known at runtime.
+     *
+     * @param pojo a class generated by the protobuf compiler (a GeneratedMessageV3 subclass)
+     * @param properties additional schema properties; copied into the schema info
+     * @return a schema for {@code pojo}
+     * @throws NullPointerException if {@code pojo} or {@code properties} is {@code null}
+     * @throws IllegalArgumentException if {@code pojo} is not a GeneratedMessageV3 subclass, or
+     *         its {@code getDefaultInstance()} method cannot be looked up or invoked
+     */
+    @SuppressWarnings("unchecked") // the raw Class is intentional: the type is only known at runtime
+    public static ProtobufSchema ofGenericClass(Class pojo, Map<String, String> properties) {
+        // Fail fast with a named argument rather than an NPE deep inside reflection or putAll.
+        java.util.Objects.requireNonNull(pojo, "pojo");
+        java.util.Objects.requireNonNull(properties, "properties");
+        if (!com.google.protobuf.GeneratedMessageV3.class.isAssignableFrom(pojo)) {
+            throw new IllegalArgumentException(com.google.protobuf.GeneratedMessageV3.class.getName()
+                    + " is not assignable from " + pojo.getName());
+        }
+        return new ProtobufSchema<>(properties, pojo);
+    }
+
+ /**
+ * Creates a protobuf schema for the given generated message class with additional schema
+ * properties. Delegates to {@code ofGenericClass(pojo, properties)}.
+ *
+ * @param pojo the protobuf-generated message class
+ * @param properties additional schema properties
+ * @return a schema for {@code pojo}
+ */
public static <T extends com.google.protobuf.GeneratedMessageV3> ProtobufSchema<T> of(
Class<T> pojo, Map<String, String> properties){
-
- SchemaInfo info = new SchemaInfo();
- info.setName("");
- info.setProperties(properties);
- info.setType(SchemaType.PROTOBUF);
- ProtobufDatumReader<T> datumReader = new ProtobufDatumReader<>(pojo);
- org.apache.avro.Schema schema = datumReader.getSchema();
- info.setSchema(schema.toString().getBytes());
- return new ProtobufSchema<>(info, pojo);
+ // NOTE(review): ofGenericClass returns a raw ProtobufSchema, so this is an unchecked
+ // conversion to ProtobufSchema<T>; it relies on pojo being statically typed as Class<T>.
+ return ofGenericClass(pojo, properties);
}
}
diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/ProtobufSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/ProtobufSchemaTest.java
index 7ed575a..69c66dc 100644
--- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/ProtobufSchemaTest.java
+++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/ProtobufSchemaTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.schema;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
@@ -26,6 +28,8 @@ import org.apache.pulsar.functions.proto.Function;
import org.testng.Assert;
import org.testng.annotations.Test;
+import java.util.Collections;
+
@Slf4j
public class ProtobufSchemaTest {
@@ -39,7 +43,19 @@ public class ProtobufSchemaTest {
"\"symbols\":[\"SHARED\",\"FAILOVER\"]},\"default\":\"SHARED\"},{\"name\":\"nestedField\"," +
"\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SubMessage\",\"fields\":[{\"name\":\"foo\"," +
"\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"bar\"," +
- "\"type\":\"double\",\"default\":0}]}],\"default\":null}]}";
+ "\"type\":\"double\",\"default\":0}]}],\"default\":null},{\"name\":\"repeatedField\"," +
+ "\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}}]}";
+
+ // Expected JSON serialization of the whole schema properties map for TestMessage: a single
+ // "__PARSING_INFO__" entry whose value is itself a JSON array string (hence the double escaping).
+ // Entries follow the field declaration order in Test.proto, not field-number order.
+ private static final String EXPECTED_PARSING_INFO = "{\"__PARSING_INFO__\":\"[{\\\"number\\\":1," +
+ "\\\"name\\\":\\\"stringField\\\",\\\"type\\\":\\\"STRING\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\"," +
+ "\\\"definition\\\":null},{\\\"number\\\":2,\\\"name\\\":\\\"doubleField\\\",\\\"type\\\":\\\"DOUBLE\\\"," +
+ "\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":6," +
+ "\\\"name\\\":\\\"intField\\\",\\\"type\\\":\\\"INT32\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\"," +
+ "\\\"definition\\\":null},{\\\"number\\\":4,\\\"name\\\":\\\"testEnum\\\",\\\"type\\\":\\\"ENUM\\\"," +
+ "\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":5," +
+ "\\\"name\\\":\\\"nestedField\\\",\\\"type\\\":\\\"MESSAGE\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\"," +
+ "\\\"definition\\\":null},{\\\"number\\\":10,\\\"name\\\":\\\"repeatedField\\\"," +
+ "\\\"type\\\":\\\"STRING\\\",\\\"label\\\":\\\"LABEL_REPEATED\\\",\\\"definition\\\":null}]\"}";
@Test
public void testEncodeAndDecode() {
@@ -67,4 +83,33 @@ public class ProtobufSchemaTest {
Assert.assertEquals(schema.toString(), EXPECTED_SCHEMA_JSON);
}
+
+    /**
+     * Verifies that {@code ofGenericClass} accepts protobuf-generated classes and rejects others.
+     */
+    @Test
+    public void testGenericOf() {
+        // A protobuf-generated class is accepted. Any exception propagates with its own cause
+        // instead of being masked by a generic failure message.
+        ProtobufSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> protobufSchema
+                = ProtobufSchema.ofGenericClass(org.apache.pulsar.client.schema.proto.Test.TestMessage.class,
+                Collections.emptyMap());
+        Assert.assertNotNull(protobufSchema);
+
+        // A class that is not protobuf-generated is rejected with IllegalArgumentException only.
+        try {
+            ProtobufSchema.ofGenericClass(String.class, Collections.emptyMap());
+            Assert.fail("Should not construct a ProtobufSchema over a non-protobuf-generated class");
+        } catch (IllegalArgumentException expected) {
+            Assert.assertTrue(expected.getMessage().contains("is not assignable from"),
+                    "Unexpected error message: " + expected.getMessage());
+        }
+    }
+
+ @Test
+ public void testParsingInfoProperty() throws JsonProcessingException {
+ ProtobufSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> protobufSchema
+ = ProtobufSchema.of(org.apache.pulsar.client.schema.proto.Test.TestMessage.class);
+
+ Assert.assertEquals(new ObjectMapper().writeValueAsString(protobufSchema.getSchemaInfo().getProperties()), EXPECTED_PARSING_INFO);
+
+ }
}
diff --git a/pulsar-client-schema/src/test/proto/Test.proto b/pulsar-client-schema/src/test/proto/Test.proto
index d640d2c..7d7b1b6 100644
--- a/pulsar-client-schema/src/test/proto/Test.proto
+++ b/pulsar-client-schema/src/test/proto/Test.proto
@@ -35,7 +35,8 @@ message SubMessage {
message TestMessage {
string stringField = 1;
double doubleField = 2;
- int32 intField = 3;
+ int32 intField = 6;
+ // NOTE(review): field numbers are not sequential in declaration order (intField = 6 before
+ // testEnum = 4); presumably so ProtobufSchemaTest checks that parsing info reports descriptor
+ // field numbers rather than positions - confirm. Renumbering a field changes its wire encoding,
+ // which is only harmless because this message is test-only; consider `reserved 3;`.
TestEnum testEnum = 4;
SubMessage nestedField = 5;
+ // Repeated field so the parsing info test covers LABEL_REPEATED.
+ repeated string repeatedField = 10;
}
\ No newline at end of file