You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2018/12/04 01:48:17 UTC
[pulsar] branch master updated: JSONSchema fails to serialise
fields on objects that are nested in a collection (#2969)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dedeb33 JSONSchema fails to serialise fields on objects that are nested in a collection (#2969)
dedeb33 is described below
commit dedeb33a3637621b4a99211d5e7070cd080566ea
Author: chrismiller <ch...@redyeti.net>
AuthorDate: Tue Dec 4 01:48:12 2018 +0000
JSONSchema fails to serialise fields on objects that are nested in a collection (#2969)
* JSONSchema doesn't serialise fields of objects that are in nested collections
* fix json schema
* Fix a couple of typos
---
.../pulsar/client/impl/schema/JSONSchema.java | 20 +------------
.../pulsar/client/schema/JSONSchemaTest.java | 34 ++++++++++++++++++++--
.../pulsar/client/schema/SchemaTestUtils.java | 18 ++++++++++++
.../connectors/pulsar/FlinkPulsarProducer.java | 4 +--
.../connectors/pulsar/PulsarProduceMode.java | 2 +-
5 files changed, 54 insertions(+), 24 deletions(-)
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index d46c84b..02185ec 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -50,25 +50,7 @@ public class JSONSchema<T> implements Schema<T>{
private JSONSchema(Class<T> pojo, Map<String, String> properties) {
this.pojo = pojo;
this.properties = properties;
- this.gson = new GsonBuilder().addSerializationExclusionStrategy(new ExclusionStrategy() {
- Set<String> classes = new HashSet<>();
-
- @Override
- public boolean shouldSkipField(FieldAttributes f) {
- boolean skip = !(f.getDeclaringClass().equals(pojo)
- || classes.contains(f.getDeclaringClass().getName())
- || f.getDeclaringClass().isAssignableFrom(pojo));
- if (!skip) {
- classes.add(f.getDeclaredClass().getName());
- }
- return skip;
- }
-
- @Override
- public boolean shouldSkipClass(Class<?> clazz) {
- return false;
- }
- }).create();
+ this.gson = new Gson();
this.schema = ReflectData.AllowNull.get().getSchema(pojo);
this.schemaInfo = new SchemaInfo();
diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
index c7b16bf..93f3478 100644
--- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
+++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
@@ -18,12 +18,17 @@
*/
package org.apache.pulsar.client.schema;
+import java.util.Collections;
+import java.util.List;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.schema.SchemaTestUtils.DerivedFoo;
import org.apache.pulsar.client.schema.SchemaTestUtils.Foo;
+import org.apache.pulsar.client.schema.SchemaTestUtils.NestedBar;
+import org.apache.pulsar.client.schema.SchemaTestUtils.NestedBarList;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -40,7 +45,6 @@ public class JSONSchemaTest {
Assert.assertEquals(jsonSchema.getSchemaInfo().getType(), SchemaType.JSON);
Schema.Parser parser = new Schema.Parser();
String schemaJson = new String(jsonSchema.getSchemaInfo().getSchema());
- log.info("schemaJson: {}", schemaJson);
Assert.assertEquals(schemaJson, SCHEMA_JSON);
Schema schema = parser.parse(schemaJson);
@@ -85,6 +89,33 @@ public class JSONSchemaTest {
}
@Test
+ public void testNestedClasses() {
+ JSONSchema<NestedBar> jsonSchema = JSONSchema.of(NestedBar.class, null);
+ JSONSchema<NestedBarList> listJsonSchema = JSONSchema.of(NestedBarList.class, null);
+
+ Bar bar = new Bar();
+ bar.setField1(true);
+
+ NestedBar nested = new NestedBar();
+ nested.setField1(true);
+ nested.setNested(bar);
+
+ byte[] bytes = jsonSchema.encode(nested);
+ Assert.assertTrue(bytes.length > 0);
+ Assert.assertEquals(jsonSchema.decode(bytes), nested);
+
+ List<Bar> list = Collections.singletonList(bar);
+ NestedBarList nestedList = new NestedBarList();
+ nestedList.setField1(true);
+ nestedList.setList(list);
+
+ bytes = listJsonSchema.encode(nestedList);
+ Assert.assertTrue(bytes.length > 0);
+
+ Assert.assertEquals(listJsonSchema.decode(bytes), nestedList);
+ }
+
+ @Test
public void testCorrectPolymorphism() {
Bar bar = new Bar();
@@ -123,7 +154,6 @@ public class JSONSchemaTest {
// schema for derived class
JSONSchema<DerivedFoo> derivedJsonSchema = JSONSchema.of(DerivedFoo.class);
Assert.assertEquals(derivedJsonSchema.decode(derivedJsonSchema.encode(derivedFoo)), derivedFoo);
- log.info("derivedJsonSchema.encode(derivedDerivedFoo)): {}", derivedJsonSchema.decode(derivedJsonSchema.encode(derivedDerivedFoo)));
Assert.assertEquals(derivedJsonSchema.decode(derivedJsonSchema.encode(derivedDerivedFoo)), derivedFoo);
//schema for derived derived class
diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java
index 123b81c..b31b007 100644
--- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java
+++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.schema;
+import java.util.List;
+
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@@ -48,6 +50,22 @@ public class SchemaTestUtils {
@Data
@ToString
@EqualsAndHashCode
+ public static class NestedBar {
+ private boolean field1;
+ private Bar nested;
+ }
+
+ @Data
+ @ToString
+ @EqualsAndHashCode
+ public static class NestedBarList {
+ private boolean field1;
+ private List<Bar> list;
+ }
+
+ @Data
+ @ToString
+ @EqualsAndHashCode
public static class DerivedFoo extends Foo {
private String field5;
private int field6;
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index bced297..d515365 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -82,7 +82,7 @@ public class FlinkPulsarProducer<IN>
/**
* Produce Mode.
*/
- protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONE;
+ protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONCE;
/**
* If true, the producer will wait until all outstanding records have been send to the broker.
@@ -216,7 +216,7 @@ public class FlinkPulsarProducer<IN>
LOG.error("Error while sending record to Pulsar : " + cause.getMessage(), cause);
return null;
};
- } else if (PulsarProduceMode.AT_LEAST_ONE == produceMode) {
+ } else if (PulsarProduceMode.AT_LEAST_ONCE == produceMode) {
this.failureCallback = cause -> {
if (null == asyncException) {
if (cause instanceof Exception) {
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java
index d1b9fd8..d42f5c3 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java
@@ -32,6 +32,6 @@ public enum PulsarProduceMode {
* The producer will ensure that all the events are persisted in pulsar.
* There could be duplicate events written though.
*/
- AT_LEAST_ONE,
+ AT_LEAST_ONCE,
}