You are viewing a plain-text version of this content; the link to its canonical version was lost in the plain-text conversion.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/04/17 18:10:02 UTC

incubator-gobblin git commit: [GOBBLIN-467] Fix json to avro conversion for records within arrays

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 1c03ea22f -> e386cf67e


[GOBBLIN-467] Fix json to avro conversion for records within arrays

Closes #2339 from jack-moseley/jsontoavro-fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/e386cf67
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/e386cf67
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/e386cf67

Branch: refs/heads/master
Commit: e386cf67e337931947152fbac526c6729ffc7d0f
Parents: 1c03ea2
Author: Jack Moseley <jm...@linkedin.com>
Authored: Tue Apr 17 11:09:57 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Apr 17 11:09:57 2018 -0700

----------------------------------------------------------------------
 ...nElementConversionWithAvroSchemaFactory.java | 59 ++++++++++++++++----
 .../JsonRecordAvroSchemaToAvroConverter.java    | 13 +++--
 ...JsonRecordAvroSchemaToAvroConverterTest.java |  9 ++-
 .../resources/converter/jsonToAvroRecord.json   | 13 ++++-
 .../resources/converter/jsonToAvroSchema.avsc   | 52 +++++++++++++++++
 5 files changed, 127 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e386cf67/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
index 1da8d31..cc51874 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
 
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
@@ -43,8 +44,7 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
    */
 
   public static JsonElementConverter getConvertor(String fieldName, String fieldType, Schema schemaNode,
-      WorkUnitState state, boolean nullable)
-      throws UnsupportedDateTypeException {
+      WorkUnitState state, boolean nullable, List<String> ignoreFields) throws UnsupportedDateTypeException {
 
     Type type;
     try {
@@ -56,16 +56,20 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
     switch (type) {
       case ARRAY:
         return new JsonElementConversionWithAvroSchemaFactory.ArrayConverter(fieldName, nullable, type.toString(),
-            schemaNode, state);
+            schemaNode, state, ignoreFields);
 
       case MAP:
         return new JsonElementConversionWithAvroSchemaFactory.MapConverter(fieldName, nullable, type.toString(),
-            schemaNode, state);
+            schemaNode, state, ignoreFields);
 
       case ENUM:
         return new JsonElementConversionWithAvroSchemaFactory.EnumConverter(fieldName, nullable, type.toString(),
             schemaNode);
 
+      case RECORD:
+        return new JsonElementConversionWithAvroSchemaFactory.RecordConverter(fieldName, nullable, type.toString(),
+            schemaNode, state, ignoreFields);
+
       default:
         return JsonElementConversionFactory.getConvertor(fieldName, fieldType, new JsonObject(), state, nullable);
     }
@@ -73,12 +77,12 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
 
   public static class ArrayConverter extends ComplexConverter {
 
-    public ArrayConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, WorkUnitState state)
-        throws UnsupportedDateTypeException {
+    public ArrayConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, WorkUnitState state,
+        List<String> ignoreFields) throws UnsupportedDateTypeException {
       super(fieldName, nullable, sourceType);
       super.setElementConverter(
           getConvertor(fieldName, schemaNode.getElementType().getType().getName(), schemaNode.getElementType(), state,
-              isNullable()));
+              isNullable(), ignoreFields));
     }
 
     @Override
@@ -107,12 +111,12 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
 
   public static class MapConverter extends ComplexConverter {
 
-    public MapConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, WorkUnitState state)
-        throws UnsupportedDateTypeException {
+    public MapConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, WorkUnitState state,
+        List<String> ignoreFields) throws UnsupportedDateTypeException {
       super(fieldName, nullable, sourceType);
       super.setElementConverter(
           getConvertor(fieldName, schemaNode.getValueType().getType().getName(), schemaNode.getValueType(), state,
-              isNullable()));
+              isNullable(), ignoreFields));
     }
 
     @Override
@@ -169,4 +173,39 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
       return this.schema;
     }
   }
+
+  public static class RecordConverter extends ComplexConverter {
+
+    List<String> ignoreFields;
+    Schema schema;
+    WorkUnitState state;
+
+    public RecordConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode,
+        WorkUnitState state, List<String> ignoreFields) throws UnsupportedDateTypeException {
+      super(fieldName, nullable, sourceType);
+      this.schema = schemaNode;
+      this.state = state;
+      this.ignoreFields = ignoreFields;
+    }
+
+    @Override
+    Object convertField(JsonElement value) {
+      try {
+        return JsonRecordAvroSchemaToAvroConverter.convertNestedRecord(this.schema, value.getAsJsonObject(), this.state,
+            this.ignoreFields);
+      } catch (DataConversionException e) {
+        throw new RuntimeException("Failed to convert nested record", e);
+      }
+    }
+
+    @Override
+    public Schema.Type getTargetType() {
+      return Schema.Type.RECORD;
+    }
+
+    @Override
+    public Schema schema() {
+      return this.schema;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e386cf67/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
index 8e25975..e3e2a0d 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
@@ -68,16 +68,17 @@ public class JsonRecordAvroSchemaToAvroConverter<SI> extends ToAvroConverterBase
   @Override
   public Iterable<GenericRecord> convertRecord(Schema outputSchema, JsonObject inputRecord, WorkUnitState workUnit)
       throws DataConversionException {
-    GenericRecord avroRecord = convertNestedRecord(outputSchema, inputRecord, workUnit);
+    GenericRecord avroRecord = convertNestedRecord(outputSchema, inputRecord, workUnit, this.ignoreFields);
     return new SingleRecordIterable<>(avroRecord);
   }
 
-  private GenericRecord convertNestedRecord(Schema outputSchema, JsonObject inputRecord, WorkUnitState workUnit) throws DataConversionException {
+  public static GenericRecord convertNestedRecord(Schema outputSchema, JsonObject inputRecord, WorkUnitState workUnit,
+      List<String> ignoreFields) throws DataConversionException {
     GenericRecord avroRecord = new GenericData.Record(outputSchema);
     JsonElementConversionWithAvroSchemaFactory.JsonElementConverter converter;
     for (Schema.Field field : outputSchema.getFields()) {
 
-      if (this.ignoreFields.contains(field.name())) {
+      if (ignoreFields.contains(field.name())) {
         continue;
       }
 
@@ -115,15 +116,15 @@ public class JsonRecordAvroSchemaToAvroConverter<SI> extends ToAvroConverterBase
           avroRecord.put(field.name(), null);
         } else {
           avroRecord.put(field.name(),
-              convertNestedRecord(schema, inputRecord.get(field.name()).getAsJsonObject(), workUnit));
+              convertNestedRecord(schema, inputRecord.get(field.name()).getAsJsonObject(), workUnit, ignoreFields));
         }
       } else {
         try {
           converter = JsonElementConversionWithAvroSchemaFactory.getConvertor(field.name(), type.getName(), schema,
-              workUnit, nullable);
+              workUnit, nullable, ignoreFields);
           avroRecord.put(field.name(), converter.convert(inputRecord.get(field.name())));
         } catch (Exception e) {
-          throw new DataConversionException("Could not convert field " + field.name());
+          throw new DataConversionException("Could not convert field " + field.name(), e);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e386cf67/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
index 4cf6898..ed2dbb6 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
@@ -76,7 +76,12 @@ public class JsonRecordAvroSchemaToAvroConverterTest {
 
     Assert.assertTrue(record.get("mapField") instanceof Map);
 
-    Assert.assertEquals(((GenericRecord)record.get("nestedRecords")).get("nestedField").toString(), "test");
-    Assert.assertEquals(((GenericRecord)record.get("nestedRecords")).get("nestedField2").toString(), "test2");
+    Assert.assertEquals(((GenericRecord) record.get("nestedRecords")).get("nestedField").toString(), "test");
+    Assert.assertEquals(((GenericRecord) record.get("nestedRecords")).get("nestedField2").toString(), "test2");
+
+    Assert.assertTrue(((GenericArray) record.get("emptyArrayOfRecords")).isEmpty());
+
+    GenericRecord recordInArray = (GenericRecord) (((GenericArray) record.get("arrayOfRecords")).get(0));
+    Assert.assertEquals(recordInArray.get("field1").toString(), "test1");
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e386cf67/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json b/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json
index 65bfce6..7ab2c50 100644
--- a/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json
+++ b/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json
@@ -9,5 +9,16 @@
   "nestedRecords": {
     "nestedField": "test",
     "nestedField2": "test2"
-  }
+  },
+  "emptyArrayOfRecords": [],
+  "arrayOfRecords": [
+    {
+      "field1": "test1",
+      "field2": "test2"
+    },
+    {
+      "field1": "test3",
+      "field2": "test4"
+    }
+  ]
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e386cf67/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc b/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc
index effc91c..0198eb8 100644
--- a/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc
+++ b/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc
@@ -45,6 +45,58 @@
           }
         ]
       }
+    },
+    {
+      "name": "emptyArrayOfRecords",
+      "type": {
+        "type": "array",
+        "items": {
+          "type": "record",
+          "name": "recordInEmptyArray",
+          "fields": [
+            {
+              "name": "field1",
+              "type": [
+                "null",
+                "string"
+              ]
+            },
+            {
+              "name": "field2",
+              "type": [
+                "null",
+                "string"
+              ]
+            }
+          ]
+        }
+      }
+    },
+    {
+      "name": "arrayOfRecords",
+      "type": {
+        "type": "array",
+        "items": {
+          "type": "record",
+          "name": "recordInArray",
+          "fields": [
+            {
+              "name": "field1",
+              "type": [
+                "null",
+                "string"
+              ]
+            },
+            {
+              "name": "field2",
+              "type": [
+                "null",
+                "string"
+              ]
+            }
+          ]
+        }
+      }
     }
   ]
 }