You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/08 21:59:01 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-933] add
support for array of unions in json schema_new
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4efc5fb [GOBBLIN-933] add support for array of unions in json schema_new
4efc5fb is described below
commit 4efc5fb6d8f16b56e8fc1858c8093a150353a348
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Fri Nov 8 13:58:53 2019 -0800
[GOBBLIN-933] add support for array of unions in json schema_new
remove assumption that we only support 2 types in
union
try to check type first
minor change for code format
address typos
Closes #2800 from ZihanLi58/avrounionschema
---
...JsonElementConversionWithAvroSchemaFactory.java | 126 ++++++++++++++++++++-
.../avro/JsonRecordAvroSchemaToAvroConverter.java | 2 +-
.../JsonRecordAvroSchemaToAvroConverterTest.java | 23 +++-
.../test/resources/converter/jsonToAvroRecord.json | 8 +-
.../test/resources/converter/jsonToAvroSchema.avsc | 23 +++-
5 files changed, 171 insertions(+), 11 deletions(-)
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
index f9627a5..1990514 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.converter.avro;
+import com.sun.javafx.binding.StringFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -43,7 +44,7 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
* Use to create a converter for a single field from a schema.
*/
- public static JsonElementConverter getConvertor(String fieldName, String fieldType, Schema schemaNode,
+ public static JsonElementConverter getConverter(String fieldName, String fieldType, Schema schemaNode,
WorkUnitState state, boolean nullable, List<String> ignoreFields) throws UnsupportedDateTypeException {
Type type;
@@ -70,6 +71,10 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
return new JsonElementConversionWithAvroSchemaFactory.RecordConverter(fieldName, nullable, type.toString(),
schemaNode, state, ignoreFields);
+ case UNION:
+ return new JsonElementConversionWithAvroSchemaFactory.UnionConverter(fieldName, nullable, type.toString(),
+ schemaNode, state, ignoreFields);
+
default:
return JsonElementConversionFactory.getConvertor(fieldName, fieldType, new JsonObject(), state, nullable);
}
@@ -81,7 +86,7 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
List<String> ignoreFields) throws UnsupportedDateTypeException {
super(fieldName, nullable, sourceType);
super.setElementConverter(
- getConvertor(fieldName, schemaNode.getElementType().getType().getName(), schemaNode.getElementType(), state,
+ getConverter(fieldName, schemaNode.getElementType().getType().getName(), schemaNode.getElementType(), state,
isNullable(), ignoreFields));
}
@@ -115,7 +120,7 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
List<String> ignoreFields) throws UnsupportedDateTypeException {
super(fieldName, nullable, sourceType);
super.setElementConverter(
- getConvertor(fieldName, schemaNode.getValueType().getType().getName(), schemaNode.getValueType(), state,
+ getConverter(fieldName, schemaNode.getValueType().getType().getName(), schemaNode.getValueType(), state,
isNullable(), ignoreFields));
}
@@ -183,7 +188,7 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
WorkUnitState state;
public RecordConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode,
- WorkUnitState state, List<String> ignoreFields) throws UnsupportedDateTypeException {
+ WorkUnitState state, List<String> ignoreFields) {
super(fieldName, nullable, sourceType);
this.schema = schemaNode;
this.state = state;
@@ -210,4 +215,115 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
return this.schema;
}
}
-}
+
+ /**
+  * Converts a JSON value to Avro using whichever member of a union schema accepts it: one converter is
+  * built per member, in union order, and the first whose type fits the value and converts it wins.
+  * Known limitation: a JsonElement does not reveal its exact numeric type, so a number goes to the first
+  * numeric member that accepts it and precision is not guaranteed to be preserved.
+  */
+ public static class UnionConverter extends ComplexConverter {
+   private final List<Schema> schemas;
+   private final List<JsonElementConverter> converters;
+   private final Schema schemaNode;
+
+   /**
+    * Builds one converter per union member, each from that member's own schema (array and map
+    * converters read their element/value type from the schema they are given).
+    * @param fieldName name of the field being converted
+    * @param nullable nullability requested by the caller; passed to the parent constructor
+    * @param sourceType source type name passed to the parent constructor
+    * @param schemaNode the union schema
+    * @param state work unit state handed to every member converter
+    * @param ignoreFields fields to ignore, handed to every member converter
+    * @throws UnsupportedDateTypeException propagated from {@code getConverter} for a member type
+    */
+   public UnionConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode,
+       WorkUnitState state, List<String> ignoreFields) throws UnsupportedDateTypeException {
+     super(fieldName, nullable, sourceType);
+     this.schemaNode = schemaNode;
+     this.schemas = schemaNode.getTypes();
+     this.converters = new ArrayList<>(this.schemas.size());
+     for (Schema member : this.schemas) {
+       converters.add(getConverter(fieldName, member.getType().getName(), member, state, isNullable(), ignoreFields));
+     }
+   }
+
+   /**
+    * Tries the member converters in union order and returns the first successful conversion.
+    * @throws IllegalStateException if a member converter targets a union
+    * @throws RuntimeException if no member converts the value; the last conversion failure is the cause
+    */
+   @Override
+   Object convertField(JsonElement value) {
+     Exception lastFailure = null;
+     for (JsonElementConverter converter : converters) {
+       if (!fitsJsonShape(converter.getTargetType(), value)) {
+         continue;
+       }
+       try {
+         return converter.convert(value);
+       } catch (Exception e) {
+         // Deliberate best effort: a later member of the union may still accept the value.
+         lastFailure = e;
+       }
+     }
+     // Format the element itself: getAsString() is not supported by every kind of JsonElement.
+     throw new RuntimeException(
+         String.format("Cannot convert %s to avro using schema %s", value, schemaNode), lastFailure);
+   }
+
+   /** Returns whether the JSON shape of {@code value} can hold the given Avro type; rejects nested unions. */
+   private static boolean fitsJsonShape(Schema.Type targetType, JsonElement value) {
+     switch (targetType) {
+       case STRING:
+       case ENUM: // enum symbols arrive as JSON strings
+         return value.isJsonPrimitive() && value.getAsJsonPrimitive().isString();
+       case FIXED:
+       case BYTES:
+       case INT:
+       case LONG:
+       case FLOAT:
+       case DOUBLE:
+         return value.isJsonPrimitive() && value.getAsJsonPrimitive().isNumber();
+       case BOOLEAN:
+         return value.isJsonPrimitive() && value.getAsJsonPrimitive().isBoolean();
+       case ARRAY:
+         return value.isJsonArray();
+       case MAP:
+       case RECORD:
+         return value.isJsonObject();
+       case NULL:
+         return value.isJsonNull();
+       case UNION:
+         throw new IllegalStateException("does not support union type in union");
+       default:
+         return true;
+     }
+   }
+
+   @Override
+   public Schema.Type getTargetType() {
+     return schema().getType();
+   }
+
+   /** Returns the non-null member of a two-member nullable union, otherwise a union of all members. */
+   @Override
+   public Schema schema() {
+     if (schemas.size() == 2 && isNullable()) {
+       return schemas.get(0).getType() == Schema.Type.NULL ? schemas.get(1) : schemas.get(0);
+     }
+     return Schema.createUnion(schemas);
+   }
+
+   /** Returns true when one of the union members is the null type. */
+   @Override
+   public boolean isNullable() {
+     for (Schema member : schemas) {
+       if (member.getType() == Schema.Type.NULL) {
+         return true;
+       }
+     }
+     return false;
+   }
+ }
+}
\ No newline at end of file
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
index e3e2a0d..7f58991 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
@@ -120,7 +120,7 @@ public class JsonRecordAvroSchemaToAvroConverter<SI> extends ToAvroConverterBase
}
} else {
try {
- converter = JsonElementConversionWithAvroSchemaFactory.getConvertor(field.name(), type.getName(), schema,
+ converter = JsonElementConversionWithAvroSchemaFactory.getConverter(field.name(), type.getName(), schema,
workUnit, nullable, ignoreFields);
avroRecord.put(field.name(), converter.convert(inputRecord.get(field.name())));
} catch (Exception e) {
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
index f59de8c..c46c0d7 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
@@ -85,5 +85,24 @@ public class JsonRecordAvroSchemaToAvroConverterTest {
Assert.assertEquals(recordInArray.get("field1").toString(), "test1");
Assert.assertEquals((record.get("enumField")).toString(), "ENUM2");
- }
-}
+
+ Assert.assertTrue(record.get("arrayFieldWithUnion") instanceof GenericArray);
+ GenericArray arrayWithUnion = (GenericArray) record.get("arrayFieldWithUnion");
+ Assert.assertEquals(arrayWithUnion.size(), 4);
+ Assert.assertEquals(arrayWithUnion.get(0).toString(), "arrU1");
+ Assert.assertEquals(arrayWithUnion.get(1).toString(), "arrU2");
+ Assert.assertEquals(arrayWithUnion.get(2).toString(), "arrU3");
+ Assert.assertEquals(arrayWithUnion.get(3), 123L);
+
+ Assert.assertTrue(record.get("nullArrayFieldWithUnion") instanceof GenericArray);
+ GenericArray nullArrayWithUnion = (GenericArray) record.get("nullArrayFieldWithUnion");
+ Assert.assertEquals(nullArrayWithUnion.size(), 1);
+ Assert.assertNull(nullArrayWithUnion.get(0));
+
+ Assert.assertTrue(record.get("arrayFieldWithUnion2") instanceof GenericArray);
+ GenericArray arrayWithUnion2 = (GenericArray) record.get("arrayFieldWithUnion2");
+ Assert.assertEquals(arrayWithUnion2.size(), 3);
+ Assert.assertEquals(arrayWithUnion2.get(0).toString(), "arrU1");
+ Assert.assertNull(arrayWithUnion2.get(1));
+ Assert.assertEquals(arrayWithUnion2.get(2).toString(), "arrU3"); }
+}
\ No newline at end of file
diff --git a/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json b/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json
index bbdd232..5af9fd6 100644
--- a/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json
+++ b/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json
@@ -1,4 +1,8 @@
-{"nullableField": null,
+{
+ "arrayFieldWithUnion2": ["arrU1", null, "arrU3"],
+ "arrayFieldWithUnion": ["arrU1", "arrU2", "arrU3", 123],
+ "nullArrayFieldWithUnion": [null],
+ "nullableField": null,
"longField": 1234,
"arrayField": ["arr1", "arr2", "arr3"],
"mapField": {
@@ -22,4 +26,4 @@
}
],
"enumField": "ENUM2"
-}
+}
\ No newline at end of file
diff --git a/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc b/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc
index 2502f33..dfde0a8 100644
--- a/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc
+++ b/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc
@@ -23,6 +23,27 @@
}
},
{
+ "name": "arrayFieldWithUnion",
+ "type": {
+ "type": "array",
+ "items": ["null","string","long"]
+ }
+ },
+ {
+ "name": "arrayFieldWithUnion2",
+ "type": {
+ "type": "array",
+ "items": ["string","null"]
+ }
+ },
+ {
+ "name": "nullArrayFieldWithUnion",
+ "type": {
+ "type": "array",
+ "items": ["null","string"]
+ }
+ },
+ {
"name": "mapField",
"type": {
"type": "map",
@@ -114,4 +135,4 @@
]
}
]
-}
+}
\ No newline at end of file