You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/13 07:46:25 UTC

[pulsar] 31/46: Add JsonRecordBuilder implementation (#10052)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fe980e72bd4d188bbbd018551fd7694f571d7b36
Author: Vincent Royer <vr...@strapdata.com>
AuthorDate: Tue Apr 20 04:07:37 2021 +0200

    Add JsonRecordBuilder implementation (#10052)
    
    Provide a JSON GenericRecord builder allowing to produce JsonGenericRecord from the GenericJsonSchema.
    
    Add a new class org.apache.pulsar.client.impl.schema.generic.JsonRecordBuilderImpl
    Modify the org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema.newRecordBuilder()
    
    Add a unit test in org.apache.pulsar.client.impl.schema.JSONSchemaTest
    
    (cherry picked from commit 08cbdc7c6e0df236d4ee2ab708ea1500063cb212)
---
 .../client/impl/schema/FieldSchemaBuilderImpl.java |   7 ++
 .../impl/schema/generic/GenericJsonSchema.java     |  16 ++-
 .../impl/schema/generic/JsonRecordBuilderImpl.java | 108 +++++++++++++++++++
 .../pulsar/client/impl/schema/JSONSchemaTest.java  | 117 +++++++++++++++++++++
 4 files changed, 247 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java
index 88283bd..eb4df16 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java
@@ -142,6 +142,13 @@ class FieldSchemaBuilderImpl implements FieldSchemaBuilder<FieldSchemaBuilderImp
             case TIMESTAMP:
                 baseSchema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
                 break;
+            case JSON:
+                checkArgument(genericSchema.getSchemaInfo().getType() == SchemaType.JSON,
+                        "The field is expected to be using JSON schema but "
+                                + genericSchema.getSchemaInfo().getType() + " schema is found");
+                AvroBaseStructSchema genericJsonSchema = (AvroBaseStructSchema) genericSchema;
+                baseSchema = genericJsonSchema.getAvroSchema();
+                break;
             case AVRO:
                 checkArgument(genericSchema.getSchemaInfo().getType() == SchemaType.AVRO,
                         "The field is expected to be using AVRO schema but "
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
index 87a1790..24b7777 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.client.impl.schema.generic;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
@@ -41,6 +43,18 @@ public class GenericJsonSchema extends GenericSchemaImpl {
 
     @Override
     public GenericRecordBuilder newRecordBuilder() {
-        throw new UnsupportedOperationException("Json Schema doesn't support record builder yet");
+        return new JsonRecordBuilderImpl(this);
+    }
+
+    public boolean supportSchemaVersioning() {
+        return true;
+    }
+
+    public Schema<GenericRecord> clone() {
+        Schema<GenericRecord> schema = of(this.schemaInfo, ((AbstractMultiVersionGenericReader)this.reader).useProvidedSchemaAsReaderSchema);
+        if (this.schemaInfoProvider != null) {
+            schema.setSchemaInfoProvider(this.schemaInfoProvider);
+        }
+        return schema;
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/JsonRecordBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/JsonRecordBuilderImpl.java
new file mode 100644
index 0000000..39249e7
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/JsonRecordBuilderImpl.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class JsonRecordBuilderImpl implements GenericRecordBuilder {
+
+    private static ObjectMapper objectMapper = new ObjectMapper();
+
+    private final GenericSchemaImpl genericSchema;
+    private Map<String, Object> map = new HashMap<>();
+
+    public JsonRecordBuilderImpl(GenericSchemaImpl genericSchema) {
+        this.genericSchema = genericSchema;
+    }
+
+    /**
+     * Sets the value of a field.
+     *
+     * @param fieldName the name of the field to set.
+     * @param value     the value to set.
+     * @return a reference to the RecordBuilder.
+     */
+    @Override
+    public GenericRecordBuilder set(String fieldName, Object value) {
+        if (value instanceof GenericRecord) {
+            if (!(value instanceof GenericJsonRecord))
+                throw new IllegalArgumentException("JSON Record Builder doesn't support non-JSON record as a field");
+            GenericJsonRecord genericJsonRecord = (GenericJsonRecord) value;
+            value = genericJsonRecord.getJsonNode();
+        }
+
+        map.put(fieldName, value);
+        return this;
+    }
+
+    /**
+     * Sets the value of a field.
+     *
+     * @param field the field to set.
+     * @param value the value to set.
+     * @return a reference to the RecordBuilder.
+     */
+    @Override
+    public GenericRecordBuilder set(Field field, Object value) {
+        set(field.getName(), value);
+        return this;
+    }
+
+    /**
+     * Clears the value of the given field.
+     *
+     * @param fieldName the name of the field to clear.
+     * @return a reference to the RecordBuilder.
+     */
+    @Override
+    public GenericRecordBuilder clear(String fieldName) {
+        map.remove(fieldName);
+        return this;
+    }
+
+    /**
+     * Clears the value of the given field.
+     *
+     * @param field the field to clear.
+     * @return a reference to the RecordBuilder.
+     */
+    @Override
+    public GenericRecordBuilder clear(Field field) {
+        clear(field.getName());
+        return this;
+    }
+
+    @Override
+    public GenericRecord build() {
+        JsonNode jn = objectMapper.valueToTree(map);
+        return new GenericJsonRecord(
+                null,
+                genericSchema.getFields(),
+                jn,
+                null
+                );
+    }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
index d17bbf0..172bae3 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
@@ -20,18 +20,30 @@ package org.apache.pulsar.client.impl.schema;
 
 import java.util.Collections;
 import java.util.List;
+import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.FOO_FIELDS;
+import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON_ALLOW_NULL;
+import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON_NOT_ALLOW_NULL;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.assertSame;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.DerivedFoo;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.NestedBar;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.NestedBarList;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.skyscreamer.jsonassert.JSONAssert;
 import org.testng.Assert;
@@ -332,4 +344,109 @@ public class JSONSchemaTest {
         assertEquals(jsonSchema.decode(byteBuf), foo1);
 
     }
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    private static class Seller {
+        public String state;
+        public String street;
+        public long zipCode;
+    }
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    private static class PC {
+        public String brand;
+        public String model;
+        public int year;
+        public GPU gpu;
+        public Seller seller;
+    }
+
+    private enum GPU {
+        AMD, NVIDIA
+    }
+
+    @Test
+    public void testEncodeAndDecodeObject() throws JsonProcessingException {
+        JSONSchema<PC> jsonSchema = JSONSchema.of(SchemaDefinition.<PC>builder().withPojo(PC.class).build());
+        PC pc = new PC("dell", "alienware", 2021, GPU.AMD,
+                new Seller("WA", "street", 98004));
+        byte[] encoded = jsonSchema.encode(pc);
+        PC roundtrippedPc = jsonSchema.decode(encoded);
+        assertEquals(roundtrippedPc, pc);
+    }
+
+    @Test
+    public void testGetNativeSchema() throws SchemaValidationException {
+        JSONSchema<PC> schema2 = JSONSchema.of(PC.class);
+        org.apache.avro.Schema avroSchema2 = (Schema) schema2.getNativeSchema().get();
+        assertSame(schema2.schema, avroSchema2);
+    }
+
+    @Test
+    public void testJsonGenericRecordBuilder() {
+        JSONSchema<Seller> sellerJsonSchema = JSONSchema.of(Seller.class);
+
+        RecordSchemaBuilder sellerSchemaBuilder = SchemaBuilder.record("seller");
+        sellerSchemaBuilder.field("state").type(SchemaType.STRING);
+        sellerSchemaBuilder.field("street").type(SchemaType.STRING);
+        sellerSchemaBuilder.field("zipCode").type(SchemaType.INT64);
+        SchemaInfo sellerSchemaInfo = sellerSchemaBuilder.build(SchemaType.JSON);
+        GenericSchemaImpl sellerGenericSchema = GenericSchemaImpl.of(sellerSchemaInfo);
+
+        JSONSchema<PC> pcJsonSchema = JSONSchema.of(PC.class);
+
+        RecordSchemaBuilder pcSchemaBuilder = SchemaBuilder.record("pc");
+        pcSchemaBuilder.field("brand").type(SchemaType.STRING);
+        pcSchemaBuilder.field("model").type(SchemaType.STRING);
+        pcSchemaBuilder.field("gpu").type(SchemaType.STRING);
+        pcSchemaBuilder.field("year").type(SchemaType.INT64);
+        pcSchemaBuilder.field("seller", sellerGenericSchema).type(SchemaType.JSON).optional();
+        SchemaInfo pcGenericSchemaInfo = pcSchemaBuilder.build(SchemaType.JSON);
+        GenericSchemaImpl pcGenericSchema = GenericSchemaImpl.of(pcGenericSchemaInfo);
+
+        Seller seller = new Seller("USA","oakstreet",9999);
+        PC pc = new PC("dell","g3",2020, GPU.AMD, seller);
+
+        byte[] bytes = pcJsonSchema.encode(pc);
+        Assert.assertTrue(bytes.length > 0);
+
+        Object pc2 = pcJsonSchema.decode(bytes);
+        assertEquals(pc, pc2);
+
+        GenericRecord sellerRecord = sellerGenericSchema.newRecordBuilder()
+                .set("state", "USA")
+                .set("street", "oakstreet")
+                .set("zipCode", 9999)
+                .build();
+
+        GenericRecord pcRecord = pcGenericSchema.newRecordBuilder()
+                .set("brand", "dell")
+                .set("model","g3")
+                .set("year", 2020)
+                .set("gpu", GPU.AMD)
+                .set("seller", sellerRecord)
+                .build();
+
+        byte[] bytes3 = pcGenericSchema.encode(pcRecord);
+        Assert.assertTrue(bytes3.length > 0);
+        GenericRecord pc3Record = pcGenericSchema.decode(bytes3);
+
+        for(Field field : pc3Record.getFields()) {
+            assertTrue(pcGenericSchema.getFields().contains(field));
+        }
+        assertEquals("dell", pc3Record.getField("brand"));
+        assertEquals("g3", pc3Record.getField("model"));
+        assertEquals(2020, pc3Record.getField("year"));
+        assertEquals(GPU.AMD.toString(), pc3Record.getField("gpu"));
+
+
+        GenericRecord seller3Record = (GenericRecord) pc3Record.getField("seller");
+        assertEquals("USA", seller3Record.getField("state"));
+        assertEquals("oakstreet", seller3Record.getField("street"));
+        assertEquals(9999, seller3Record.getField("zipCode"));
+    }
 }