You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/10/18 05:34:06 UTC
[3/3] incubator-gobblin git commit: [GOBBLIN-226] Nested schema
support in JsonStringToJsonIntermediateConverter and
JsonIntermediateToAvroConverter
[GOBBLIN-226] Nested schema support in JsonStringToJsonIntermediateConverter and JsonIntermediateToAvroConverter
Closes #2080 from tilakpatidar/nested_schema
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6dd36a50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6dd36a50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6dd36a50
Branch: refs/heads/master
Commit: 6dd36a506d574a261bf678aaf071d89e597044e8
Parents: f058211
Author: tilakpatidar <ti...@gmail.com>
Authored: Wed Oct 18 11:03:52 2017 +0530
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Wed Oct 18 11:03:52 2017 +0530
----------------------------------------------------------------------
.../avro/JsonElementConversionFactory.java | 414 +++++++--
...nElementConversionWithAvroSchemaFactory.java | 38 +-
.../avro/JsonIntermediateToAvroConverter.java | 98 +-
.../JsonRecordAvroSchemaToAvroConverter.java | 3 +
.../gobblin/converter/json/JsonSchema.java | 297 ++++++
.../JsonStringToJsonIntermediateConverter.java | 211 ++++-
.../avro/JsonElementConversionFactoryTest.java | 397 ++++++++
.../JsonIntermediateToAvroConverterTest.java | 73 +-
...onStringToJsonIntermediateConverterTest.java | 118 +++
.../JsonElementConversionFactoryTest.json | 856 +++++++++++++++++
.../JsonStringToJsonIntermediateConverter.json | 919 +++++++++++++++++++
.../src/test/resources/converter/complex1.json | 527 +++++++++++
.../src/test/resources/converter/complex2.json | 186 ++++
.../src/test/resources/converter/complex3.json | 548 +++++++++++
.../src/test/resources/converter/record.json | 23 -
.../src/test/resources/converter/record3.json | 27 +
.../src/test/resources/converter/schema.json | 724 ++++++++-------
.../Configuration-Properties-Glossary.md | 33 +-
18 files changed, 4880 insertions(+), 612 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java
index aa015a1..07e1fc5 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java
@@ -28,19 +28,29 @@ import java.util.TimeZone;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.EmptyIterable;
+import org.apache.gobblin.converter.json.JsonSchema;
+import org.codehaus.jackson.node.JsonNodeFactory;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
-
-import sun.util.calendar.ZoneInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.WorkUnitState;
+import lombok.extern.java.Log;
+import sun.util.calendar.ZoneInfo;
+
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.*;
+import static org.apache.gobblin.converter.json.JsonSchema.*;
/**
@@ -67,81 +77,121 @@ public class JsonElementConversionFactory {
BOOLEAN,
ARRAY,
MAP,
- ENUM
+ ENUM,
+ RECORD,
+ NULL,
+ UNION;
+
+ private static List<Type> primitiveTypes =
+ Arrays.asList(NULL, BOOLEAN, INT, LONG, FLOAT, DOUBLE, BYTES, STRING, ENUM, FIXED);
+
+ public static boolean isPrimitive(Type type) {
+ return primitiveTypes.contains(type);
+ }
}
/**
* Use to create a converter for a single field from a schema.
- *
- * @param fieldName
- * @param fieldType
- * @param nullable
* @param schemaNode
+ * @param namespace
* @param state
- * @return
+ * @return {@link JsonElementConverter}
* @throws UnsupportedDateTypeException
*/
- public static JsonElementConverter getConvertor(String fieldName, String fieldType, JsonObject schemaNode,
- WorkUnitState state, boolean nullable) throws UnsupportedDateTypeException {
+ public static JsonElementConverter getConvertor(JsonSchema schemaNode, String namespace, WorkUnitState state)
+ throws UnsupportedDateTypeException {
- Type type;
- try {
- type = Type.valueOf(fieldType.toUpperCase());
- } catch (IllegalArgumentException e) {
- throw new UnsupportedDateTypeException(fieldType + " is unsupported");
- }
+ Type type = schemaNode.getType();
DateTimeZone timeZone = getTimeZone(state.getProp(ConfigurationKeys.CONVERTER_AVRO_DATE_TIMEZONE, "UTC"));
switch (type) {
case DATE:
- return new DateConverter(fieldName, nullable, type.toString(),
+ return new DateConverter(schemaNode,
state.getProp(ConfigurationKeys.CONVERTER_AVRO_DATE_FORMAT, "yyyy-MM-dd HH:mm:ss"), timeZone, state);
case TIMESTAMP:
- return new DateConverter(fieldName, nullable, type.toString(),
+ return new DateConverter(schemaNode,
state.getProp(ConfigurationKeys.CONVERTER_AVRO_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss"), timeZone, state);
case TIME:
- return new DateConverter(fieldName, nullable, type.toString(),
- state.getProp(ConfigurationKeys.CONVERTER_AVRO_TIME_FORMAT, "HH:mm:ss"), timeZone, state);
+ return new DateConverter(schemaNode, state.getProp(ConfigurationKeys.CONVERTER_AVRO_TIME_FORMAT, "HH:mm:ss"),
+ timeZone, state);
case FIXED:
- throw new UnsupportedDateTypeException(fieldType + " is unsupported");
+ throw new UnsupportedDateTypeException(type.toString() + " is unsupported");
case STRING:
- return new StringConverter(fieldName, nullable, type.toString());
+ return new StringConverter(schemaNode);
case BYTES:
- return new BinaryConverter(fieldName, nullable, type.toString(),
- state.getProp(ConfigurationKeys.CONVERTER_AVRO_BINARY_CHARSET, "UTF8"));
+ return new BinaryConverter(schemaNode, state.getProp(ConfigurationKeys.CONVERTER_AVRO_BINARY_CHARSET, "UTF8"));
case INT:
- return new IntConverter(fieldName, nullable, type.toString());
+ return new IntConverter(schemaNode);
case LONG:
- return new LongConverter(fieldName, nullable, type.toString());
+ return new LongConverter(schemaNode);
case FLOAT:
- return new FloatConverter(fieldName, nullable, type.toString());
+ return new FloatConverter(schemaNode);
case DOUBLE:
- return new DoubleConverter(fieldName, nullable, type.toString());
+ return new DoubleConverter(schemaNode);
case BOOLEAN:
- return new BooleanConverter(fieldName, nullable, type.toString());
+ return new BooleanConverter(schemaNode);
case ARRAY:
- return new ArrayConverter(fieldName, nullable, type.toString(), schemaNode, state);
+ return new ArrayConverter(schemaNode, state);
case MAP:
- return new MapConverter(fieldName, nullable, type.toString(), schemaNode, state);
+ return new MapConverter(schemaNode, state);
case ENUM:
- return new EnumConverter(fieldName, nullable, type.toString(), schemaNode);
+ return new EnumConverter(schemaNode, namespace);
+
+ case RECORD:
+ return new RecordConverter(schemaNode, state, namespace);
+
+ case NULL:
+ return new NullConverter(schemaNode);
+
+ case UNION:
+ return new UnionConverter(schemaNode, state);
default:
- throw new UnsupportedDateTypeException(fieldType + " is unsupported");
+ throw new UnsupportedDateTypeException(type.toString() + " is unsupported");
+ }
+ }
+
+ /**
+ * Backward-compatible form of {@link JsonElementConversionFactory#getConvertor(JsonSchema, String, WorkUnitState)}.
+ * @param fieldName
+ * @param fieldType
+ * @param schemaNode
+ * @param state
+ * @param nullable
+ * @return {@link JsonElementConverter}
+ * @throws UnsupportedDateTypeException
+ */
+ public static JsonElementConverter getConvertor(String fieldName, String fieldType, JsonObject schemaNode,
+ WorkUnitState state, boolean nullable)
+ throws UnsupportedDateTypeException {
+ if (!schemaNode.has(COLUMN_NAME_KEY)) {
+ schemaNode.addProperty(COLUMN_NAME_KEY, fieldName);
+ }
+ if (!schemaNode.has(DATA_TYPE_KEY)) {
+ schemaNode.add(DATA_TYPE_KEY, new JsonObject());
+ }
+ JsonObject dataType = schemaNode.get(DATA_TYPE_KEY).getAsJsonObject();
+ if (!dataType.has(TYPE_KEY)) {
+ dataType.addProperty(TYPE_KEY, fieldType);
+ }
+ if (!schemaNode.has(IS_NULLABLE_KEY)) {
+ schemaNode.addProperty(IS_NULLABLE_KEY, nullable);
}
+ JsonSchema schema = new JsonSchema(schemaNode);
+ return getConvertor(schema, null, state);
}
private static DateTimeZone getTimeZone(String id) {
@@ -166,19 +216,17 @@ public class JsonElementConversionFactory {
*
*/
public static abstract class JsonElementConverter {
- private String name;
- private boolean nullable;
- private String sourceType;
+ private final JsonSchema jsonSchema;
+
+ public JsonElementConverter(JsonSchema jsonSchema) {
+ this.jsonSchema = jsonSchema;
+ }
- /**
- *
- * @param fieldName
- * @param nullable
- */
public JsonElementConverter(String fieldName, boolean nullable, String sourceType) {
- this.name = fieldName;
- this.nullable = nullable;
- this.sourceType = sourceType;
+ JsonSchema jsonSchema = buildBaseSchema(Type.valueOf(sourceType.toUpperCase()));
+ jsonSchema.setColumnName(fieldName);
+ jsonSchema.setNullable(nullable);
+ this.jsonSchema = jsonSchema;
}
/**
@@ -186,7 +234,7 @@ public class JsonElementConversionFactory {
* @return
*/
public String getName() {
- return this.name;
+ return this.jsonSchema.getColumnName();
}
/**
@@ -194,7 +242,7 @@ public class JsonElementConversionFactory {
* @return
*/
public boolean isNullable() {
- return this.nullable;
+ return this.jsonSchema.isNullable();
}
/**
@@ -202,7 +250,7 @@ public class JsonElementConversionFactory {
* @return
*/
public Schema getSchema() {
- if (this.nullable) {
+ if (isNullable()) {
List<Schema> list = new ArrayList<>();
list.add(Schema.create(Schema.Type.NULL));
list.add(schema());
@@ -213,8 +261,8 @@ public class JsonElementConversionFactory {
protected Schema schema() {
Schema schema = Schema.create(getTargetType());
- schema.addProp("source.type", this.sourceType.toLowerCase());
- return schema;
+ schema.addProp("source.type", this.jsonSchema.getType().toString().toLowerCase());
+ return buildUnionIfNullable(schema);
}
/**
@@ -224,7 +272,7 @@ public class JsonElementConversionFactory {
*/
public Object convert(JsonElement value) {
if (value.isJsonNull()) {
- if (this.nullable) {
+ if (isNullable()) {
return null;
}
throw new RuntimeException("Field: " + getName() + " is not nullable and contains a null value");
@@ -244,12 +292,29 @@ public class JsonElementConversionFactory {
* @return
*/
public abstract Schema.Type getTargetType();
+
+ protected static String buildNamespace(String namespace, String name) {
+ if (namespace == null || namespace.isEmpty()) {
+ return null;
+ }
+ if (name == null || name.isEmpty()) {
+ return null;
+ }
+ return namespace.trim() + "." + name.trim();
+ }
+
+ protected Schema buildUnionIfNullable(Schema schema) {
+ if (this.isNullable()) {
+ return Schema.createUnion(Schema.create(Schema.Type.NULL), schema);
+ }
+ return schema;
+ }
}
public static class StringConverter extends JsonElementConverter {
- public StringConverter(String fieldName, boolean nullable, String sourceType) {
- super(fieldName, nullable, sourceType);
+ public StringConverter(JsonSchema schema) {
+ super(schema);
}
@Override
@@ -265,8 +330,8 @@ public class JsonElementConversionFactory {
public static class IntConverter extends JsonElementConverter {
- public IntConverter(String fieldName, boolean nullable, String sourceType) {
- super(fieldName, nullable, sourceType);
+ public IntConverter(JsonSchema schema) {
+ super(schema);
}
@Override
@@ -283,8 +348,8 @@ public class JsonElementConversionFactory {
public static class LongConverter extends JsonElementConverter {
- public LongConverter(String fieldName, boolean nullable, String sourceType) {
- super(fieldName, nullable, sourceType);
+ public LongConverter(JsonSchema schema) {
+ super(schema);
}
@Override
@@ -301,8 +366,8 @@ public class JsonElementConversionFactory {
public static class DoubleConverter extends JsonElementConverter {
- public DoubleConverter(String fieldName, boolean nullable, String sourceType) {
- super(fieldName, nullable, sourceType);
+ public DoubleConverter(JsonSchema schema) {
+ super(schema);
}
@Override
@@ -318,8 +383,8 @@ public class JsonElementConversionFactory {
public static class FloatConverter extends JsonElementConverter {
- public FloatConverter(String fieldName, boolean nullable, String sourceType) {
- super(fieldName, nullable, sourceType);
+ public FloatConverter(JsonSchema schema) {
+ super(schema);
}
@Override
@@ -335,8 +400,8 @@ public class JsonElementConversionFactory {
public static class BooleanConverter extends JsonElementConverter {
- public BooleanConverter(String fieldName, boolean nullable, String sourceType) {
- super(fieldName, nullable, sourceType);
+ public BooleanConverter(JsonSchema schema) {
+ super(schema);
}
@Override
@@ -356,9 +421,8 @@ public class JsonElementConversionFactory {
private DateTimeZone timeZone;
private WorkUnitState state;
- public DateConverter(String fieldName, boolean nullable, String sourceType, String pattern, DateTimeZone zone,
- WorkUnitState state) {
- super(fieldName, nullable, sourceType);
+ public DateConverter(JsonSchema schema, String pattern, DateTimeZone zone, WorkUnitState state) {
+ super(schema);
this.inputPatterns = pattern;
this.timeZone = zone;
this.state = state;
@@ -398,8 +462,8 @@ public class JsonElementConversionFactory {
public static class BinaryConverter extends JsonElementConverter {
private String charSet;
- public BinaryConverter(String fieldName, boolean nullable, String sourceType, String charSet) {
- super(fieldName, nullable, sourceType);
+ public BinaryConverter(JsonSchema schema, String charSet) {
+ super(schema);
this.charSet = charSet;
}
@@ -421,6 +485,10 @@ public class JsonElementConversionFactory {
public static abstract class ComplexConverter extends JsonElementConverter {
private JsonElementConverter elementConverter;
+ public ComplexConverter(JsonSchema schema) {
+ super(schema);
+ }
+
public ComplexConverter(String fieldName, boolean nullable, String sourceType) {
super(fieldName, nullable, sourceType);
}
@@ -432,27 +500,46 @@ public class JsonElementConversionFactory {
public JsonElementConverter getElementConverter() {
return this.elementConverter;
}
+
+ protected void processNestedItems(JsonSchema schema, WorkUnitState state)
+ throws UnsupportedDateTypeException {
+ JsonSchema nestedItem = null;
+ if (schema.isType(ARRAY)) {
+ nestedItem = schema.getItemsWithinDataType();
+ }
+ if (schema.isType(MAP)) {
+ nestedItem = schema.getValuesWithinDataType();
+ }
+ this.setElementConverter(getConvertor(nestedItem, null, state));
+ }
}
public static class ArrayConverter extends ComplexConverter {
- public ArrayConverter(String fieldName, boolean nullable, String sourceType, JsonObject schemaNode,
- WorkUnitState state) throws UnsupportedDateTypeException {
- super(fieldName, nullable, sourceType);
- super.setElementConverter(
- getConvertor(fieldName, schemaNode.get("dataType").getAsJsonObject().get("items").getAsString(),
- schemaNode.get("dataType").getAsJsonObject(), state, isNullable()));
+ public ArrayConverter(JsonSchema schema, WorkUnitState state)
+ throws UnsupportedDateTypeException {
+ super(schema);
+ processNestedItems(schema, state);
}
@Override
Object convertField(JsonElement value) {
+ if (this.isNullable() && value.isJsonNull()) {
+ return null;
+ }
List<Object> list = new ArrayList<>();
for (JsonElement elem : (JsonArray) value) {
list.add(getElementConverter().convertField(elem));
}
- return new GenericData.Array<>(schema(), list);
+ return new GenericData.Array<>(arraySchema(), list);
+ }
+
+ private Schema arraySchema() {
+ Schema schema = Schema.createArray(getElementConverter().schema());
+ schema.addProp(SOURCE_TYPE, ARRAY.toString().toLowerCase());
+ return schema;
}
@Override
@@ -462,20 +549,16 @@ public class JsonElementConversionFactory {
@Override
public Schema schema() {
- Schema schema = Schema.createArray(getElementConverter().schema());
- schema.addProp("source.type", "array");
- return schema;
+ return buildUnionIfNullable(arraySchema());
}
}
public static class MapConverter extends ComplexConverter {
- public MapConverter(String fieldName, boolean nullable, String sourceType, JsonObject schemaNode,
- WorkUnitState state) throws UnsupportedDateTypeException {
- super(fieldName, nullable, sourceType);
- super.setElementConverter(
- getConvertor(fieldName, schemaNode.get("dataType").getAsJsonObject().get("values").getAsString(),
- schemaNode.get("dataType").getAsJsonObject(), state, isNullable()));
+ public MapConverter(JsonSchema schema, WorkUnitState state)
+ throws UnsupportedDateTypeException {
+ super(schema);
+ processNestedItems(schema, state);
}
@Override
@@ -497,23 +580,106 @@ public class JsonElementConversionFactory {
@Override
public Schema schema() {
Schema schema = Schema.createMap(getElementConverter().schema());
- schema.addProp("source.type", "map");
- return schema;
+ schema.addProp(SOURCE_TYPE, MAP.toString().toLowerCase());
+ return buildUnionIfNullable(schema);
+ }
+ }
+
+ @Log
+ public static class RecordConverter extends ComplexConverter {
+ private static final Logger LOG = LoggerFactory.getLogger(RecordConverter.class);
+ private HashMap<String, JsonElementConverter> converters = new HashMap<>();
+ private Schema _schema;
+ private long numFailedConversion = 0;
+ private State workUnit;
+
+ public RecordConverter(JsonSchema schema, WorkUnitState state, String namespace)
+ throws UnsupportedDateTypeException {
+ super(schema);
+ workUnit = state;
+ String name = schema.isRoot() ? schema.getColumnName() : schema.getName();
+ _schema = buildRecordSchema(schema.getValuesWithinDataType(), state, name, namespace);
+ }
+
+ private Schema buildRecordSchema(JsonSchema schema, WorkUnitState workUnit, String name, String namespace) {
+ List<Schema.Field> fields = new ArrayList<>();
+ for (int i = 0; i < schema.fieldsCount(); i++) {
+ JsonSchema map = schema.getFieldSchemaAt(i);
+ String childNamespace = buildNamespace(namespace, name);
+ JsonElementConverter converter;
+ String sourceType;
+ Schema fldSchema;
+ try {
+ sourceType = map.isType(UNION) ? UNION.toString().toLowerCase() : map.getType().toString().toLowerCase();
+ converter = getConvertor(map, childNamespace, workUnit);
+ this.converters.put(map.getColumnName(), converter);
+ fldSchema = converter.schema();
+ } catch (UnsupportedDateTypeException e) {
+ throw new UnsupportedOperationException(e);
+ }
+
+ Schema.Field fld = new Schema.Field(map.getColumnName(), fldSchema, map.getComment(),
+ map.isNullable() ? JsonNodeFactory.instance.nullNode() : null);
+ fld.addProp(SOURCE_TYPE, sourceType);
+ fields.add(fld);
+ }
+ Schema avroSchema = Schema.createRecord(name.isEmpty() ? null : name, "", namespace, false);
+ avroSchema.setFields(fields);
+
+ return avroSchema;
+ }
+
+ @Override
+ Object convertField(JsonElement value) {
+ GenericRecord avroRecord = new GenericData.Record(_schema);
+ long maxFailedConversions = this.workUnit.getPropAsLong(ConfigurationKeys.CONVERTER_AVRO_MAX_CONVERSION_FAILURES,
+ ConfigurationKeys.DEFAULT_CONVERTER_AVRO_MAX_CONVERSION_FAILURES);
+ for (Map.Entry<String, JsonElement> entry : ((JsonObject) value).entrySet()) {
+ try {
+ avroRecord.put(entry.getKey(), this.converters.get(entry.getKey()).convert(entry.getValue()));
+ } catch (Exception e) {
+ this.numFailedConversion++;
+ if (this.numFailedConversion < maxFailedConversions) {
+ LOG.error("Dropping record " + value + " because it cannot be converted to Avro", e);
+ return new EmptyIterable<>();
+ }
+ throw new RuntimeException(
+ "Unable to convert field:" + entry.getKey() + " for value:" + entry.getValue() + " for record: " + value,
+ e);
+ }
+ }
+ return avroRecord;
+ }
+
+ @Override
+ public org.apache.avro.Schema.Type getTargetType() {
+ return Schema.Type.RECORD;
+ }
+
+ @Override
+ public Schema schema() {
+ Schema schema = _schema;
+ schema.addProp(SOURCE_TYPE, RECORD.toString().toLowerCase());
+ return buildUnionIfNullable(schema);
}
}
public static class EnumConverter extends JsonElementConverter {
String enumName;
+ String namespace;
List<String> enumSet = new ArrayList<>();
Schema schema;
- public EnumConverter(String fieldName, boolean nullable, String sourceType, JsonObject schemaNode) {
- super(fieldName, nullable, sourceType);
+ public EnumConverter(JsonSchema schema, String namespace) {
+ super(schema);
- for (JsonElement elem : schemaNode.get("dataType").getAsJsonObject().get("symbols").getAsJsonArray()) {
+ JsonObject dataType = schema.getDataType();
+ for (JsonElement elem : dataType.get(ENUM_SYMBOLS_KEY).getAsJsonArray()) {
this.enumSet.add(elem.getAsString());
}
- this.enumName = schemaNode.get("dataType").getAsJsonObject().get("name").getAsString();
+ String enumName = schema.getName();
+ this.enumName = enumName.isEmpty() ? null : enumName;
+ this.namespace = namespace;
}
@Override
@@ -528,9 +694,69 @@ public class JsonElementConversionFactory {
@Override
public Schema schema() {
- this.schema = Schema.createEnum(this.enumName, "", "", this.enumSet);
- this.schema.addProp("source.type", "enum");
- return this.schema;
+ this.schema = Schema.createEnum(this.enumName, "", namespace, this.enumSet);
+ this.schema.addProp(SOURCE_TYPE, ENUM.toString().toLowerCase());
+ return buildUnionIfNullable(this.schema);
+ }
+ }
+
+ public static class NullConverter extends JsonElementConverter {
+
+ public NullConverter(JsonSchema schema) {
+ super(schema);
+ }
+
+ @Override
+ Object convertField(JsonElement value) {
+ return value.getAsJsonNull();
+ }
+
+ @Override
+ public org.apache.avro.Schema.Type getTargetType() {
+ return Schema.Type.NULL;
+ }
+ }
+
+ public static class UnionConverter extends JsonElementConverter {
+ private final Schema firstSchema;
+ private final Schema secondSchema;
+ private final JsonElementConverter firstConverter;
+ private final JsonElementConverter secondConverter;
+
+ public UnionConverter(JsonSchema schemaNode, WorkUnitState state) {
+ super(schemaNode);
+ List<JsonSchema> types = schemaNode.getDataTypes();
+ firstConverter = getConverter(types.get(0), state);
+ secondConverter = getConverter(types.get(1), state);
+ firstSchema = firstConverter.schema();
+ secondSchema = secondConverter.schema();
+ }
+
+ private JsonElementConverter getConverter(JsonSchema schemaElement, WorkUnitState state) {
+ try {
+ return JsonElementConversionFactory.getConvertor(schemaElement, null, state);
+ } catch (UnsupportedDateTypeException e) {
+ throw new UnsupportedOperationException(e);
+ }
+ }
+
+ @Override
+ Object convertField(JsonElement value) {
+ try {
+ return firstConverter.convert(value);
+ } catch (Exception e) {
+ return secondConverter.convert(value);
+ }
+ }
+
+ @Override
+ public Schema.Type getTargetType() {
+ return Schema.Type.UNION;
+ }
+
+ @Override
+ protected Schema schema() {
+ return Schema.createUnion(firstSchema, secondSchema);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
index ca5d88d..1da8d31 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
@@ -17,17 +17,19 @@
package org.apache.gobblin.converter.avro;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.gobblin.configuration.WorkUnitState;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
/**
* Creates a converter for Json types to Avro types. Overrides {@link ArrayConverter}, {@link MapConverter},
@@ -41,7 +43,8 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
*/
public static JsonElementConverter getConvertor(String fieldName, String fieldType, Schema schemaNode,
- WorkUnitState state, boolean nullable) throws UnsupportedDateTypeException {
+ WorkUnitState state, boolean nullable)
+ throws UnsupportedDateTypeException {
Type type;
try {
@@ -52,27 +55,30 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
switch (type) {
case ARRAY:
- return new JsonElementConversionWithAvroSchemaFactory.ArrayConverter(fieldName, nullable, type.toString(), schemaNode, state);
+ return new JsonElementConversionWithAvroSchemaFactory.ArrayConverter(fieldName, nullable, type.toString(),
+ schemaNode, state);
case MAP:
- return new JsonElementConversionWithAvroSchemaFactory.MapConverter(fieldName, nullable, type.toString(), schemaNode, state);
+ return new JsonElementConversionWithAvroSchemaFactory.MapConverter(fieldName, nullable, type.toString(),
+ schemaNode, state);
case ENUM:
- return new JsonElementConversionWithAvroSchemaFactory.EnumConverter(fieldName, nullable, type.toString(), schemaNode);
+ return new JsonElementConversionWithAvroSchemaFactory.EnumConverter(fieldName, nullable, type.toString(),
+ schemaNode);
default:
- return JsonElementConversionFactory.getConvertor(fieldName, fieldType, null, state, nullable);
+ return JsonElementConversionFactory.getConvertor(fieldName, fieldType, new JsonObject(), state, nullable);
}
}
public static class ArrayConverter extends ComplexConverter {
- public ArrayConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode,
- WorkUnitState state) throws UnsupportedDateTypeException {
+ public ArrayConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, WorkUnitState state)
+ throws UnsupportedDateTypeException {
super(fieldName, nullable, sourceType);
super.setElementConverter(
- getConvertor(fieldName, schemaNode.getElementType().getType().getName(),
- schemaNode.getElementType(), state, isNullable()));
+ getConvertor(fieldName, schemaNode.getElementType().getType().getName(), schemaNode.getElementType(), state,
+ isNullable()));
}
@Override
@@ -101,12 +107,12 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
public static class MapConverter extends ComplexConverter {
- public MapConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode,
- WorkUnitState state) throws UnsupportedDateTypeException {
+ public MapConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, WorkUnitState state)
+ throws UnsupportedDateTypeException {
super(fieldName, nullable, sourceType);
super.setElementConverter(
- getConvertor(fieldName, schemaNode.getValueType().getType().getName(),
- schemaNode.getValueType(), state, isNullable()));
+ getConvertor(fieldName, schemaNode.getValueType().getType().getName(), schemaNode.getValueType(), state,
+ isNullable()));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverter.java
index 0735c03..5b1810b 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverter.java
@@ -18,34 +18,26 @@
package org.apache.gobblin.converter.avro;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.codehaus.jackson.node.JsonNodeFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.DataConversionException;
-import org.apache.gobblin.converter.EmptyIterable;
import org.apache.gobblin.converter.SchemaConversionException;
import org.apache.gobblin.converter.SingleRecordIterable;
import org.apache.gobblin.converter.ToAvroConverterBase;
+import org.apache.gobblin.converter.avro.JsonElementConversionFactory.RecordConverter;
+import org.apache.gobblin.converter.json.JsonSchema;
import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.WriterUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
/**
@@ -55,76 +47,37 @@ import org.apache.gobblin.util.WriterUtils;
*
*/
public class JsonIntermediateToAvroConverter extends ToAvroConverterBase<JsonArray, JsonObject> {
- private Map<String, JsonElementConversionFactory.JsonElementConverter> converters = new HashMap<>();
private static final Logger LOG = LoggerFactory.getLogger(JsonIntermediateToAvroConverter.class);
private static final String CONVERTER_AVRO_NULLIFY_FIELDS_ENABLED = "converter.avro.nullify.fields.enabled";
private static final boolean DEFAULT_CONVERTER_AVRO_NULLIFY_FIELDS_ENABLED = Boolean.FALSE;
private static final String CONVERTER_AVRO_NULLIFY_FIELDS_ORIGINAL_SCHEMA_PATH =
"converter.avro.nullify.fields.original.schema.path";
- private long numFailedConversion = 0;
+ private RecordConverter recordConverter;
@Override
- public Schema convertSchema(JsonArray schema, WorkUnitState workUnit) throws SchemaConversionException {
- List<Schema.Field> fields = new ArrayList<>();
-
- for (JsonElement elem : schema) {
- JsonObject map = (JsonObject) elem;
-
- String columnName = map.get("columnName").getAsString();
- String comment = map.has("comment") ? map.get("comment").getAsString() : "";
- boolean nullable = map.has("isNullable") ? map.get("isNullable").getAsBoolean() : false;
- Schema fldSchema;
-
- try {
- JsonElementConversionFactory.JsonElementConverter converter = JsonElementConversionFactory.getConvertor(
- columnName, map.get("dataType").getAsJsonObject().get("type").getAsString(), map, workUnit, nullable);
- this.converters.put(columnName, converter);
- fldSchema = converter.getSchema();
- } catch (UnsupportedDateTypeException e) {
- throw new SchemaConversionException(e);
- }
-
- Field fld = new Field(columnName, fldSchema, comment, nullable ? JsonNodeFactory.instance.nullNode() : null);
- fld.addProp("source.type", map.get("dataType").getAsJsonObject().get("type").getAsString());
- fields.add(fld);
+ public Schema convertSchema(JsonArray schema, WorkUnitState workUnit)
+ throws SchemaConversionException {
+ try {
+ JsonSchema jsonSchema = new JsonSchema(schema);
+ jsonSchema.setColumnName(workUnit.getExtract().getTable());
+ recordConverter = new RecordConverter(jsonSchema, workUnit, workUnit.getExtract().getNamespace());
+ } catch (UnsupportedDateTypeException e) {
+ throw new SchemaConversionException(e);
}
-
- Schema avroSchema =
- Schema.createRecord(workUnit.getExtract().getTable(), "", workUnit.getExtract().getNamespace(), false);
- avroSchema.setFields(fields);
-
- if (workUnit.getPropAsBoolean(CONVERTER_AVRO_NULLIFY_FIELDS_ENABLED,
- DEFAULT_CONVERTER_AVRO_NULLIFY_FIELDS_ENABLED)) {
- return this.generateSchemaWithNullifiedField(workUnit, avroSchema);
+ Schema recordSchema = recordConverter.schema();
+ if (workUnit
+ .getPropAsBoolean(CONVERTER_AVRO_NULLIFY_FIELDS_ENABLED, DEFAULT_CONVERTER_AVRO_NULLIFY_FIELDS_ENABLED)) {
+ return this.generateSchemaWithNullifiedField(workUnit, recordSchema);
}
-
- return avroSchema;
+ return recordSchema;
}
@Override
public Iterable<GenericRecord> convertRecord(Schema outputSchema, JsonObject inputRecord, WorkUnitState workUnit)
throws DataConversionException {
- GenericRecord avroRecord = new GenericData.Record(outputSchema);
- long maxFailedConversions = workUnit.getPropAsLong(ConfigurationKeys.CONVERTER_AVRO_MAX_CONVERSION_FAILURES,
- ConfigurationKeys.DEFAULT_CONVERTER_AVRO_MAX_CONVERSION_FAILURES);
-
- for (Map.Entry<String, JsonElement> entry : inputRecord.entrySet()) {
- try {
- avroRecord.put(entry.getKey(), this.converters.get(entry.getKey()).convert(entry.getValue()));
- } catch (Exception e) {
- this.numFailedConversion++;
- if (this.numFailedConversion < maxFailedConversions) {
- LOG.error("Dropping record " + inputRecord + " because it cannot be converted to Avro", e);
- return new EmptyIterable<>();
- }
- throw new DataConversionException("Unable to convert field:" + entry.getKey() + " for value:" + entry.getValue()
- + " for record: " + inputRecord, e);
- }
- }
-
- return new SingleRecordIterable<>(avroRecord);
+ return new SingleRecordIterable<>((GenericRecord) recordConverter.convert(inputRecord));
}
/**
@@ -151,8 +104,7 @@ public class JsonIntermediateToAvroConverter extends ToAvroConverterBase<JsonArr
+ "is not specified. Trying to get the orignal schema from previous avro files.");
originalSchemaPath = WriterUtils
.getDataPublisherFinalDir(workUnitState, workUnitState.getPropAsInt(ConfigurationKeys.FORK_BRANCHES_KEY, 1),
- workUnitState.getPropAsInt(ConfigurationKeys.FORK_BRANCH_ID_KEY, 0))
- .getParent();
+ workUnitState.getPropAsInt(ConfigurationKeys.FORK_BRANCH_ID_KEY, 0)).getParent();
}
try {
Schema prevSchema = AvroUtils.getDirectorySchema(originalSchemaPath, conf, false);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
index c495bae..97f0121 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.converter.avro;
import java.util.List;
+
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -27,6 +28,8 @@ import org.apache.gobblin.converter.DataConversionException;
import org.apache.gobblin.converter.SchemaConversionException;
import org.apache.gobblin.converter.SingleRecordIterable;
import org.apache.gobblin.converter.ToAvroConverterBase;
+
+import com.google.common.base.Preconditions;
import com.google.gson.JsonObject;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonSchema.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonSchema.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonSchema.java
new file mode 100644
index 0000000..21bfeaf
--- /dev/null
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonSchema.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter.json;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type;
+import org.apache.gobblin.source.extractor.schema.Schema;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.ENUM;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.FIXED;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.RECORD;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.UNION;
+import static org.apache.gobblin.converter.json.JsonSchema.SchemaType.CHILD;
+import static org.apache.gobblin.converter.json.JsonSchema.SchemaType.ROOT;
+
+
+/**
+ * Represents a source schema declared in the configuration with {@link ConfigurationKeys#SOURCE_SCHEMA}.
+ * The source schema is represented by a {@link JsonArray}.
+ *
+ * <p>An instance built from a {@link JsonArray} is a {@link SchemaType#ROOT} record whose fields are the
+ * array elements; an instance built from a {@link JsonObject} is a {@link SchemaType#CHILD} describing a
+ * single column, which may itself be a union of two types.
+ * @author tilakpatidar
+ */
+public class JsonSchema extends Schema {
+  public static final String RECORD_FIELDS_KEY = "values";
+  public static final String TYPE_KEY = "type";
+  public static final String NAME_KEY = "name";
+  public static final String SIZE_KEY = "size";
+  public static final String ENUM_SYMBOLS_KEY = "symbols";
+  public static final String COLUMN_NAME_KEY = "columnName";
+  public static final String DATA_TYPE_KEY = "dataType";
+  public static final String COMMENT_KEY = "comment";
+  public static final String DEFAULT_VALUE_KEY = "defaultValue";
+  public static final String IS_NULLABLE_KEY = "isNullable";
+  public static final String DEFAULT_RECORD_COLUMN_NAME = "root";
+  public static final String DEFAULT_VALUE_FOR_OPTIONAL_PROPERTY = "";
+  public static final String ARRAY_ITEMS_KEY = "items";
+  public static final String MAP_ITEMS_KEY = "values";
+  public static final String SOURCE_TYPE = "source.type";
+  private final Type type;
+  private final JsonObject json;
+  private final SchemaType schemaNestedLevel;
+  private final JsonArray jsonArray;
+  // Only populated for UNION schemas; a member that is neither a primitive nor a JsonObject stays null.
+  private JsonSchema secondType;
+  private JsonSchema firstType;
+
+  public enum SchemaType {
+    ROOT, CHILD
+  }
+
+  /**
+   * Build a {@link JsonSchema} using {@link JsonArray}
+   * This will create a {@link SchemaType} of {@link SchemaType#ROOT}
+   * The array is wrapped as the fields of a record column named {@link #DEFAULT_RECORD_COLUMN_NAME}.
+   * @param jsonArray the field definitions of the root record
+   */
+  public JsonSchema(JsonArray jsonArray) {
+    JsonObject jsonObject = new JsonObject();
+    JsonObject dataType = new JsonObject();
+    jsonObject.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME);
+    dataType.addProperty(TYPE_KEY, RECORD.toString());
+    dataType.add(RECORD_FIELDS_KEY, jsonArray);
+    jsonObject.add(DATA_TYPE_KEY, dataType);
+    setJsonSchemaProperties(jsonObject);
+    this.type = RECORD;
+    this.json = jsonObject;
+    this.jsonArray = jsonArray;
+    this.schemaNestedLevel = ROOT;
+  }
+
+  /**
+   * Build a {@link JsonSchema} using {@link JsonObject}
+   * This will create a {@link SchemaType} of {@link SchemaType#CHILD}
+   *
+   * <p>An object carrying neither {@link #COLUMN_NAME_KEY} nor {@link #DATA_TYPE_KEY} is treated as a bare
+   * data type and wrapped in a column named {@link #DEFAULT_RECORD_COLUMN_NAME}. An object carrying only
+   * {@link #DATA_TYPE_KEY} gets the default column name added to it in place.
+   * @param jsonObject the column definition, or a bare data type definition
+   * @throws RuntimeException if the {@link #TYPE_KEY} property is missing, is neither a string nor an
+   * array, or is an array that does not hold exactly two union members
+   */
+  public JsonSchema(JsonObject jsonObject) {
+    if (!jsonObject.has(COLUMN_NAME_KEY) && !jsonObject.has(DATA_TYPE_KEY)) {
+      JsonObject root = new JsonObject();
+      root.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME);
+      root.add(DATA_TYPE_KEY, jsonObject);
+      jsonObject = root;
+    } else if (!jsonObject.has(COLUMN_NAME_KEY)) {
+      jsonObject.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME);
+    }
+    setJsonSchemaProperties(jsonObject);
+    JsonElement typeElement = getDataType().get(TYPE_KEY);
+    if (typeElement == null) {
+      // Report a missing type the same way as a malformed one rather than failing with a bare NPE.
+      throw new RuntimeException("Invalid " + TYPE_KEY + " property in schema");
+    }
+    if (typeElement.isJsonPrimitive()) {
+      this.type = parseType(typeElement.getAsString());
+    } else if (typeElement.isJsonArray()) {
+      JsonArray unionMembers = typeElement.getAsJsonArray();
+      if (unionMembers.size() != 2) {
+        throw new RuntimeException("Invalid " + TYPE_KEY + " property in schema for union types");
+      }
+      this.type = UNION;
+      this.firstType = unionMemberSchema(unionMembers.get(0));
+      this.secondType = unionMemberSchema(unionMembers.get(1));
+    } else {
+      throw new RuntimeException("Invalid " + TYPE_KEY + " property in schema");
+    }
+    this.json = jsonObject;
+    JsonArray singleField = new JsonArray();
+    singleField.add(jsonObject);
+    this.jsonArray = singleField;
+    this.schemaNestedLevel = CHILD;
+  }
+
+  /**
+   * Get symbols for a {@link Type#ENUM} type.
+   * @return the declared symbols, or an empty {@link JsonArray} when this schema is not an enum
+   */
+  public JsonArray getSymbols() {
+    if (this.type.equals(ENUM)) {
+      return getDataType().get(ENUM_SYMBOLS_KEY).getAsJsonArray();
+    }
+    return new JsonArray();
+  }
+
+  /**
+   * Get {@link Type} for this {@link JsonSchema}.
+   * @return the type of this schema
+   */
+  public Type getType() {
+    return type;
+  }
+
+  /**
+   * Builds a {@link JsonSchema} object for a given {@link Type} object.
+   * The resulting schema is a column named {@link #DEFAULT_RECORD_COLUMN_NAME} of that type.
+   * @param type the type of the column
+   * @return a child schema of the given type
+   */
+  public static JsonSchema buildBaseSchema(Type type) {
+    JsonObject jsonObject = new JsonObject();
+    JsonObject dataType = new JsonObject();
+    jsonObject.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME);
+    dataType.addProperty(TYPE_KEY, type.toString());
+    jsonObject.add(DATA_TYPE_KEY, dataType);
+    return new JsonSchema(jsonObject);
+  }
+
+  /**
+   * Builds a {@link JsonSchema} object from a column definition, naming the column
+   * {@link #DEFAULT_RECORD_COLUMN_NAME}.
+   * Note that the column name is written into the passed object itself, replacing any existing one.
+   * @param root the column definition
+   * @return a child schema for the definition
+   */
+  public static JsonSchema buildBaseSchema(JsonObject root) {
+    root.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME);
+    return new JsonSchema(root);
+  }
+
+  /**
+   * Get optional property from a {@link JsonObject} for a {@link String} key.
+   * If the key does not exist, returns {@link #DEFAULT_VALUE_FOR_OPTIONAL_PROPERTY}.
+   * @param jsonObject the object to read from
+   * @param key the property name
+   * @return the property value as a string, or the default when absent
+   */
+  public static String getOptionalProperty(JsonObject jsonObject, String key) {
+    return jsonObject.has(key) ? jsonObject.get(key).getAsString() : DEFAULT_VALUE_FOR_OPTIONAL_PROPERTY;
+  }
+
+  /**
+   * Fetches dataType.values from the JsonObject
+   * @return the schema of the nested values
+   * @throws UnsupportedOperationException if dataType.values is missing or is not a JsonObject,
+   * JsonArray or JsonPrimitive
+   */
+  public JsonSchema getValuesWithinDataType() {
+    JsonElement element = this.getDataType().get(MAP_ITEMS_KEY);
+    // A missing property falls through to the descriptive error below instead of a bare NPE.
+    if (element != null) {
+      if (element.isJsonObject()) {
+        return new JsonSchema(element.getAsJsonObject());
+      }
+      if (element.isJsonArray()) {
+        return new JsonSchema(element.getAsJsonArray());
+      }
+      if (element.isJsonPrimitive()) {
+        return buildBaseSchema(parseType(element.getAsString()));
+      }
+    }
+    throw new UnsupportedOperationException(
+        "Map values can only be defined using JsonObject, JsonArray or JsonPrimitive.");
+  }
+
+  /**
+   * Gets size for fixed type viz dataType.size from the JsonObject
+   * @return the declared size, or 0 when this schema is not of type {@link Type#FIXED}
+   */
+  public int getSizeOfFixedData() {
+    if (this.type.equals(FIXED)) {
+      return this.getDataType().get(SIZE_KEY).getAsInt();
+    }
+    return 0;
+  }
+
+  /**
+   * Checks whether this schema is of the given {@link Type}.
+   * @param type the type to compare against
+   * @return true when the types are equal
+   */
+  public boolean isType(Type type) {
+    return this.type.equals(type);
+  }
+
+  /**
+   * Fetches the nested or primitive array items type from schema.
+   * @return the type of the array items
+   * @throws DataConversionException if dataType.items is missing or is neither a JsonObject nor a
+   * JsonPrimitive
+   */
+  public Type getTypeOfArrayItems()
+      throws DataConversionException {
+    JsonElement items = this.getDataType().get(ARRAY_ITEMS_KEY);
+    // Validate here so that callers get the declared checked exception, not an unchecked one.
+    if (items == null || !(items.isJsonObject() || items.isJsonPrimitive())) {
+      throw new DataConversionException("Array types only allow values as primitive, null or JsonObject");
+    }
+    return getItemsWithinDataType().getType();
+  }
+
+  /**
+   * Fetches dataType.items from the JsonObject
+   * @return the schema of the array items
+   * @throws UnsupportedOperationException if dataType.items is missing or is neither a JsonObject nor a
+   * JsonPrimitive
+   */
+  public JsonSchema getItemsWithinDataType() {
+    JsonElement element = this.getDataType().get(ARRAY_ITEMS_KEY);
+    // A missing property falls through to the descriptive error below instead of a bare NPE.
+    if (element != null) {
+      if (element.isJsonObject()) {
+        return new JsonSchema(element.getAsJsonObject());
+      }
+      if (element.isJsonPrimitive()) {
+        return buildBaseSchema(parseType(element.getAsString()));
+      }
+    }
+    throw new UnsupportedOperationException("Array items can only be defined using JsonObject or JsonPrimitive.");
+  }
+
+  /**
+   * @return the schema of the first union member, or null when not set
+   */
+  public JsonSchema getFirstTypeSchema() {
+    return this.firstType;
+  }
+
+  /**
+   * @return the schema of the second union member, or null when not set
+   */
+  public JsonSchema getSecondTypeSchema() {
+    return this.secondType;
+  }
+
+  /**
+   * @return the number of field definitions held by this schema (1 for a child schema)
+   */
+  public int fieldsCount() {
+    return this.jsonArray.size();
+  }
+
+  /**
+   * Gets the schema of the field at the given position.
+   * @param i the field position
+   * @return the field schema, or a schema rebuilt from this schema's own definition when {@code i} is
+   * past the last field
+   */
+  public JsonSchema getFieldSchemaAt(int i) {
+    if (i >= this.jsonArray.size()) {
+      return new JsonSchema(this.json);
+    }
+    return new JsonSchema(this.jsonArray.get(i).getAsJsonObject());
+  }
+
+  /**
+   * @return both union member schemas when both are set, otherwise a singleton list holding this schema
+   */
+  public List<JsonSchema> getDataTypes() {
+    if (firstType != null && secondType != null) {
+      return Arrays.asList(firstType, secondType);
+    }
+    return Collections.singletonList(this);
+  }
+
+  /**
+   * @return true when this schema was built from a {@link JsonArray}
+   */
+  public boolean isRoot() {
+    return this.schemaNestedLevel.equals(ROOT);
+  }
+
+  /**
+   * @return dataType.name, or {@link #DEFAULT_VALUE_FOR_OPTIONAL_PROPERTY} when absent
+   */
+  public String getName() {
+    return getOptionalProperty(this.getDataType(), NAME_KEY);
+  }
+
+  /**
+   * Set properties for {@link JsonSchema} from a {@link JsonObject}.
+   * @param jsonObject the column definition; must carry both a column name and a data type
+   */
+  private void setJsonSchemaProperties(JsonObject jsonObject) {
+    setColumnName(jsonObject.get(COLUMN_NAME_KEY).getAsString());
+    setDataType(jsonObject.get(DATA_TYPE_KEY).getAsJsonObject());
+    setNullable(jsonObject.has(IS_NULLABLE_KEY) && jsonObject.get(IS_NULLABLE_KEY).getAsBoolean());
+    setComment(getOptionalProperty(jsonObject, COMMENT_KEY));
+    setDefaultValue(getOptionalProperty(jsonObject, DEFAULT_VALUE_KEY));
+  }
+
+  /**
+   * Builds the schema of one union member.
+   * @param member a type name or a column definition
+   * @return the member schema, or null when the member is neither a JsonPrimitive nor a JsonObject
+   */
+  private static JsonSchema unionMemberSchema(JsonElement member) {
+    if (member.isJsonPrimitive()) {
+      return buildBaseSchema(parseType(member.getAsString()));
+    }
+    if (member.isJsonObject()) {
+      return buildBaseSchema(member.getAsJsonObject());
+    }
+    return null;
+  }
+
+  /**
+   * Resolves a type name from the schema to its {@link Type} constant, ignoring case.
+   * Upper-casing is locale-independent so that names such as "int" resolve under every default locale.
+   * @param typeName the type name as written in the schema
+   * @return the matching type
+   */
+  private static Type parseType(String typeName) {
+    return Type.valueOf(typeName.toUpperCase(java.util.Locale.ROOT));
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverter.java
index b1cd467..4cb8d84 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverter.java
@@ -17,25 +17,30 @@
package org.apache.gobblin.converter.json;
+import java.io.IOException;
+import java.util.Map.Entry;
+
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.Converter;
import org.apache.gobblin.converter.DataConversionException;
import org.apache.gobblin.converter.SchemaConversionException;
import org.apache.gobblin.converter.SingleRecordIterable;
-
-import java.io.IOException;
-import java.util.Map;
-
+import org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonNull;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.FIXED;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.MAP;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.NULL;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.RECORD;
+import static org.apache.gobblin.converter.json.JsonSchema.DEFAULT_RECORD_COLUMN_NAME;
+
/**
* Converts a json string to a {@link JsonObject}.
@@ -44,7 +49,9 @@ public class JsonStringToJsonIntermediateConverter extends Converter<String, Jso
private final static Logger log = LoggerFactory.getLogger(JsonStringToJsonIntermediateConverter.class);
- private static final String UNPACK_COMPLEX_SCHEMAS_KEY = "gobblin.converter.jsonStringToJsonIntermediate.unpackComplexSchemas";
+ private static final String UNPACK_COMPLEX_SCHEMAS_KEY =
+ "gobblin.converter.jsonStringToJsonIntermediate.unpackComplexSchemas";
+ public static final boolean DEFAULT_UNPACK_COMPLEX_SCHEMAS_KEY = Boolean.TRUE;
private boolean unpackComplexSchemas;
@@ -53,8 +60,10 @@ public class JsonStringToJsonIntermediateConverter extends Converter<String, Jso
* @return a JsonArray representation of the schema
*/
@Override
- public JsonArray convertSchema(String inputSchema, WorkUnitState workUnit) throws SchemaConversionException {
- this.unpackComplexSchemas = workUnit.getPropAsBoolean(UNPACK_COMPLEX_SCHEMAS_KEY, true);
+ public JsonArray convertSchema(String inputSchema, WorkUnitState workUnit)
+ throws SchemaConversionException {
+ this.unpackComplexSchemas =
+ workUnit.getPropAsBoolean(UNPACK_COMPLEX_SCHEMAS_KEY, DEFAULT_UNPACK_COMPLEX_SCHEMAS_KEY);
JsonParser jsonParser = new JsonParser();
log.info("Schema: " + inputSchema);
@@ -76,40 +85,172 @@ public class JsonStringToJsonIntermediateConverter extends Converter<String, Jso
if (!this.unpackComplexSchemas) {
return new SingleRecordIterable<>(inputRecord);
}
+ JsonSchema schema = new JsonSchema(outputSchema);
+ JsonObject rec = parse(inputRecord, schema);
+ return new SingleRecordIterable(rec);
+ }
- JsonObject outputRecord = new JsonObject();
-
- for (int i = 0; i < outputSchema.size(); i++) {
- String expectedColumnName = outputSchema.get(i).getAsJsonObject().get("columnName").getAsString();
+ /**
+ * Parses a single {@link JsonElement} against the provided schema. The element is wrapped in a
+ * temporary JsonObject under the {@link JsonSchema#DEFAULT_RECORD_COLUMN_NAME} key, handed to
+ * {@link #parse(JsonObject, JsonSchema)}, and the entry stored under that same key is returned.
+ * @param element the value to parse
+ * @param schema the schema to parse the value against
+ * @return the entry found under the default column name in the parsed wrapper
+ * @throws DataConversionException propagated from the delegated parse
+ */
+ private JsonElement parse(JsonElement element, JsonSchema schema)
+ throws DataConversionException {
+ JsonObject root = new JsonObject();
+ root.add(DEFAULT_RECORD_COLUMN_NAME, element);
+ JsonObject jsonObject = parse(root, schema);
+ return jsonObject.get(DEFAULT_RECORD_COLUMN_NAME);
+ }
- if (inputRecord.has(expectedColumnName)) {
- //As currently org.apache.gobblin.converter.avro.JsonIntermediateToAvroConverter is not able to handle complex schema's so storing it as string
+ /**
+ * Parses a provided JsonObject record field by field using the provided {@link JsonSchema},
+ * producing a JsonObject in which fields missing from the record are set to JSON null.
+ * @param record
+ * @param schema
+ * @return
+ * @throws DataConversionException
+ */
+ private JsonObject parse(JsonObject record, JsonSchema schema)
+ throws DataConversionException {
+ JsonObject output = new JsonObject();
+ for (int i = 0; i < schema.fieldsCount(); i++) {
+ JsonSchema schemaElement = schema.getFieldSchemaAt(i);
+ String columnKey = schemaElement.getColumnName();
+ JsonElement parsed;
+ if (!record.has(columnKey)) {
+ output.add(columnKey, JsonNull.INSTANCE);
+ continue;
+ }
- if (inputRecord.get(expectedColumnName).isJsonArray()) {
- outputRecord.addProperty(expectedColumnName, inputRecord.get(expectedColumnName).toString());
- } else if (inputRecord.get(expectedColumnName).isJsonObject()) {
- //To check if internally in an JsonObject there is multiple hierarchy
- boolean isMultiHierarchyInsideJsonObject = false;
- for (Map.Entry<String, JsonElement> entry : ((JsonObject) inputRecord.get(expectedColumnName)).entrySet()) {
- if (entry.getValue().isJsonArray() || entry.getValue().isJsonObject()) {
- isMultiHierarchyInsideJsonObject = true;
- break;
+ JsonElement columnValue = record.get(columnKey);
+ switch (schemaElement.getType()) {
+ case UNION:
+ parsed = parseUnionType(schemaElement, columnValue);
+ break;
+ case ENUM:
+ parsed = parseEnumType(schemaElement, columnValue);
+ break;
+ default:
+ if (columnValue.isJsonArray()) {
+ parsed = parseJsonArrayType(schemaElement, columnValue);
+ } else if (columnValue.isJsonObject()) {
+ parsed = parseJsonObjectType(schemaElement, columnValue);
+ } else {
+ parsed = parsePrimitiveType(schemaElement, columnValue);
}
- }
- if (isMultiHierarchyInsideJsonObject) {
- outputRecord.addProperty(expectedColumnName, inputRecord.get(expectedColumnName).toString());
- } else {
- outputRecord.add(expectedColumnName, inputRecord.get(expectedColumnName));
- }
-
- } else {
- outputRecord.add(expectedColumnName, inputRecord.get(expectedColumnName));
}
- } else {
- outputRecord.add(expectedColumnName, JsonNull.INSTANCE);
+ output.add(columnKey, parsed);
+ }
+ return output;
+ }
+
+  /**
+   * Parses a union type value by trying the first member schema and, if that is rejected, the second.
+   * @param schemaElement the union schema
+   * @param columnValue the value to parse
+   * @return the value as parsed by the first member schema that accepts it
+   * @throws DataConversionException if both member schemas reject the value; the failure of the first
+   * member is attached as a suppressed exception
+   */
+  private JsonElement parseUnionType(JsonSchema schemaElement, JsonElement columnValue)
+      throws DataConversionException {
+    try {
+      return parse(columnValue, schemaElement.getFirstTypeSchema());
+    } catch (DataConversionException firstFailure) {
+      try {
+        return parse(columnValue, schemaElement.getSecondTypeSchema());
+      } catch (DataConversionException secondFailure) {
+        // Keep the first member's diagnosis instead of silently dropping it.
+        secondFailure.addSuppressed(firstFailure);
+        throw secondFailure;
+      }
+    }
+  }
+
+  /**
+   * Parses Enum type values
+   * @param schema the enum schema holding the allowed symbols
+   * @param value the value to check
+   * @return the value itself when it is one of the allowed symbols
+   * @throws DataConversionException if the value is not one of the allowed symbols
+   */
+  private JsonElement parseEnumType(JsonSchema schema, JsonElement value)
+      throws DataConversionException {
+    JsonArray allowedSymbols = schema.getSymbols();
+    if (allowedSymbols.contains(value)) {
+      return value;
+    }
+    // getAsString() is only safe on primitives; render anything else (null, object, array) as JSON so
+    // that building the message cannot itself fail and hide the conversion error.
+    String rejected = value.isJsonPrimitive() ? value.getAsString() : value.toString();
+    throw new DataConversionException(
+        "Invalid symbol: " + rejected + " allowed values: " + allowedSymbols.toString());
+  }
+
+  /**
+   * Parses JsonArray type values. Arrays whose items are primitive are passed through untouched;
+   * otherwise every item is parsed against the item schema and collected into a new array.
+   * @param schema the array schema
+   * @param value the array value to parse
+   * @return the original value for primitive items, otherwise a new array of parsed items
+   * @throws DataConversionException if the item type cannot be resolved or an item cannot be parsed
+   */
+  private JsonElement parseJsonArrayType(JsonSchema schema, JsonElement value)
+      throws DataConversionException {
+    if (Type.isPrimitive(schema.getTypeOfArrayItems())) {
+      return value;
+    }
+    JsonSchema itemSchema = schema.getItemsWithinDataType();
+    JsonArray parsedItems = new JsonArray();
+    for (JsonElement item : value.getAsJsonArray()) {
+      parsedItems.add(parse(item, itemSchema));
+    }
+    return parsedItems;
+  }
+
+  /**
+   * Parses JsonObject type values.
+   * The nested {@code values} schema is resolved only for MAP and RECORD columns, so an object value
+   * found in a column of any other type yields JSON null instead of failing on the missing property.
+   * @param schema the schema of the column holding the value
+   * @param value the JsonObject value to parse
+   * @return the untouched value for maps of primitives, a newly built object for other maps and for
+   * records, or JSON null when the column is neither a MAP nor a RECORD
+   * @throws DataConversionException if a nested value cannot be parsed
+   */
+  private JsonElement parseJsonObjectType(JsonSchema schema, JsonElement value)
+      throws DataConversionException {
+    if (schema.isType(MAP)) {
+      JsonSchema mapValueSchema = schema.getValuesWithinDataType();
+      if (Type.isPrimitive(mapValueSchema.getType())) {
+        return value;
       }
+      JsonObject map = new JsonObject();
+      for (Entry<String, JsonElement> mapEntry : value.getAsJsonObject().entrySet()) {
+        map.add(mapEntry.getKey(), parse(mapEntry.getValue(), mapValueSchema));
+      }
+      return map;
+    }
+    if (schema.isType(RECORD)) {
+      // The record's field list is looked up through two levels, exactly as before.
+      JsonSchema recordFields = schema.getValuesWithinDataType().getValuesWithinDataType();
+      return parse((JsonObject) value, recordFields);
+    }
+    return JsonNull.INSTANCE;
+  }
+
+ /**
+ * Parses primitive types
+ * @param schema
+ * @param value
+ * @return
+ * @throws DataConversionException
+ */
+ private JsonElement parsePrimitiveType(JsonSchema schema, JsonElement value)
+ throws DataConversionException {
+
+ if ((schema.isType(NULL) || schema.isNullable()) && value.isJsonNull()) {
+ return JsonNull.INSTANCE;
+ }
+
+ if ((schema.isType(NULL) && !value.isJsonNull()) || (!schema.isType(NULL) && value.isJsonNull())) {
+ throw new DataConversionException(
+ "Type mismatch for " + value.toString() + " of type " + schema.getDataTypes().toString());
+ }
+
+ if (schema.isType(FIXED)) {
+ int expectedSize = schema.getSizeOfFixedData();
+ if (value.getAsString().length() == expectedSize) {
+ return value;
+ } else {
+ throw new DataConversionException(
+ "Fixed type value is not same as defined value expected fieldsCount: " + expectedSize);
+ }
+ } else {
+ return value;
}
- return new SingleRecordIterable<>(outputRecord);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonElementConversionFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonElementConversionFactoryTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonElementConversionFactoryTest.java
new file mode 100644
index 0000000..7f39d30
--- /dev/null
+++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonElementConversionFactoryTest.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter.avro;
+
+import java.io.InputStreamReader;
+import java.lang.reflect.Type;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SchemaConversionException;
+import org.apache.gobblin.converter.avro.JsonElementConversionFactory.EnumConverter;
+import org.apache.gobblin.converter.avro.JsonElementConversionFactory.MapConverter;
+import org.apache.gobblin.converter.avro.JsonElementConversionFactory.NullConverter;
+import org.apache.gobblin.converter.avro.JsonElementConversionFactory.RecordConverter;
+import org.apache.gobblin.converter.avro.JsonElementConversionFactory.StringConverter;
+import org.apache.gobblin.converter.json.JsonSchema;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.reflect.TypeToken;
+
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.ArrayConverter;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.JsonElementConverter;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.JsonElementConverter.buildNamespace;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.NULL;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.UnionConverter;
+
+
+/**
+ * Unit test for {@link JsonElementConversionFactory}
+ *
+ * @author Tilak Patidar
+ */
+@Test(groups = {"gobblin.converter"})
+public class JsonElementConversionFactoryTest {
+
+ private static WorkUnitState state;
+ private static JsonObject testData;
+ private static JsonParser jsonParser = new JsonParser();
+
+ /**
+  * Builds the shared {@link WorkUnitState} and loads the JSON fixture file whose entries are read
+  * by the tests through {@code getSchemaData} and {@code getExpectedSchema}.
+  *
+  * @throws java.io.IOException if closing the fixture reader fails
+  */
+ @BeforeClass
+ public static void setUp()
+     throws java.io.IOException {
+   WorkUnit workUnit = new WorkUnit(new SourceState(),
+       new Extract(new SourceState(), Extract.TableType.SNAPSHOT_ONLY, "namespace", "dummy_table"));
+   state = new WorkUnitState(workUnit);
+   Type fixtureType = new TypeToken<JsonObject>() {
+   }.getType();
+   // Decode as UTF-8 instead of the platform default charset, and release the classpath stream
+   // as soon as the fixture has been parsed.
+   try (InputStreamReader fixtureReader = new InputStreamReader(
+       JsonElementConversionFactoryTest.class.getResourceAsStream("/converter/JsonElementConversionFactoryTest.json"),
+       java.nio.charset.StandardCharsets.UTF_8)) {
+     testData = new Gson().fromJson(fixtureReader, fixtureType);
+   }
+ }
+
+ /**
+  * Checks the Avro schema that {@code ArrayConverter} derives from the
+  * {@code schemaWithArrayOfMaps} fixture (column name {@code dummy}) against the expected schema.
+  */
+ @Test
+ public void schemaWithArrayOfMaps()
+     throws Exception {
+   final String fixtureKey = "schemaWithArrayOfMaps";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonObject expectedAvro = getExpectedSchema(fixtureKey).getAsJsonObject();
+
+   JsonSchema columnSchema = new JsonSchema(sourceSchema);
+   columnSchema.setColumnName("dummy");
+   JsonElement actualAvro = avroSchemaToJsonElement(new ArrayConverter(columnSchema, state));
+
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Checks the Avro schema that {@code ArrayConverter} derives from the
+  * {@code schemaWithArrayOfRecords} fixture (column name {@code dummy1}) against the expected schema.
+  */
+ @Test
+ public void schemaWithArrayOfRecords()
+     throws Exception {
+   final String fixtureKey = "schemaWithArrayOfRecords";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonObject expectedAvro = getExpectedSchema(fixtureKey).getAsJsonObject();
+
+   JsonSchema columnSchema = new JsonSchema(sourceSchema);
+   columnSchema.setColumnName("dummy1");
+   JsonElement actualAvro = avroSchemaToJsonElement(new ArrayConverter(columnSchema, state));
+
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Checks the Avro schema that {@code RecordConverter} derives from the {@code schemaWithRecord}
+  * fixture (column name {@code dummy1}) against the expected schema.
+  */
+ @Test
+ public void schemaWithRecord()
+     throws DataConversionException, SchemaConversionException, UnsupportedDateTypeException {
+   final String fixtureKey = "schemaWithRecord";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonObject expectedAvro = getExpectedSchema(fixtureKey).getAsJsonObject();
+
+   JsonSchema columnSchema = new JsonSchema(sourceSchema);
+   columnSchema.setColumnName("dummy1");
+   RecordConverter recordConverter =
+       new RecordConverter(columnSchema, state, buildNamespace(state.getExtract().getNamespace(), "something"));
+   JsonElement actualAvro = avroSchemaToJsonElement(recordConverter);
+
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Checks the Avro schema that {@code ArrayConverter} derives from the
+  * {@code schemaWithArrayOfInts} fixture against the expected schema.
+  */
+ @Test
+ public void schemaWithArrayOfInts()
+     throws Exception {
+   final String fixtureKey = "schemaWithArrayOfInts";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonObject expectedAvro = getExpectedSchema(fixtureKey).getAsJsonObject();
+
+   JsonElement actualAvro = avroSchemaToJsonElement(new ArrayConverter(new JsonSchema(sourceSchema), state));
+
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Checks that {@code NullConverter} built from the base NULL schema reports an Avro schema with
+  * {@code type} and {@code source.type} both equal to {@code "null"}.
+  */
+ @Test
+ public void schemaWithNullType() {
+   NullConverter converterUnderTest = new NullConverter(JsonSchema.buildBaseSchema(NULL));
+
+   JsonObject expectedAvro = new JsonObject();
+   expectedAvro.addProperty("type", "null");
+   expectedAvro.addProperty("source.type", "null");
+
+   JsonElement actualAvro = avroSchemaToJsonElement(converterUnderTest);
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Checks the Avro schema that {@code ArrayConverter} derives from the
+  * {@code schemaWithArrayOfEnums} fixture against the expected schema.
+  */
+ @Test
+ public void schemaWithArrayOfEnums()
+     throws Exception {
+   final String fixtureKey = "schemaWithArrayOfEnums";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonObject expectedAvro = getExpectedSchema(fixtureKey).getAsJsonObject();
+
+   JsonElement actualAvro = avroSchemaToJsonElement(new ArrayConverter(new JsonSchema(sourceSchema), state));
+
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Checks the Avro schema that {@code MapConverter} derives from the {@code schemaWithMap}
+  * fixture against the expected schema.
+  */
+ @Test
+ public void schemaWithMap()
+     throws Exception {
+   final String fixtureKey = "schemaWithMap";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonObject expectedAvro = getExpectedSchema(fixtureKey).getAsJsonObject();
+
+   JsonElement actualAvro = avroSchemaToJsonElement(new MapConverter(new JsonSchema(sourceSchema), state));
+
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Checks the Avro schema that {@code MapConverter} derives from the
+  * {@code schemaWithMapOfRecords} fixture against the expected schema.
+  */
+ @Test
+ public void schemaWithMapOfRecords()
+     throws Exception {
+   final String fixtureKey = "schemaWithMapOfRecords";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonObject expectedAvro = getExpectedSchema(fixtureKey).getAsJsonObject();
+
+   JsonElement actualAvro = avroSchemaToJsonElement(new MapConverter(new JsonSchema(sourceSchema), state));
+
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Checks the Avro schema that {@code MapConverter} derives from the
+  * {@code schemaWithMapOfArrays} fixture against the expected schema.
+  */
+ @Test
+ public void schemaWithMapOfArrays()
+     throws Exception {
+   final String fixtureKey = "schemaWithMapOfArrays";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonObject expectedAvro = getExpectedSchema(fixtureKey).getAsJsonObject();
+
+   JsonElement actualAvro = avroSchemaToJsonElement(new MapConverter(new JsonSchema(sourceSchema), state));
+
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Checks the Avro schema that {@code MapConverter} derives from the
+  * {@code schemaWithMapOfEnum} fixture against the expected schema.
+  */
+ @Test
+ public void schemaWithMapOfEnum()
+     throws Exception {
+   final String fixtureKey = "schemaWithMapOfEnum";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonObject expectedAvro = getExpectedSchema(fixtureKey).getAsJsonObject();
+
+   JsonElement actualAvro = avroSchemaToJsonElement(new MapConverter(new JsonSchema(sourceSchema), state));
+
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Checks the Avro schema that {@code RecordConverter} derives from the
+  * {@code schemaWithRecordOfMap} fixture against the expected schema.
+  */
+ @Test
+ public void schemaWithRecordOfMap()
+     throws Exception {
+   final String fixtureKey = "schemaWithRecordOfMap";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonObject expectedAvro = getExpectedSchema(fixtureKey).getAsJsonObject();
+
+   RecordConverter recordConverter = new RecordConverter(new JsonSchema(sourceSchema), state,
+       buildNamespace(state.getExtract().getNamespace(), "something"));
+   JsonElement actualAvro = avroSchemaToJsonElement(recordConverter);
+
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Checks the Avro schema that {@code RecordConverter} derives from the
+  * {@code schemaWithRecordOfArray} fixture against the expected schema.
+  */
+ @Test
+ public void schemaWithRecordOfArray()
+     throws Exception {
+   final String fixtureKey = "schemaWithRecordOfArray";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonObject expectedAvro = getExpectedSchema(fixtureKey).getAsJsonObject();
+
+   RecordConverter recordConverter = new RecordConverter(new JsonSchema(sourceSchema), state,
+       buildNamespace(state.getExtract().getNamespace(), "something"));
+   JsonElement actualAvro = avroSchemaToJsonElement(recordConverter);
+
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Checks the Avro schema that {@code RecordConverter} derives from the
+  * {@code schemaWithRecordOfEnum} fixture against the expected schema.
+  */
+ @Test
+ public void schemaWithRecordOfEnum()
+     throws Exception {
+   final String fixtureKey = "schemaWithRecordOfEnum";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonObject expectedAvro = getExpectedSchema(fixtureKey).getAsJsonObject();
+
+   RecordConverter recordConverter = new RecordConverter(new JsonSchema(sourceSchema), state,
+       buildNamespace(state.getExtract().getNamespace(), "something"));
+   JsonElement actualAvro = avroSchemaToJsonElement(recordConverter);
+
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Expects an {@link IllegalStateException} when a {@code RecordConverter} is built from the
+  * {@code schemaWithMapValuesAsJsonArray} fixture.
+  */
+ @Test(expectedExceptions = IllegalStateException.class)
+ public void schemaWithMapValuesAsJsonArray()
+     throws Exception {
+   final String fixtureKey = "schemaWithMapValuesAsJsonArray";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonSchema invalidSchema = new JsonSchema(sourceSchema);
+
+   new RecordConverter(invalidSchema, state, buildNamespace(state.getExtract().getNamespace(), "something"));
+ }
+
+ /**
+  * Expects an {@link UnsupportedOperationException} when a {@code RecordConverter} is built from
+  * the {@code schemaWithMapValuesAsJsonNull} fixture.
+  */
+ @Test(expectedExceptions = UnsupportedOperationException.class)
+ public void schemaWithMapValuesAsJsonNull()
+     throws Exception {
+   final String fixtureKey = "schemaWithMapValuesAsJsonNull";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonSchema invalidSchema = new JsonSchema(sourceSchema);
+
+   new RecordConverter(invalidSchema, state, buildNamespace(state.getExtract().getNamespace(), "something"));
+ }
+
+ /**
+  * Checks the Avro schema that {@code RecordConverter} derives from the
+  * {@code schemaWithRecordOfRecord} fixture against the expected schema.
+  */
+ @Test
+ public void schemaWithRecordOfRecord()
+     throws Exception {
+   final String fixtureKey = "schemaWithRecordOfRecord";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonObject expectedAvro = getExpectedSchema(fixtureKey).getAsJsonObject();
+
+   RecordConverter recordConverter = new RecordConverter(new JsonSchema(sourceSchema), state,
+       buildNamespace(state.getExtract().getNamespace(), "something"));
+   JsonElement actualAvro = avroSchemaToJsonElement(recordConverter);
+
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Checks the generated Avro schema for the {@code schemaWithRecordOfRecordCheckNamespace} fixture
+  * and the namespaces of the outer record and of its {@code someperson} field.
+  */
+ @Test
+ public void schemaWithRecordOfRecordCheckNamespace()
+     throws Exception {
+   final String fixtureKey = "schemaWithRecordOfRecordCheckNamespace";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonObject expectedAvro = getExpectedSchema(fixtureKey).getAsJsonObject();
+
+   RecordConverter recordConverter = new RecordConverter(new JsonSchema(sourceSchema), state,
+       buildNamespace(state.getExtract().getNamespace(), "person"));
+
+   JsonElement actualAvro = avroSchemaToJsonElement(recordConverter);
+   Assert.assertEquals(actualAvro, expectedAvro);
+
+   String nestedNamespace = recordConverter.schema().getField("someperson").schema().getNamespace();
+   Assert.assertEquals(nestedNamespace, "namespace.person.myrecord");
+
+   String outerNamespace = recordConverter.schema().getNamespace();
+   Assert.assertEquals(outerNamespace, "namespace.person");
+ }
+
+ /**
+  * Checks the generated Avro schema for the {@code schemaWithRecordOfEnumCheckNamespace} fixture
+  * and the namespaces of the outer record and of its {@code someperson} field.
+  */
+ @Test
+ public void schemaWithRecordOfEnumCheckNamespace()
+     throws Exception {
+   final String fixtureKey = "schemaWithRecordOfEnumCheckNamespace";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonObject expectedAvro = getExpectedSchema(fixtureKey).getAsJsonObject();
+
+   RecordConverter recordConverter = new RecordConverter(new JsonSchema(sourceSchema), state,
+       buildNamespace(state.getExtract().getNamespace(), "something"));
+
+   JsonElement actualAvro = avroSchemaToJsonElement(recordConverter);
+   Assert.assertEquals(actualAvro, expectedAvro);
+
+   String nestedNamespace = recordConverter.schema().getField("someperson").schema().getNamespace();
+   Assert.assertEquals(nestedNamespace, "namespace.something.myrecord");
+
+   String outerNamespace = recordConverter.schema().getNamespace();
+   Assert.assertEquals(outerNamespace, "namespace.something");
+ }
+
+ /**
+  * Checks the Avro schema that {@code UnionConverter} derives from the {@code schemaWithUnion}
+  * fixture against the expected schema, which is stored as a JSON array.
+  */
+ @Test
+ public void schemaWithUnion()
+     throws Exception {
+   final String fixtureKey = "schemaWithUnion";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonArray expectedAvro = getExpectedSchema(fixtureKey).getAsJsonArray();
+
+   JsonElement actualAvro = avroSchemaToJsonElement(new UnionConverter(new JsonSchema(sourceSchema), state));
+
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Checks the Avro schema that {@code UnionConverter} derives from the
+  * {@code schemaWithComplexUnion} fixture against the expected schema, which is stored as a JSON array.
+  */
+ @Test
+ public void schemaWithComplexUnion()
+     throws Exception {
+   final String fixtureKey = "schemaWithComplexUnion";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonArray expectedAvro = getExpectedSchema(fixtureKey).getAsJsonArray();
+
+   JsonElement actualAvro = avroSchemaToJsonElement(new UnionConverter(new JsonSchema(sourceSchema), state));
+
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Checks the Avro schema that {@code StringConverter} derives from the
+  * {@code schemaWithIsNullable} fixture against the expected schema, which is stored as a JSON array.
+  */
+ @Test
+ public void schemaWithIsNullable()
+     throws Exception {
+   final String fixtureKey = "schemaWithIsNullable";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonArray expectedAvro = getExpectedSchema(fixtureKey).getAsJsonArray();
+
+   JsonElement actualAvro = avroSchemaToJsonElement(new StringConverter(new JsonSchema(sourceSchema)));
+
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Checks the Avro schema that {@code RecordConverter} derives from the
+  * {@code schemaWithRecordIsNullable} fixture against the expected schema, which is stored as a JSON array.
+  */
+ @Test
+ public void schemaWithRecordIsNullable()
+     throws Exception {
+   final String fixtureKey = "schemaWithRecordIsNullable";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonArray expectedAvro = getExpectedSchema(fixtureKey).getAsJsonArray();
+
+   RecordConverter recordConverter = new RecordConverter(new JsonSchema(sourceSchema), state,
+       buildNamespace(state.getExtract().getNamespace(), "something"));
+   JsonElement actualAvro = avroSchemaToJsonElement(recordConverter);
+
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Checks the Avro schema that {@code MapConverter} derives from the
+  * {@code schemaWithMapIsNullable} fixture against the expected schema, which is stored as a JSON array.
+  */
+ @Test
+ public void schemaWithMapIsNullable()
+     throws Exception {
+   final String fixtureKey = "schemaWithMapIsNullable";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonArray expectedAvro = getExpectedSchema(fixtureKey).getAsJsonArray();
+
+   JsonElement actualAvro = avroSchemaToJsonElement(new MapConverter(new JsonSchema(sourceSchema), state));
+
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Checks the Avro schema that {@code EnumConverter} (second constructor argument
+  * {@code "something"}) derives from the {@code schemaWithEnumIsNullable} fixture against the
+  * expected schema, which is stored as a JSON array.
+  */
+ @Test
+ public void schemaWithEnumIsNullable()
+     throws Exception {
+   final String fixtureKey = "schemaWithEnumIsNullable";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonArray expectedAvro = getExpectedSchema(fixtureKey).getAsJsonArray();
+
+   JsonElement actualAvro = avroSchemaToJsonElement(new EnumConverter(new JsonSchema(sourceSchema), "something"));
+
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Checks the Avro schema that {@code ArrayConverter} derives from the
+  * {@code schemaWithArrayIsNullable} fixture against the expected schema, which is stored as a JSON array.
+  */
+ @Test
+ public void schemaWithArrayIsNullable()
+     throws Exception {
+   final String fixtureKey = "schemaWithArrayIsNullable";
+   JsonObject sourceSchema = getSchemaData(fixtureKey).getAsJsonObject();
+   JsonArray expectedAvro = getExpectedSchema(fixtureKey).getAsJsonArray();
+
+   JsonElement actualAvro = avroSchemaToJsonElement(new ArrayConverter(new JsonSchema(sourceSchema), state));
+
+   Assert.assertEquals(actualAvro, expectedAvro);
+ }
+
+ /**
+  * Renders the converter's Avro schema via {@code toString()} and parses that text into a Gson tree.
+  *
+  * @param converter converter whose {@code schema()} is inspected
+  * @return the parsed JSON form of the converter's schema
+  */
+ private JsonElement avroSchemaToJsonElement(JsonElementConverter converter) {
+   String avroSchemaText = converter.schema().toString();
+   return jsonParser.parse(avroSchemaText);
+ }
+
+ /**
+  * Returns the second element (index 1) of the fixture array registered under {@code methodName}.
+  *
+  * @param methodName key of the fixture entry in the loaded test data
+  * @return the expected-schema element of that entry
+  */
+ private JsonElement getExpectedSchema(String methodName) {
+   JsonArray fixtureEntry = testData.get(methodName).getAsJsonArray();
+   return fixtureEntry.get(1);
+ }
+
+ /**
+  * Returns the first element (index 0) of the fixture array registered under {@code methodName}.
+  *
+  * @param methodName key of the fixture entry in the loaded test data
+  * @return the input-schema element of that entry
+  */
+ private JsonElement getSchemaData(String methodName) {
+   JsonArray fixtureEntry = testData.get(methodName).getAsJsonArray();
+   return fixtureEntry.get(0);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverterTest.java
index 558c1f6..be2f417 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverterTest.java
@@ -26,22 +26,25 @@ import java.util.TimeZone;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SchemaConversionException;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.SourceState;
-import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.source.workunit.Extract.TableType;
/**
@@ -55,28 +58,48 @@ public class JsonIntermediateToAvroConverterTest {
private JsonObject jsonRecord;
private WorkUnitState state;
- @BeforeClass
- public void setUp()
- throws Exception {
- Type listType = new TypeToken<JsonArray>() {
+ /**
+  * Converts the schema and record found in a fixture resource and compares both results with the
+  * {@code expectedSchema} and {@code expectedRecord} entries of the same resource.
+  *
+  * @param resourceFilePath classpath location of the fixture file
+  * @throws SchemaConversionException if the schema cannot be converted
+  * @throws DataConversionException if the record cannot be converted
+  */
+ private void complexSchemaTest(String resourceFilePath)
+     throws SchemaConversionException, DataConversionException {
+   JsonObject fixture = initResources(resourceFilePath);
+   JsonIntermediateToAvroConverter avroConverter = new JsonIntermediateToAvroConverter();
+
+   Schema convertedSchema = avroConverter.convertSchema(jsonSchema, state);
+   GenericRecord convertedRecord = avroConverter.convertRecord(convertedSchema, jsonRecord, state).iterator().next();
+
+   JsonParser jsonReader = new JsonParser();
+   JsonObject actualSchema = jsonReader.parse(convertedSchema.toString()).getAsJsonObject();
+   JsonObject wantedSchema = fixture.get("expectedSchema").getAsJsonObject();
+   Assert.assertEquals(actualSchema, wantedSchema);
+
+   JsonElement actualRecord = jsonReader.parse(convertedRecord.toString());
+   JsonObject wantedRecord = fixture.get("expectedRecord").getAsJsonObject();
+   Assert.assertEquals(actualRecord, wantedRecord);
+ }
+
+ private JsonObject initResources(String resourceFilePath) {
+ Type listType = new TypeToken<JsonObject>() {
}.getType();
Gson gson = new Gson();
- jsonSchema = gson.fromJson(new InputStreamReader(this.getClass().getResourceAsStream("/converter/schema.json")), listType);
+ JsonObject testData =
+ gson.fromJson(new InputStreamReader(this.getClass().getResourceAsStream(resourceFilePath)), listType);
- listType = new TypeToken<JsonObject>() {
- }.getType();
- jsonRecord = gson.fromJson(new InputStreamReader(this.getClass().getResourceAsStream("/converter/record.json")), listType);
+ jsonRecord = testData.get("record").getAsJsonObject();
+ jsonSchema = testData.get("schema").getAsJsonArray();
- SourceState source = new SourceState();
- state = new WorkUnitState(
- source.createWorkUnit(source.createExtract(TableType.SNAPSHOT_ONLY, "test_table", "test_namespace")));
+ WorkUnit workUnit = new WorkUnit(new SourceState(),
+ new Extract(new SourceState(), Extract.TableType.SNAPSHOT_ONLY, "namespace", "dummy_table"));
+ state = new WorkUnitState(workUnit);
state.setProp(ConfigurationKeys.CONVERTER_AVRO_TIME_FORMAT, "HH:mm:ss");
state.setProp(ConfigurationKeys.CONVERTER_AVRO_DATE_TIMEZONE, "PST");
+ return testData;
}
@Test
public void testConverter()
throws Exception {
+ initResources("/converter/schema.json");
JsonIntermediateToAvroConverter converter = new JsonIntermediateToAvroConverter();
Schema avroSchema = converter.convertSchema(jsonSchema, state);
@@ -121,4 +144,22 @@ public class JsonIntermediateToAvroConverterTest {
Assert.assertNotEquals(record.get("LastModifiedDate"), record2.get("LastModifiedDate"));
}
+
+ /** Runs the schema-and-record comparison against the {@code complex1.json} fixture. */
+ @Test
+ public void testComplexSchema1()
+     throws Exception {
+   final String fixturePath = "/converter/complex1.json";
+   complexSchemaTest(fixturePath);
+ }
+
+ /** Runs the schema-and-record comparison against the {@code complex2.json} fixture. */
+ @Test
+ public void testComplexSchema2()
+     throws Exception {
+   final String fixturePath = "/converter/complex2.json";
+   complexSchemaTest(fixturePath);
+ }
+
+ /** Runs the schema-and-record comparison against the {@code complex3.json} fixture. */
+ @Test
+ public void testComplexSchema3()
+     throws Exception {
+   final String fixturePath = "/converter/complex3.json";
+   complexSchemaTest(fixturePath);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/test/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverterTest.java
new file mode 100644
index 0000000..305113b
--- /dev/null
+++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverterTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter.json;
+
+import java.io.InputStreamReader;
+import java.lang.reflect.Type;
+import java.util.Map;
+
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SchemaConversionException;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.reflect.TypeToken;
+
+import gobblin.configuration.WorkUnitState;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Unit test for {@link JsonStringToJsonIntermediateConverter}
+ *
+ * @author Tilak Patidar
+ */
+@Test(groups = {"gobblin.converter"})
+public class JsonStringToJsonIntermediateConverterTest {
+
+ private static JsonStringToJsonIntermediateConverter converter;
+ private static JsonObject testJsonData;
+
+ @BeforeClass
+ public static void setUp()
+ throws SchemaConversionException {
+ converter = new JsonStringToJsonIntermediateConverter();
+ WorkUnitState workUnit = new WorkUnitState();
+ workUnit.getPropAsBoolean("gobblin.converter.jsonStringToJsonIntermediate.unpackComplexSchemas", true);
+ converter.convertSchema("[]", workUnit);
+ Type jsonType = new TypeToken<JsonObject>() {
+ }.getType();
+ Gson gson = new Gson();
+ testJsonData = gson.fromJson(new InputStreamReader(JsonStringToJsonIntermediateConverterTest.class
+ .getResourceAsStream("/converter/JsonStringToJsonIntermediateConverter.json")), jsonType);
+ }
+
+ /**
+  * Serialises {@code json} to text, runs it through the converter with {@code record} as the
+  * output schema and a fresh {@link WorkUnitState}, and returns the first converted record.
+  *
+  * @param json input object, passed to the converter as its string form
+  * @param record array handed to the converter as the output schema
+  * @return the first element produced by the conversion
+  * @throws DataConversionException if the converter rejects the input
+  */
+ private JsonObject parseJsonObject(JsonObject json, JsonArray record)
+     throws DataConversionException {
+   String rawJson = json.toString();
+   WorkUnitState emptyState = new WorkUnitState();
+   return converter.convertRecord(record, rawJson, emptyState).iterator().next();
+ }
+
+ /**
+  * Runs every fixture entry through the converter. Each entry is an array holding the input JSON
+  * object (index 0), the schema array (index 1) and the expected converted object (index 2).
+  *
+  * @throws DataConversionException declared for signature compatibility; conversion failures are
+  *         reported as an {@link AssertionError} naming the failing case
+  */
+ @Test
+ public void testAllCases()
+     throws DataConversionException {
+   for (Map.Entry<String, JsonElement> testCase : testJsonData.entrySet()) {
+     JsonArray testData = testCase.getValue().getAsJsonArray();
+     JsonObject json = testData.get(0).getAsJsonObject();
+     JsonArray schema = testData.get(1).getAsJsonArray();
+     JsonObject expected = testData.get(2).getAsJsonObject();
+     String failureMessage = "Test case failed : " + testCase.getKey();
+
+     JsonObject result;
+     try {
+       result = parseJsonObject(json, schema);
+     } catch (Exception e) {
+       // Fail with the case name and keep the original exception as the cause, instead of
+       // printing the stack trace and comparing the message against a dummy string.
+       throw new AssertionError(failureMessage, e);
+     }
+     assertEquals(failureMessage, expected, result);
+   }
+ }
+
+ /**
+  * Expects a {@link DataConversionException} whose message starts with {@code Invalid symbol}
+  * when field {@code b} holds a value that is not among the enum symbols declared in the schema.
+  */
+ @Test(expectedExceptions = DataConversionException.class, expectedExceptionsMessageRegExp = "Invalid symbol.*")
+ public void jsonWithInvalidEnumEntry()
+     throws DataConversionException {
+   final String inputText = "{\"a\":\"somename\", \"b\":\"TROLL\"}";
+   final String schemaText =
+       " [{\"columnName\":\"a\", \"dataType\":{\"type\":\"string\"}},{\"columnName\":\"b\", \"dataType\":{\"type\":\"enum\", \"symbols\":[\"HELL\",\"BELLS\"]}}]";
+
+   JsonObject input = buildJsonObject(inputText);
+   JsonArray schema = buildJsonArray(schemaText);
+   parseJsonObject(input, schema);
+ }
+
+ /**
+  * Expects an {@link UnsupportedOperationException} with the "Array items can only be defined
+  * using JsonObject or JsonPrimitive." message when the schema declares array {@code items} as a
+  * JSON array.
+  */
+ @Test(expectedExceptions = UnsupportedOperationException.class, expectedExceptionsMessageRegExp = "Array items can only be defined using JsonObject or JsonPrimitive.")
+ public void jsonWithArrayOfMapContainingRecordWithWrongSchema()
+     throws DataConversionException {
+   final String inputText = "{\"a\":\"somename\", \"b\":[{\"d\":{\"age\":\"10\"}},{\"d\":{\"age\":\"1\"}}]}";
+   final String schemaText =
+       "[{\"columnName\":\"a\", \"dataType\":{\"type\":\"string\"}},{\"columnName\":\"b\", \"dataType\":{\"type\":\"array\", \"items\":[{\"dataType\":{\"type\":\"map\", \"values\":{\"dataType\":{\"type\":\"record\",\"values\":[{\"columnName\":\"age\", \"dataType\":{\"type\":\"int\"}}]}}}}]}}]";
+
+   JsonObject input = buildJsonObject(inputText);
+   JsonArray schema = buildJsonArray(schemaText);
+   parseJsonObject(input, schema);
+ }
+
+ /**
+  * Parses {@code s} with a new {@link JsonParser} and casts the result to {@link JsonObject}.
+  *
+  * @param s JSON text to parse
+  * @return the parsed object
+  */
+ private JsonObject buildJsonObject(String s) {
+   JsonElement parsed = new JsonParser().parse(s);
+   return (JsonObject) parsed;
+ }
+
+ /**
+  * Parses {@code schemaStr} with a new {@link JsonParser} and returns it as a {@link JsonArray}.
+  *
+  * @param schemaStr JSON text to parse
+  * @return the parsed array
+  */
+ private JsonArray buildJsonArray(String schemaStr) {
+   JsonElement parsed = new JsonParser().parse(schemaStr);
+   return parsed.getAsJsonArray();
+ }
+}
\ No newline at end of file