You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/04/17 18:10:02 UTC
incubator-gobblin git commit: [GOBBLIN-467] Fix json to avro conversion for records within arrays
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 1c03ea22f -> e386cf67e
[GOBBLIN-467] Fix json to avro conversion for records within arrays
Closes #2339 from jack-moseley/jsontoavro-fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/e386cf67
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/e386cf67
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/e386cf67
Branch: refs/heads/master
Commit: e386cf67e337931947152fbac526c6729ffc7d0f
Parents: 1c03ea2
Author: Jack Moseley <jm...@linkedin.com>
Authored: Tue Apr 17 11:09:57 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Apr 17 11:09:57 2018 -0700
----------------------------------------------------------------------
...nElementConversionWithAvroSchemaFactory.java | 59 ++++++++++++++++----
.../JsonRecordAvroSchemaToAvroConverter.java | 13 +++--
...JsonRecordAvroSchemaToAvroConverterTest.java | 9 ++-
.../resources/converter/jsonToAvroRecord.json | 13 ++++-
.../resources/converter/jsonToAvroSchema.avsc | 52 +++++++++++++++++
5 files changed, 127 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e386cf67/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
index 1da8d31..cc51874 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
@@ -43,8 +44,7 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
*/
public static JsonElementConverter getConvertor(String fieldName, String fieldType, Schema schemaNode,
- WorkUnitState state, boolean nullable)
- throws UnsupportedDateTypeException {
+ WorkUnitState state, boolean nullable, List<String> ignoreFields) throws UnsupportedDateTypeException {
Type type;
try {
@@ -56,16 +56,20 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
switch (type) {
case ARRAY:
return new JsonElementConversionWithAvroSchemaFactory.ArrayConverter(fieldName, nullable, type.toString(),
- schemaNode, state);
+ schemaNode, state, ignoreFields);
case MAP:
return new JsonElementConversionWithAvroSchemaFactory.MapConverter(fieldName, nullable, type.toString(),
- schemaNode, state);
+ schemaNode, state, ignoreFields);
case ENUM:
return new JsonElementConversionWithAvroSchemaFactory.EnumConverter(fieldName, nullable, type.toString(),
schemaNode);
+ case RECORD:
+ return new JsonElementConversionWithAvroSchemaFactory.RecordConverter(fieldName, nullable, type.toString(),
+ schemaNode, state, ignoreFields);
+
default:
return JsonElementConversionFactory.getConvertor(fieldName, fieldType, new JsonObject(), state, nullable);
}
@@ -73,12 +77,12 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
public static class ArrayConverter extends ComplexConverter {
- public ArrayConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, WorkUnitState state)
- throws UnsupportedDateTypeException {
+ public ArrayConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, WorkUnitState state,
+ List<String> ignoreFields) throws UnsupportedDateTypeException {
super(fieldName, nullable, sourceType);
super.setElementConverter(
getConvertor(fieldName, schemaNode.getElementType().getType().getName(), schemaNode.getElementType(), state,
- isNullable()));
+ isNullable(), ignoreFields));
}
@Override
@@ -107,12 +111,12 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
public static class MapConverter extends ComplexConverter {
- public MapConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, WorkUnitState state)
- throws UnsupportedDateTypeException {
+ public MapConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, WorkUnitState state,
+ List<String> ignoreFields) throws UnsupportedDateTypeException {
super(fieldName, nullable, sourceType);
super.setElementConverter(
getConvertor(fieldName, schemaNode.getValueType().getType().getName(), schemaNode.getValueType(), state,
- isNullable()));
+ isNullable(), ignoreFields));
}
@Override
@@ -169,4 +173,39 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
return this.schema;
}
}
+
+ public static class RecordConverter extends ComplexConverter {
+
+ List<String> ignoreFields;
+ Schema schema;
+ WorkUnitState state;
+
+ public RecordConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode,
+ WorkUnitState state, List<String> ignoreFields) throws UnsupportedDateTypeException {
+ super(fieldName, nullable, sourceType);
+ this.schema = schemaNode;
+ this.state = state;
+ this.ignoreFields = ignoreFields;
+ }
+
+ @Override
+ Object convertField(JsonElement value) {
+ try {
+ return JsonRecordAvroSchemaToAvroConverter.convertNestedRecord(this.schema, value.getAsJsonObject(), this.state,
+ this.ignoreFields);
+ } catch (DataConversionException e) {
+ throw new RuntimeException("Failed to convert nested record", e);
+ }
+ }
+
+ @Override
+ public Schema.Type getTargetType() {
+ return Schema.Type.RECORD;
+ }
+
+ @Override
+ public Schema schema() {
+ return this.schema;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e386cf67/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
index 8e25975..e3e2a0d 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
@@ -68,16 +68,17 @@ public class JsonRecordAvroSchemaToAvroConverter<SI> extends ToAvroConverterBase
@Override
public Iterable<GenericRecord> convertRecord(Schema outputSchema, JsonObject inputRecord, WorkUnitState workUnit)
throws DataConversionException {
- GenericRecord avroRecord = convertNestedRecord(outputSchema, inputRecord, workUnit);
+ GenericRecord avroRecord = convertNestedRecord(outputSchema, inputRecord, workUnit, this.ignoreFields);
return new SingleRecordIterable<>(avroRecord);
}
- private GenericRecord convertNestedRecord(Schema outputSchema, JsonObject inputRecord, WorkUnitState workUnit) throws DataConversionException {
+ public static GenericRecord convertNestedRecord(Schema outputSchema, JsonObject inputRecord, WorkUnitState workUnit,
+ List<String> ignoreFields) throws DataConversionException {
GenericRecord avroRecord = new GenericData.Record(outputSchema);
JsonElementConversionWithAvroSchemaFactory.JsonElementConverter converter;
for (Schema.Field field : outputSchema.getFields()) {
- if (this.ignoreFields.contains(field.name())) {
+ if (ignoreFields.contains(field.name())) {
continue;
}
@@ -115,15 +116,15 @@ public class JsonRecordAvroSchemaToAvroConverter<SI> extends ToAvroConverterBase
avroRecord.put(field.name(), null);
} else {
avroRecord.put(field.name(),
- convertNestedRecord(schema, inputRecord.get(field.name()).getAsJsonObject(), workUnit));
+ convertNestedRecord(schema, inputRecord.get(field.name()).getAsJsonObject(), workUnit, ignoreFields));
}
} else {
try {
converter = JsonElementConversionWithAvroSchemaFactory.getConvertor(field.name(), type.getName(), schema,
- workUnit, nullable);
+ workUnit, nullable, ignoreFields);
avroRecord.put(field.name(), converter.convert(inputRecord.get(field.name())));
} catch (Exception e) {
- throw new DataConversionException("Could not convert field " + field.name());
+ throw new DataConversionException("Could not convert field " + field.name(), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e386cf67/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
index 4cf6898..ed2dbb6 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
@@ -76,7 +76,12 @@ public class JsonRecordAvroSchemaToAvroConverterTest {
Assert.assertTrue(record.get("mapField") instanceof Map);
- Assert.assertEquals(((GenericRecord)record.get("nestedRecords")).get("nestedField").toString(), "test");
- Assert.assertEquals(((GenericRecord)record.get("nestedRecords")).get("nestedField2").toString(), "test2");
+ Assert.assertEquals(((GenericRecord) record.get("nestedRecords")).get("nestedField").toString(), "test");
+ Assert.assertEquals(((GenericRecord) record.get("nestedRecords")).get("nestedField2").toString(), "test2");
+
+ Assert.assertTrue(((GenericArray) record.get("emptyArrayOfRecords")).isEmpty());
+
+ GenericRecord recordInArray = (GenericRecord) (((GenericArray) record.get("arrayOfRecords")).get(0));
+ Assert.assertEquals(recordInArray.get("field1").toString(), "test1");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e386cf67/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json b/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json
index 65bfce6..7ab2c50 100644
--- a/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json
+++ b/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json
@@ -9,5 +9,16 @@
"nestedRecords": {
"nestedField": "test",
"nestedField2": "test2"
- }
+ },
+ "emptyArrayOfRecords": [],
+ "arrayOfRecords": [
+ {
+ "field1": "test1",
+ "field2": "test2"
+ },
+ {
+ "field1": "test3",
+ "field2": "test4"
+ }
+ ]
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e386cf67/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc b/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc
index effc91c..0198eb8 100644
--- a/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc
+++ b/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc
@@ -45,6 +45,58 @@
}
]
}
+ },
+ {
+ "name": "emptyArrayOfRecords",
+ "type": {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "recordInEmptyArray",
+ "fields": [
+ {
+ "name": "field1",
+ "type": [
+ "null",
+ "string"
+ ]
+ },
+ {
+ "name": "field2",
+ "type": [
+ "null",
+ "string"
+ ]
+ }
+ ]
+ }
+ }
+ },
+ {
+ "name": "arrayOfRecords",
+ "type": {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "recordInArray",
+ "fields": [
+ {
+ "name": "field1",
+ "type": [
+ "null",
+ "string"
+ ]
+ },
+ {
+ "name": "field2",
+ "type": [
+ "null",
+ "string"
+ ]
+ }
+ ]
+ }
+ }
}
]
}