You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2018/12/04 01:48:17 UTC

[pulsar] branch master updated: JSONSchema fails to serialise fields on objects that are nested in a collection (#2969)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dedeb33  JSONSchema fails to serialise fields on objects that are nested in a collection (#2969)
dedeb33 is described below

commit dedeb33a3637621b4a99211d5e7070cd080566ea
Author: chrismiller <ch...@redyeti.net>
AuthorDate: Tue Dec 4 01:48:12 2018 +0000

    JSONSchema fails to serialise fields on objects that are nested in a collection (#2969)
    
    * JSONSchema doesn't serialise fields of objects that are in nested collections
    
    * fix json schema
    
    * Fix a couple of typos
---
 .../pulsar/client/impl/schema/JSONSchema.java      | 20 +------------
 .../pulsar/client/schema/JSONSchemaTest.java       | 34 ++++++++++++++++++++--
 .../pulsar/client/schema/SchemaTestUtils.java      | 18 ++++++++++++
 .../connectors/pulsar/FlinkPulsarProducer.java     |  4 +--
 .../connectors/pulsar/PulsarProduceMode.java       |  2 +-
 5 files changed, 54 insertions(+), 24 deletions(-)

diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index d46c84b..02185ec 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -50,25 +50,7 @@ public class JSONSchema<T> implements Schema<T>{
     private JSONSchema(Class<T> pojo, Map<String, String> properties) {
         this.pojo = pojo;
         this.properties = properties;
-        this.gson = new GsonBuilder().addSerializationExclusionStrategy(new ExclusionStrategy() {
-            Set<String> classes = new HashSet<>();
-
-            @Override
-            public boolean shouldSkipField(FieldAttributes f) {
-                boolean skip = !(f.getDeclaringClass().equals(pojo)
-                        || classes.contains(f.getDeclaringClass().getName())
-                        || f.getDeclaringClass().isAssignableFrom(pojo));
-                if (!skip) {
-                    classes.add(f.getDeclaredClass().getName());
-                }
-                return skip;
-            }
-
-            @Override
-            public boolean shouldSkipClass(Class<?> clazz) {
-                return false;
-            }
-        }).create();
+        this.gson = new Gson();
 
         this.schema = ReflectData.AllowNull.get().getSchema(pojo);
         this.schemaInfo = new SchemaInfo();
diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
index c7b16bf..93f3478 100644
--- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
+++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
@@ -18,12 +18,17 @@
  */
 package org.apache.pulsar.client.schema;
 
+import java.util.Collections;
+import java.util.List;
+
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.schema.SchemaTestUtils.Bar;
 import org.apache.pulsar.client.schema.SchemaTestUtils.DerivedFoo;
 import org.apache.pulsar.client.schema.SchemaTestUtils.Foo;
+import org.apache.pulsar.client.schema.SchemaTestUtils.NestedBar;
+import org.apache.pulsar.client.schema.SchemaTestUtils.NestedBarList;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -40,7 +45,6 @@ public class JSONSchemaTest {
         Assert.assertEquals(jsonSchema.getSchemaInfo().getType(), SchemaType.JSON);
         Schema.Parser parser = new Schema.Parser();
         String schemaJson = new String(jsonSchema.getSchemaInfo().getSchema());
-        log.info("schemaJson: {}", schemaJson);
         Assert.assertEquals(schemaJson, SCHEMA_JSON);
         Schema schema = parser.parse(schemaJson);
 
@@ -85,6 +89,33 @@ public class JSONSchemaTest {
     }
 
     @Test
+    public void testNestedClasses() {
+        JSONSchema<NestedBar> jsonSchema = JSONSchema.of(NestedBar.class, null);
+        JSONSchema<NestedBarList> listJsonSchema = JSONSchema.of(NestedBarList.class, null);
+
+        Bar bar = new Bar();
+        bar.setField1(true);
+
+        NestedBar nested = new NestedBar();
+        nested.setField1(true);
+        nested.setNested(bar);
+
+        byte[] bytes = jsonSchema.encode(nested);
+        Assert.assertTrue(bytes.length > 0);
+        Assert.assertEquals(jsonSchema.decode(bytes), nested);
+
+        List<Bar> list = Collections.singletonList(bar);
+        NestedBarList nestedList = new NestedBarList();
+        nestedList.setField1(true);
+        nestedList.setList(list);
+
+        bytes = listJsonSchema.encode(nestedList);
+        Assert.assertTrue(bytes.length > 0);
+
+        Assert.assertEquals(listJsonSchema.decode(bytes), nestedList);
+    }
+
+    @Test
     public void testCorrectPolymorphism() {
 
         Bar bar = new Bar();
@@ -123,7 +154,6 @@ public class JSONSchemaTest {
         // schema for derived class
         JSONSchema<DerivedFoo> derivedJsonSchema = JSONSchema.of(DerivedFoo.class);
         Assert.assertEquals(derivedJsonSchema.decode(derivedJsonSchema.encode(derivedFoo)), derivedFoo);
-        log.info("derivedJsonSchema.encode(derivedDerivedFoo)): {}", derivedJsonSchema.decode(derivedJsonSchema.encode(derivedDerivedFoo)));
         Assert.assertEquals(derivedJsonSchema.decode(derivedJsonSchema.encode(derivedDerivedFoo)), derivedFoo);
 
         //schema for derived derived class
diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java
index 123b81c..b31b007 100644
--- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java
+++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.schema;
 
+import java.util.List;
+
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
@@ -48,6 +50,22 @@ public class SchemaTestUtils {
     @Data
     @ToString
     @EqualsAndHashCode
+    public static class NestedBar {
+        private boolean field1;
+        private Bar nested;
+    }
+
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class NestedBarList {
+        private boolean field1;
+        private List<Bar> list;
+    }
+
+    @Data
+    @ToString
+    @EqualsAndHashCode
     public static class DerivedFoo extends Foo {
         private String field5;
         private int field6;
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index bced297..d515365 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -82,7 +82,7 @@ public class FlinkPulsarProducer<IN>
     /**
      * Produce Mode.
      */
-    protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONE;
+    protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONCE;
 
     /**
      * If true, the producer will wait until all outstanding records have been send to the broker.
@@ -216,7 +216,7 @@ public class FlinkPulsarProducer<IN>
                 LOG.error("Error while sending record to Pulsar : " + cause.getMessage(), cause);
                 return null;
             };
-        } else if (PulsarProduceMode.AT_LEAST_ONE == produceMode) {
+        } else if (PulsarProduceMode.AT_LEAST_ONCE == produceMode) {
             this.failureCallback = cause -> {
                 if (null == asyncException) {
                     if (cause instanceof Exception) {
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java
index d1b9fd8..d42f5c3 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java
@@ -32,6 +32,6 @@ public enum PulsarProduceMode {
      * The producer will ensure that all the events are persisted in pulsar.
      * There could be duplicate events written though.
      */
-    AT_LEAST_ONE,
+    AT_LEAST_ONCE,
 
 }