You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/13 07:46:25 UTC
[pulsar] 31/46: Add JsonRecordBuilder implementation (#10052)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fe980e72bd4d188bbbd018551fd7694f571d7b36
Author: Vincent Royer <vr...@strapdata.com>
AuthorDate: Tue Apr 20 04:07:37 2021 +0200
Add JsonRecordBuilder implementation (#10052)
Provide a JSON GenericRecord builder allowing to produce JsonGenericRecord from the GenericJsonSchema.
Add a new class org.apache.pulsar.client.impl.schema.generic.JsonRecordBuilderImpl
Modify the org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema.newRecordBuilder()
Add a unit test in org.apache.pulsar.client.impl.schema.JSONSchemaTest
(cherry picked from commit 08cbdc7c6e0df236d4ee2ab708ea1500063cb212)
---
.../client/impl/schema/FieldSchemaBuilderImpl.java | 7 ++
.../impl/schema/generic/GenericJsonSchema.java | 16 ++-
.../impl/schema/generic/JsonRecordBuilderImpl.java | 108 +++++++++++++++++++
.../pulsar/client/impl/schema/JSONSchemaTest.java | 117 +++++++++++++++++++++
4 files changed, 247 insertions(+), 1 deletion(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java
index 88283bd..eb4df16 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java
@@ -142,6 +142,13 @@ class FieldSchemaBuilderImpl implements FieldSchemaBuilder<FieldSchemaBuilderImp
case TIMESTAMP:
baseSchema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
break;
+ case JSON:
+ checkArgument(genericSchema.getSchemaInfo().getType() == SchemaType.JSON,
+ "The field is expected to be using JSON schema but "
+ + genericSchema.getSchemaInfo().getType() + " schema is found");
+ AvroBaseStructSchema genericJsonSchema = (AvroBaseStructSchema) genericSchema;
+ baseSchema = genericJsonSchema.getAvroSchema();
+ break;
case AVRO:
checkArgument(genericSchema.getSchemaInfo().getType() == SchemaType.AVRO,
"The field is expected to be using AVRO schema but "
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
index 87a1790..24b7777 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.client.impl.schema.generic;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -41,6 +43,18 @@ public class GenericJsonSchema extends GenericSchemaImpl {
@Override
public GenericRecordBuilder newRecordBuilder() {
- throw new UnsupportedOperationException("Json Schema doesn't support record builder yet");
+ return new JsonRecordBuilderImpl(this);
+ }
+
+ public boolean supportSchemaVersioning() {
+ return true;
+ }
+
+ public Schema<GenericRecord> clone() {
+ Schema<GenericRecord> schema = of(this.schemaInfo, ((AbstractMultiVersionGenericReader)this.reader).useProvidedSchemaAsReaderSchema);
+ if (this.schemaInfoProvider != null) {
+ schema.setSchemaInfoProvider(this.schemaInfoProvider);
+ }
+ return schema;
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/JsonRecordBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/JsonRecordBuilderImpl.java
new file mode 100644
index 0000000..39249e7
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/JsonRecordBuilderImpl.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class JsonRecordBuilderImpl implements GenericRecordBuilder {
+
+ private static ObjectMapper objectMapper = new ObjectMapper();
+
+ private final GenericSchemaImpl genericSchema;
+ private Map<String, Object> map = new HashMap<>();
+
+ public JsonRecordBuilderImpl(GenericSchemaImpl genericSchema) {
+ this.genericSchema = genericSchema;
+ }
+
+ /**
+ * Sets the value of a field.
+ *
+ * @param fieldName the name of the field to set.
+ * @param value the value to set.
+ * @return a reference to the RecordBuilder.
+ */
+ @Override
+ public GenericRecordBuilder set(String fieldName, Object value) {
+ if (value instanceof GenericRecord) {
+ if (!(value instanceof GenericJsonRecord))
+ throw new IllegalArgumentException("JSON Record Builder doesn't support non-JSON record as a field");
+ GenericJsonRecord genericJsonRecord = (GenericJsonRecord) value;
+ value = genericJsonRecord.getJsonNode();
+ }
+
+ map.put(fieldName, value);
+ return this;
+ }
+
+ /**
+ * Sets the value of a field.
+ *
+ * @param field the field to set.
+ * @param value the value to set.
+ * @return a reference to the RecordBuilder.
+ */
+ @Override
+ public GenericRecordBuilder set(Field field, Object value) {
+ set(field.getName(), value);
+ return this;
+ }
+
+ /**
+ * Clears the value of the given field.
+ *
+ * @param fieldName the name of the field to clear.
+ * @return a reference to the RecordBuilder.
+ */
+ @Override
+ public GenericRecordBuilder clear(String fieldName) {
+ map.remove(fieldName);
+ return this;
+ }
+
+ /**
+ * Clears the value of the given field.
+ *
+ * @param field the field to clear.
+ * @return a reference to the RecordBuilder.
+ */
+ @Override
+ public GenericRecordBuilder clear(Field field) {
+ clear(field.getName());
+ return this;
+ }
+
+ @Override
+ public GenericRecord build() {
+ JsonNode jn = objectMapper.valueToTree(map);
+ return new GenericJsonRecord(
+ null,
+ genericSchema.getFields(),
+ jn,
+ null
+ );
+ }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
index d17bbf0..172bae3 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
@@ -20,18 +20,30 @@ package org.apache.pulsar.client.impl.schema;
import java.util.Collections;
import java.util.List;
+import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.FOO_FIELDS;
+import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON_ALLOW_NULL;
+import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON_NOT_ALLOW_NULL;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.assertSame;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.DerivedFoo;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.NestedBar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.NestedBarList;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.skyscreamer.jsonassert.JSONAssert;
import org.testng.Assert;
@@ -332,4 +344,109 @@ public class JSONSchemaTest {
assertEquals(jsonSchema.decode(byteBuf), foo1);
}
+
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ private static class Seller {
+ public String state;
+ public String street;
+ public long zipCode;
+ }
+
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ private static class PC {
+ public String brand;
+ public String model;
+ public int year;
+ public GPU gpu;
+ public Seller seller;
+ }
+
+ private enum GPU {
+ AMD, NVIDIA
+ }
+
+ @Test
+ public void testEncodeAndDecodeObject() throws JsonProcessingException {
+ JSONSchema<PC> jsonSchema = JSONSchema.of(SchemaDefinition.<PC>builder().withPojo(PC.class).build());
+ PC pc = new PC("dell", "alienware", 2021, GPU.AMD,
+ new Seller("WA", "street", 98004));
+ byte[] encoded = jsonSchema.encode(pc);
+ PC roundtrippedPc = jsonSchema.decode(encoded);
+ assertEquals(roundtrippedPc, pc);
+ }
+
+ @Test
+ public void testGetNativeSchema() throws SchemaValidationException {
+ JSONSchema<PC> schema2 = JSONSchema.of(PC.class);
+ org.apache.avro.Schema avroSchema2 = (Schema) schema2.getNativeSchema().get();
+ assertSame(schema2.schema, avroSchema2);
+ }
+
+ @Test
+ public void testJsonGenericRecordBuilder() {
+ JSONSchema<Seller> sellerJsonSchema = JSONSchema.of(Seller.class);
+
+ RecordSchemaBuilder sellerSchemaBuilder = SchemaBuilder.record("seller");
+ sellerSchemaBuilder.field("state").type(SchemaType.STRING);
+ sellerSchemaBuilder.field("street").type(SchemaType.STRING);
+ sellerSchemaBuilder.field("zipCode").type(SchemaType.INT64);
+ SchemaInfo sellerSchemaInfo = sellerSchemaBuilder.build(SchemaType.JSON);
+ GenericSchemaImpl sellerGenericSchema = GenericSchemaImpl.of(sellerSchemaInfo);
+
+ JSONSchema<PC> pcJsonSchema = JSONSchema.of(PC.class);
+
+ RecordSchemaBuilder pcSchemaBuilder = SchemaBuilder.record("pc");
+ pcSchemaBuilder.field("brand").type(SchemaType.STRING);
+ pcSchemaBuilder.field("model").type(SchemaType.STRING);
+ pcSchemaBuilder.field("gpu").type(SchemaType.STRING);
+ pcSchemaBuilder.field("year").type(SchemaType.INT64);
+ pcSchemaBuilder.field("seller", sellerGenericSchema).type(SchemaType.JSON).optional();
+ SchemaInfo pcGenericSchemaInfo = pcSchemaBuilder.build(SchemaType.JSON);
+ GenericSchemaImpl pcGenericSchema = GenericSchemaImpl.of(pcGenericSchemaInfo);
+
+ Seller seller = new Seller("USA","oakstreet",9999);
+ PC pc = new PC("dell","g3",2020, GPU.AMD, seller);
+
+ byte[] bytes = pcJsonSchema.encode(pc);
+ Assert.assertTrue(bytes.length > 0);
+
+ Object pc2 = pcJsonSchema.decode(bytes);
+ assertEquals(pc, pc2);
+
+ GenericRecord sellerRecord = sellerGenericSchema.newRecordBuilder()
+ .set("state", "USA")
+ .set("street", "oakstreet")
+ .set("zipCode", 9999)
+ .build();
+
+ GenericRecord pcRecord = pcGenericSchema.newRecordBuilder()
+ .set("brand", "dell")
+ .set("model","g3")
+ .set("year", 2020)
+ .set("gpu", GPU.AMD)
+ .set("seller", sellerRecord)
+ .build();
+
+ byte[] bytes3 = pcGenericSchema.encode(pcRecord);
+ Assert.assertTrue(bytes3.length > 0);
+ GenericRecord pc3Record = pcGenericSchema.decode(bytes3);
+
+ for(Field field : pc3Record.getFields()) {
+ assertTrue(pcGenericSchema.getFields().contains(field));
+ }
+ assertEquals("dell", pc3Record.getField("brand"));
+ assertEquals("g3", pc3Record.getField("model"));
+ assertEquals(2020, pc3Record.getField("year"));
+ assertEquals(GPU.AMD.toString(), pc3Record.getField("gpu"));
+
+
+ GenericRecord seller3Record = (GenericRecord) pc3Record.getField("seller");
+ assertEquals("USA", seller3Record.getField("state"));
+ assertEquals("oakstreet", seller3Record.getField("street"));
+ assertEquals(9999, seller3Record.getField("zipCode"));
+ }
}