You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/08 21:59:01 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-933] add support for array of unions in json schema_new

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4efc5fb  [GOBBLIN-933] add support for array of unions in json schema_new
4efc5fb is described below

commit 4efc5fb6d8f16b56e8fc1858c8093a150353a348
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Fri Nov 8 13:58:53 2019 -0800

    [GOBBLIN-933] add support for array of unions in json schema_new
    
    remove assumption that we only support 2 types in
    union
    
    try to check type first
    
    minor change for code format
    
    address typos
    
    Closes #2800 from ZihanLi58/avrounionschema
---
 ...JsonElementConversionWithAvroSchemaFactory.java | 126 ++++++++++++++++++++-
 .../avro/JsonRecordAvroSchemaToAvroConverter.java  |   2 +-
 .../JsonRecordAvroSchemaToAvroConverterTest.java   |  23 +++-
 .../test/resources/converter/jsonToAvroRecord.json |   8 +-
 .../test/resources/converter/jsonToAvroSchema.avsc |  23 +++-
 5 files changed, 171 insertions(+), 11 deletions(-)

diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
index f9627a5..1990514 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.converter.avro;
 
+import com.sun.javafx.binding.StringFormatter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -43,7 +44,7 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
    * Use to create a converter for a single field from a schema.
    */
 
-  public static JsonElementConverter getConvertor(String fieldName, String fieldType, Schema schemaNode,
+  public static JsonElementConverter getConverter(String fieldName, String fieldType, Schema schemaNode,
       WorkUnitState state, boolean nullable, List<String> ignoreFields) throws UnsupportedDateTypeException {
 
     Type type;
@@ -70,6 +71,10 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
         return new JsonElementConversionWithAvroSchemaFactory.RecordConverter(fieldName, nullable, type.toString(),
             schemaNode, state, ignoreFields);
 
+      case UNION:
+        return new JsonElementConversionWithAvroSchemaFactory.UnionConverter(fieldName, nullable, type.toString(),
+            schemaNode, state, ignoreFields);
+
       default:
         return JsonElementConversionFactory.getConvertor(fieldName, fieldType, new JsonObject(), state, nullable);
     }
@@ -81,7 +86,7 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
         List<String> ignoreFields) throws UnsupportedDateTypeException {
       super(fieldName, nullable, sourceType);
       super.setElementConverter(
-          getConvertor(fieldName, schemaNode.getElementType().getType().getName(), schemaNode.getElementType(), state,
+          getConverter(fieldName, schemaNode.getElementType().getType().getName(), schemaNode.getElementType(), state,
               isNullable(), ignoreFields));
     }
 
@@ -115,7 +120,7 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
         List<String> ignoreFields) throws UnsupportedDateTypeException {
       super(fieldName, nullable, sourceType);
       super.setElementConverter(
-          getConvertor(fieldName, schemaNode.getValueType().getType().getName(), schemaNode.getValueType(), state,
+          getConverter(fieldName, schemaNode.getValueType().getType().getName(), schemaNode.getValueType(), state,
               isNullable(), ignoreFields));
     }
 
@@ -183,7 +188,7 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
     WorkUnitState state;
 
     public RecordConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode,
-        WorkUnitState state, List<String> ignoreFields) throws UnsupportedDateTypeException {
+        WorkUnitState state, List<String> ignoreFields) {
       super(fieldName, nullable, sourceType);
       this.schema = schemaNode;
       this.state = state;
@@ -210,4 +215,131 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
       return this.schema;
     }
   }
-}
+
+  /**
+   * Converts a JSON value to Avro according to a union schema.
+   *
+   * <p>One converter is built per union member, in schema order. A value is handed to the first
+   * member whose Avro type is compatible with the JSON kind of the value (string, number, boolean,
+   * array, object or null); if that converter fails, the next compatible member is tried.
+   *
+   * <p>Known limitation: a {@link JsonElement} does not carry its exact numeric type, so the first
+   * compatible numeric member wins and precision is not guaranteed to be preserved from JSON to Avro.
+   */
+  public static class UnionConverter extends ComplexConverter {
+    private final List<Schema> schemas;
+    private final List<JsonElementConverter> converters;
+    private final Schema schemaNode;
+
+    /**
+     * @param fieldName name of the field, passed to the parent and to every member converter
+     * @param nullable passed through to the parent converter
+     * @param sourceType passed through to the parent converter
+     * @param schemaNode the union schema; one converter is created for each of its member types
+     * @param state work unit state handed to the member converters
+     * @param ignoreFields handed to the member converters
+     * @throws UnsupportedDateTypeException propagated from {@code getConverter}
+     */
+    public UnionConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode,
+        WorkUnitState state, List<String> ignoreFields) throws UnsupportedDateTypeException {
+      super(fieldName, nullable, sourceType);
+      this.schemaNode = schemaNode;
+      this.schemas = schemaNode.getTypes();
+      this.converters = new ArrayList<>(this.schemas.size());
+      for (Schema memberSchema : this.schemas) {
+        // Each member converter must see its own schema, not the enclosing union: e.g. the array and
+        // map converters call getElementType()/getValueType() on the schema they are given.
+        this.converters.add(getConverter(fieldName, memberSchema.getType().getName(), memberSchema, state,
+            isNullable(), ignoreFields));
+      }
+    }
+
+    /**
+     * Converts {@code value} with the first compatible member converter that succeeds.
+     *
+     * @throws RuntimeException if no member converter could convert the value; the last converter
+     *         failure, if any, is attached as the cause
+     */
+    @Override
+    Object convertField(JsonElement value) {
+      Exception lastFailure = null;
+      for (JsonElementConverter converter : converters) {
+        if (!accepts(converter.getTargetType(), value)) {
+          continue;
+        }
+        try {
+          return converter.convert(value);
+        } catch (Exception e) {
+          // Deliberately best-effort: another member may still accept the value. Keep the failure so
+          // it can be reported if nothing matches.
+          lastFailure = e;
+        }
+      }
+      // Formatting the element itself (toString) is safe for every JsonElement kind, unlike
+      // getAsString(), which throws for JSON objects and JSON null and would hide the real error.
+      throw new RuntimeException(
+          String.format("Cannot convert %s to avro using schema %s", value, schemaNode), lastFailure);
+    }
+
+    /**
+     * Tells whether a member of Avro type {@code targetType} may be tried for {@code value}, based on
+     * the JSON kind of the value.
+     */
+    private static boolean accepts(Schema.Type targetType, JsonElement value) {
+      switch (targetType) {
+        case STRING:
+        case ENUM:
+          // Enum symbols arrive as JSON strings, never as JSON objects.
+          return value.isJsonPrimitive() && value.getAsJsonPrimitive().isString();
+        case FIXED:
+        case BYTES:
+        case INT:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+          return value.isJsonPrimitive() && value.getAsJsonPrimitive().isNumber();
+        case BOOLEAN:
+          return value.isJsonPrimitive() && value.getAsJsonPrimitive().isBoolean();
+        case ARRAY:
+          return value.isJsonArray();
+        case MAP:
+        case RECORD:
+          return value.isJsonObject();
+        case NULL:
+          return value.isJsonNull();
+        case UNION:
+          // Thrown rather than returned as a value; unchecked because convertField declares no throws.
+          throw new UnsupportedOperationException("does not support union type in union");
+        default:
+          return true;
+      }
+    }
+
+    @Override
+    public Schema.Type getTargetType() {
+      return schema().getType();
+    }
+
+    /**
+     * Returns the non-null member for a two-member nullable union, otherwise a union of all members.
+     */
+    @Override
+    public Schema schema() {
+      if (schemas.size() == 2 && isNullable()) {
+        return schemas.get(0).getType() == Schema.Type.NULL ? schemas.get(1) : schemas.get(0);
+      }
+      return Schema.createUnion(schemas);
+    }
+
+    /** A union is nullable when one of its members is the Avro null type. */
+    @Override
+    public boolean isNullable() {
+      for (Schema memberSchema : schemas) {
+        if (memberSchema.getType() == Schema.Type.NULL) {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+}
\ No newline at end of file
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
index e3e2a0d..7f58991 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
@@ -120,7 +120,7 @@ public class JsonRecordAvroSchemaToAvroConverter<SI> extends ToAvroConverterBase
         }
       } else {
         try {
-          converter = JsonElementConversionWithAvroSchemaFactory.getConvertor(field.name(), type.getName(), schema,
+          converter = JsonElementConversionWithAvroSchemaFactory.getConverter(field.name(), type.getName(), schema,
               workUnit, nullable, ignoreFields);
           avroRecord.put(field.name(), converter.convert(inputRecord.get(field.name())));
         } catch (Exception e) {
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
index f59de8c..c46c0d7 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
@@ -85,5 +85,24 @@ public class JsonRecordAvroSchemaToAvroConverterTest {
     Assert.assertEquals(recordInArray.get("field1").toString(), "test1");
 
     Assert.assertEquals((record.get("enumField")).toString(), "ENUM2");
-  }
-}
+
+    Assert.assertTrue(record.get("arrayFieldWithUnion") instanceof GenericArray);
+    GenericArray arrayWithUnion =  (GenericArray) record.get("arrayFieldWithUnion");
+    Assert.assertEquals(arrayWithUnion.size(), 4);
+    Assert.assertEquals(arrayWithUnion.get(0).toString(), "arrU1");
+    Assert.assertEquals(arrayWithUnion.get(1).toString(), "arrU2");
+    Assert.assertEquals(arrayWithUnion.get(2).toString(), "arrU3");
+    Assert.assertEquals(arrayWithUnion.get(3), 123L);
+
+    Assert.assertTrue(record.get("nullArrayFieldWithUnion") instanceof GenericArray);
+    GenericArray nullArrayWithUnion =  (GenericArray) record.get("nullArrayFieldWithUnion");
+    Assert.assertEquals(nullArrayWithUnion.size(), 1);
+    Assert.assertNull(nullArrayWithUnion.get(0));
+
+    Assert.assertTrue(record.get("arrayFieldWithUnion2") instanceof GenericArray);
+    GenericArray arrayWithUnion2 =  (GenericArray) record.get("arrayFieldWithUnion2");
+    Assert.assertEquals(arrayWithUnion2.size(), 3);
+    Assert.assertEquals(arrayWithUnion2.get(0).toString(), "arrU1");
+    Assert.assertNull(arrayWithUnion2.get(1));
+    Assert.assertEquals(arrayWithUnion2.get(2).toString(), "arrU3");  }
+}
\ No newline at end of file
diff --git a/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json b/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json
index bbdd232..5af9fd6 100644
--- a/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json
+++ b/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json
@@ -1,4 +1,8 @@
-{"nullableField": null,
+{
+  "arrayFieldWithUnion2": ["arrU1", null, "arrU3"],
+  "arrayFieldWithUnion": ["arrU1", "arrU2", "arrU3", 123],
+  "nullArrayFieldWithUnion": [null],
+  "nullableField": null,
   "longField": 1234,
   "arrayField": ["arr1", "arr2", "arr3"],
   "mapField": {
@@ -22,4 +26,4 @@
     }
   ],
   "enumField": "ENUM2"
-}
+}
\ No newline at end of file
diff --git a/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc b/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc
index 2502f33..dfde0a8 100644
--- a/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc
+++ b/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc
@@ -23,6 +23,27 @@
       }
     },
     {
+      "name": "arrayFieldWithUnion",
+      "type": {
+        "type": "array",
+        "items": ["null","string","long"]
+      }
+    },
+    {
+      "name": "arrayFieldWithUnion2",
+      "type": {
+        "type": "array",
+        "items": ["string","null"]
+      }
+    },
+    {
+      "name": "nullArrayFieldWithUnion",
+      "type": {
+        "type": "array",
+        "items": ["null","string"]
+      }
+    },
+    {
       "name": "mapField",
       "type": {
         "type": "map",
@@ -114,4 +135,4 @@
       ]
     }
   ]
-}
+}
\ No newline at end of file