Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/10/18 05:34:06 UTC

[3/3] incubator-gobblin git commit: [GOBBLIN-226] Nested schema support in JsonStringToJsonIntermediateConverter and JsonIntermediateToAvroConverter

[GOBBLIN-226] Nested schema support in JsonStringToJsonIntermediateConverter and JsonIntermediateToAvroConverter

Closes #2080 from tilakpatidar/nested_schema
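
For readers skimming the patch, below is a minimal end-to-end sketch of what the nested schema
support enables. The sample schema, field names and record are illustrative (they are not taken
from the patch or its test resources); the entry points used, the new JsonSchema wrapper and the
new getConvertor(JsonSchema, namespace, state) overload, are the ones introduced by this commit.

import org.apache.avro.generic.GenericRecord;

import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.avro.JsonElementConversionFactory;
import org.apache.gobblin.converter.avro.JsonElementConversionFactory.JsonElementConverter;
import org.apache.gobblin.converter.json.JsonSchema;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class NestedSchemaSketch {
  public static void main(String[] args) throws Exception {
    // Illustrative source schema: one "person" column whose dataType nests a record and an array.
    String sourceSchema = "[{\"columnName\": \"person\", \"dataType\": {"
        + "\"type\": \"record\", \"name\": \"person_rec\", \"values\": ["
        + "  {\"columnName\": \"name\",   \"dataType\": {\"type\": \"string\"}},"
        + "  {\"columnName\": \"emails\", \"dataType\": {\"type\": \"array\", \"items\": \"string\"}}"
        + "]}}]";

    JsonParser parser = new JsonParser();
    JsonArray schemaArray = parser.parse(sourceSchema).getAsJsonArray();

    // Wrap the declared source schema in the new JsonSchema abstraction (a ROOT record schema).
    JsonSchema jsonSchema = new JsonSchema(schemaArray);

    // Build the converter tree; the namespace argument may be null.
    JsonElementConverter converter =
        JsonElementConversionFactory.getConvertor(jsonSchema, null, new WorkUnitState());
    System.out.println(converter.getSchema());

    // Convert one matching record into an Avro GenericRecord.
    JsonObject record = parser.parse(
        "{\"person\": {\"name\": \"ada\", \"emails\": [\"ada@example.org\"]}}").getAsJsonObject();
    GenericRecord avroRecord = (GenericRecord) converter.convert(record);
    System.out.println(avroRecord);
  }
}

JsonIntermediateToAvroConverter now delegates to the same RecordConverter path, so the chain
JsonStringToJsonIntermediateConverter followed by JsonIntermediateToAvroConverter can carry such
nested columns through to Avro instead of flattening them to strings.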


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6dd36a50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6dd36a50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6dd36a50

Branch: refs/heads/master
Commit: 6dd36a506d574a261bf678aaf071d89e597044e8
Parents: f058211
Author: tilakpatidar <ti...@gmail.com>
Authored: Wed Oct 18 11:03:52 2017 +0530
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Wed Oct 18 11:03:52 2017 +0530

----------------------------------------------------------------------
 .../avro/JsonElementConversionFactory.java      | 414 +++++++--
 ...nElementConversionWithAvroSchemaFactory.java |  38 +-
 .../avro/JsonIntermediateToAvroConverter.java   |  98 +-
 .../JsonRecordAvroSchemaToAvroConverter.java    |   3 +
 .../gobblin/converter/json/JsonSchema.java      | 297 ++++++
 .../JsonStringToJsonIntermediateConverter.java  | 211 ++++-
 .../avro/JsonElementConversionFactoryTest.java  | 397 ++++++++
 .../JsonIntermediateToAvroConverterTest.java    |  73 +-
 ...onStringToJsonIntermediateConverterTest.java | 118 +++
 .../JsonElementConversionFactoryTest.json       | 856 +++++++++++++++++
 .../JsonStringToJsonIntermediateConverter.json  | 919 +++++++++++++++++++
 .../src/test/resources/converter/complex1.json  | 527 +++++++++++
 .../src/test/resources/converter/complex2.json  | 186 ++++
 .../src/test/resources/converter/complex3.json  | 548 +++++++++++
 .../src/test/resources/converter/record.json    |  23 -
 .../src/test/resources/converter/record3.json   |  27 +
 .../src/test/resources/converter/schema.json    | 724 ++++++++-------
 .../Configuration-Properties-Glossary.md        |  33 +-
 18 files changed, 4880 insertions(+), 612 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java
index aa015a1..07e1fc5 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java
@@ -28,19 +28,29 @@ import java.util.TimeZone;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.EmptyIterable;
+import org.apache.gobblin.converter.json.JsonSchema;
+import org.codehaus.jackson.node.JsonNodeFactory;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
-
-import sun.util.calendar.ZoneInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.WorkUnitState;
+import lombok.extern.java.Log;
+import sun.util.calendar.ZoneInfo;
+
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.*;
+import static org.apache.gobblin.converter.json.JsonSchema.*;
 
 
 /**
@@ -67,81 +77,121 @@ public class JsonElementConversionFactory {
     BOOLEAN,
     ARRAY,
     MAP,
-    ENUM
+    ENUM,
+    RECORD,
+    NULL,
+    UNION;
+
+    private static List<Type> primitiveTypes =
+        Arrays.asList(NULL, BOOLEAN, INT, LONG, FLOAT, DOUBLE, BYTES, STRING, ENUM, FIXED);
+
+    public static boolean isPrimitive(Type type) {
+      return primitiveTypes.contains(type);
+    }
   }
 
   /**
    * Use to create a converter for a single field from a schema.
-   *
-   * @param fieldName
-   * @param fieldType
-   * @param nullable
    * @param schemaNode
+   * @param namespace
    * @param state
-   * @return
+   * @return {@link JsonElementConverter}
    * @throws UnsupportedDateTypeException
    */
-  public static JsonElementConverter getConvertor(String fieldName, String fieldType, JsonObject schemaNode,
-      WorkUnitState state, boolean nullable) throws UnsupportedDateTypeException {
+  public static JsonElementConverter getConvertor(JsonSchema schemaNode, String namespace, WorkUnitState state)
+      throws UnsupportedDateTypeException {
 
-    Type type;
-    try {
-      type = Type.valueOf(fieldType.toUpperCase());
-    } catch (IllegalArgumentException e) {
-      throw new UnsupportedDateTypeException(fieldType + " is unsupported");
-    }
+    Type type = schemaNode.getType();
 
     DateTimeZone timeZone = getTimeZone(state.getProp(ConfigurationKeys.CONVERTER_AVRO_DATE_TIMEZONE, "UTC"));
     switch (type) {
       case DATE:
-        return new DateConverter(fieldName, nullable, type.toString(),
+        return new DateConverter(schemaNode,
             state.getProp(ConfigurationKeys.CONVERTER_AVRO_DATE_FORMAT, "yyyy-MM-dd HH:mm:ss"), timeZone, state);
 
       case TIMESTAMP:
-        return new DateConverter(fieldName, nullable, type.toString(),
+        return new DateConverter(schemaNode,
             state.getProp(ConfigurationKeys.CONVERTER_AVRO_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss"), timeZone, state);
 
       case TIME:
-        return new DateConverter(fieldName, nullable, type.toString(),
-            state.getProp(ConfigurationKeys.CONVERTER_AVRO_TIME_FORMAT, "HH:mm:ss"), timeZone, state);
+        return new DateConverter(schemaNode, state.getProp(ConfigurationKeys.CONVERTER_AVRO_TIME_FORMAT, "HH:mm:ss"),
+            timeZone, state);
 
       case FIXED:
-        throw new UnsupportedDateTypeException(fieldType + " is unsupported");
+        throw new UnsupportedDateTypeException(type.toString() + " is unsupported");
 
       case STRING:
-        return new StringConverter(fieldName, nullable, type.toString());
+        return new StringConverter(schemaNode);
 
       case BYTES:
-        return new BinaryConverter(fieldName, nullable, type.toString(),
-            state.getProp(ConfigurationKeys.CONVERTER_AVRO_BINARY_CHARSET, "UTF8"));
+        return new BinaryConverter(schemaNode, state.getProp(ConfigurationKeys.CONVERTER_AVRO_BINARY_CHARSET, "UTF8"));
 
       case INT:
-        return new IntConverter(fieldName, nullable, type.toString());
+        return new IntConverter(schemaNode);
 
       case LONG:
-        return new LongConverter(fieldName, nullable, type.toString());
+        return new LongConverter(schemaNode);
 
       case FLOAT:
-        return new FloatConverter(fieldName, nullable, type.toString());
+        return new FloatConverter(schemaNode);
 
       case DOUBLE:
-        return new DoubleConverter(fieldName, nullable, type.toString());
+        return new DoubleConverter(schemaNode);
 
       case BOOLEAN:
-        return new BooleanConverter(fieldName, nullable, type.toString());
+        return new BooleanConverter(schemaNode);
 
       case ARRAY:
-        return new ArrayConverter(fieldName, nullable, type.toString(), schemaNode, state);
+        return new ArrayConverter(schemaNode, state);
 
       case MAP:
-        return new MapConverter(fieldName, nullable, type.toString(), schemaNode, state);
+        return new MapConverter(schemaNode, state);
 
       case ENUM:
-        return new EnumConverter(fieldName, nullable, type.toString(), schemaNode);
+        return new EnumConverter(schemaNode, namespace);
+
+      case RECORD:
+        return new RecordConverter(schemaNode, state, namespace);
+
+      case NULL:
+        return new NullConverter(schemaNode);
+
+      case UNION:
+        return new UnionConverter(schemaNode, state);
 
       default:
-        throw new UnsupportedDateTypeException(fieldType + " is unsupported");
+        throw new UnsupportedDateTypeException(type.toString() + " is unsupported");
+    }
+  }
+
+  /**
+   * Backward Compatible form of {@link JsonElementConverter#getConvertor(JsonSchema, String, WorkUnitState)}
+   * @param fieldName
+   * @param fieldType
+   * @param schemaNode
+   * @param state
+   * @param nullable
+   * @return
+   * @throws UnsupportedDateTypeException
+   */
+  public static JsonElementConverter getConvertor(String fieldName, String fieldType, JsonObject schemaNode,
+      WorkUnitState state, boolean nullable)
+      throws UnsupportedDateTypeException {
+    if (!schemaNode.has(COLUMN_NAME_KEY)) {
+      schemaNode.addProperty(COLUMN_NAME_KEY, fieldName);
+    }
+    if (!schemaNode.has(DATA_TYPE_KEY)) {
+      schemaNode.add(DATA_TYPE_KEY, new JsonObject());
+    }
+    JsonObject dataType = schemaNode.get(DATA_TYPE_KEY).getAsJsonObject();
+    if (!dataType.has(TYPE_KEY)) {
+      dataType.addProperty(TYPE_KEY, fieldType);
+    }
+    if (!schemaNode.has(IS_NULLABLE_KEY)) {
+      schemaNode.addProperty(IS_NULLABLE_KEY, nullable);
     }
+    JsonSchema schema = new JsonSchema(schemaNode);
+    return getConvertor(schema, null, state);
   }
 
   private static DateTimeZone getTimeZone(String id) {
@@ -166,19 +216,17 @@ public class JsonElementConversionFactory {
    *
    */
   public static abstract class JsonElementConverter {
-    private String name;
-    private boolean nullable;
-    private String sourceType;
+    private final JsonSchema jsonSchema;
+
+    public JsonElementConverter(JsonSchema jsonSchema) {
+      this.jsonSchema = jsonSchema;
+    }
 
-    /**
-     *
-     * @param fieldName
-     * @param nullable
-     */
     public JsonElementConverter(String fieldName, boolean nullable, String sourceType) {
-      this.name = fieldName;
-      this.nullable = nullable;
-      this.sourceType = sourceType;
+      JsonSchema jsonSchema = buildBaseSchema(Type.valueOf(sourceType.toUpperCase()));
+      jsonSchema.setColumnName(fieldName);
+      jsonSchema.setNullable(nullable);
+      this.jsonSchema = jsonSchema;
     }
 
     /**
@@ -186,7 +234,7 @@ public class JsonElementConversionFactory {
      * @return
      */
     public String getName() {
-      return this.name;
+      return this.jsonSchema.getColumnName();
     }
 
     /**
@@ -194,7 +242,7 @@ public class JsonElementConversionFactory {
      * @return
      */
     public boolean isNullable() {
-      return this.nullable;
+      return this.jsonSchema.isNullable();
     }
 
     /**
@@ -202,7 +250,7 @@ public class JsonElementConversionFactory {
      * @return
      */
     public Schema getSchema() {
-      if (this.nullable) {
+      if (isNullable()) {
         List<Schema> list = new ArrayList<>();
         list.add(Schema.create(Schema.Type.NULL));
         list.add(schema());
@@ -213,8 +261,8 @@ public class JsonElementConversionFactory {
 
     protected Schema schema() {
       Schema schema = Schema.create(getTargetType());
-      schema.addProp("source.type", this.sourceType.toLowerCase());
-      return schema;
+      schema.addProp("source.type", this.jsonSchema.getType().toString().toLowerCase());
+      return buildUnionIfNullable(schema);
     }
 
     /**
@@ -224,7 +272,7 @@ public class JsonElementConversionFactory {
      */
     public Object convert(JsonElement value) {
       if (value.isJsonNull()) {
-        if (this.nullable) {
+        if (isNullable()) {
           return null;
         }
         throw new RuntimeException("Field: " + getName() + " is not nullable and contains a null value");
@@ -244,12 +292,29 @@ public class JsonElementConversionFactory {
      * @return
      */
     public abstract Schema.Type getTargetType();
+
+    protected static String buildNamespace(String namespace, String name) {
+      if (namespace == null || namespace.isEmpty()) {
+        return null;
+      }
+      if (name == null || name.isEmpty()) {
+        return null;
+      }
+      return namespace.trim() + "." + name.trim();
+    }
+
+    protected Schema buildUnionIfNullable(Schema schema) {
+      if (this.isNullable()) {
+        return Schema.createUnion(Schema.create(Schema.Type.NULL), schema);
+      }
+      return schema;
+    }
   }
 
   public static class StringConverter extends JsonElementConverter {
 
-    public StringConverter(String fieldName, boolean nullable, String sourceType) {
-      super(fieldName, nullable, sourceType);
+    public StringConverter(JsonSchema schema) {
+      super(schema);
     }
 
     @Override
@@ -265,8 +330,8 @@ public class JsonElementConversionFactory {
 
   public static class IntConverter extends JsonElementConverter {
 
-    public IntConverter(String fieldName, boolean nullable, String sourceType) {
-      super(fieldName, nullable, sourceType);
+    public IntConverter(JsonSchema schema) {
+      super(schema);
     }
 
     @Override
@@ -283,8 +348,8 @@ public class JsonElementConversionFactory {
 
   public static class LongConverter extends JsonElementConverter {
 
-    public LongConverter(String fieldName, boolean nullable, String sourceType) {
-      super(fieldName, nullable, sourceType);
+    public LongConverter(JsonSchema schema) {
+      super(schema);
     }
 
     @Override
@@ -301,8 +366,8 @@ public class JsonElementConversionFactory {
 
   public static class DoubleConverter extends JsonElementConverter {
 
-    public DoubleConverter(String fieldName, boolean nullable, String sourceType) {
-      super(fieldName, nullable, sourceType);
+    public DoubleConverter(JsonSchema schema) {
+      super(schema);
     }
 
     @Override
@@ -318,8 +383,8 @@ public class JsonElementConversionFactory {
 
   public static class FloatConverter extends JsonElementConverter {
 
-    public FloatConverter(String fieldName, boolean nullable, String sourceType) {
-      super(fieldName, nullable, sourceType);
+    public FloatConverter(JsonSchema schema) {
+      super(schema);
     }
 
     @Override
@@ -335,8 +400,8 @@ public class JsonElementConversionFactory {
 
   public static class BooleanConverter extends JsonElementConverter {
 
-    public BooleanConverter(String fieldName, boolean nullable, String sourceType) {
-      super(fieldName, nullable, sourceType);
+    public BooleanConverter(JsonSchema schema) {
+      super(schema);
     }
 
     @Override
@@ -356,9 +421,8 @@ public class JsonElementConversionFactory {
     private DateTimeZone timeZone;
     private WorkUnitState state;
 
-    public DateConverter(String fieldName, boolean nullable, String sourceType, String pattern, DateTimeZone zone,
-        WorkUnitState state) {
-      super(fieldName, nullable, sourceType);
+    public DateConverter(JsonSchema schema, String pattern, DateTimeZone zone, WorkUnitState state) {
+      super(schema);
       this.inputPatterns = pattern;
       this.timeZone = zone;
       this.state = state;
@@ -398,8 +462,8 @@ public class JsonElementConversionFactory {
   public static class BinaryConverter extends JsonElementConverter {
     private String charSet;
 
-    public BinaryConverter(String fieldName, boolean nullable, String sourceType, String charSet) {
-      super(fieldName, nullable, sourceType);
+    public BinaryConverter(JsonSchema schema, String charSet) {
+      super(schema);
       this.charSet = charSet;
     }
 
@@ -421,6 +485,10 @@ public class JsonElementConversionFactory {
   public static abstract class ComplexConverter extends JsonElementConverter {
     private JsonElementConverter elementConverter;
 
+    public ComplexConverter(JsonSchema schema) {
+      super(schema);
+    }
+
     public ComplexConverter(String fieldName, boolean nullable, String sourceType) {
       super(fieldName, nullable, sourceType);
     }
@@ -432,27 +500,46 @@ public class JsonElementConversionFactory {
     public JsonElementConverter getElementConverter() {
       return this.elementConverter;
     }
+
+    protected void processNestedItems(JsonSchema schema, WorkUnitState state)
+        throws UnsupportedDateTypeException {
+      JsonSchema nestedItem = null;
+      if (schema.isType(ARRAY)) {
+        nestedItem = schema.getItemsWithinDataType();
+      }
+      if (schema.isType(MAP)) {
+        nestedItem = schema.getValuesWithinDataType();
+      }
+      this.setElementConverter(getConvertor(nestedItem, null, state));
+    }
   }
 
   public static class ArrayConverter extends ComplexConverter {
 
-    public ArrayConverter(String fieldName, boolean nullable, String sourceType, JsonObject schemaNode,
-        WorkUnitState state) throws UnsupportedDateTypeException {
-      super(fieldName, nullable, sourceType);
-      super.setElementConverter(
-          getConvertor(fieldName, schemaNode.get("dataType").getAsJsonObject().get("items").getAsString(),
-              schemaNode.get("dataType").getAsJsonObject(), state, isNullable()));
+    public ArrayConverter(JsonSchema schema, WorkUnitState state)
+        throws UnsupportedDateTypeException {
+      super(schema);
+      processNestedItems(schema, state);
     }
 
     @Override
     Object convertField(JsonElement value) {
+      if (this.isNullable() && value.isJsonNull()) {
+        return null;
+      }
       List<Object> list = new ArrayList<>();
 
       for (JsonElement elem : (JsonArray) value) {
         list.add(getElementConverter().convertField(elem));
       }
 
-      return new GenericData.Array<>(schema(), list);
+      return new GenericData.Array<>(arraySchema(), list);
+    }
+
+    private Schema arraySchema() {
+      Schema schema = Schema.createArray(getElementConverter().schema());
+      schema.addProp(SOURCE_TYPE, ARRAY.toString().toLowerCase());
+      return schema;
     }
 
     @Override
@@ -462,20 +549,16 @@ public class JsonElementConversionFactory {
 
     @Override
     public Schema schema() {
-      Schema schema = Schema.createArray(getElementConverter().schema());
-      schema.addProp("source.type", "array");
-      return schema;
+      return buildUnionIfNullable(arraySchema());
     }
   }
 
   public static class MapConverter extends ComplexConverter {
 
-    public MapConverter(String fieldName, boolean nullable, String sourceType, JsonObject schemaNode,
-        WorkUnitState state) throws UnsupportedDateTypeException {
-      super(fieldName, nullable, sourceType);
-      super.setElementConverter(
-          getConvertor(fieldName, schemaNode.get("dataType").getAsJsonObject().get("values").getAsString(),
-              schemaNode.get("dataType").getAsJsonObject(), state, isNullable()));
+    public MapConverter(JsonSchema schema, WorkUnitState state)
+        throws UnsupportedDateTypeException {
+      super(schema);
+      processNestedItems(schema, state);
     }
 
     @Override
@@ -497,23 +580,106 @@ public class JsonElementConversionFactory {
     @Override
     public Schema schema() {
       Schema schema = Schema.createMap(getElementConverter().schema());
-      schema.addProp("source.type", "map");
-      return schema;
+      schema.addProp(SOURCE_TYPE, MAP.toString().toLowerCase());
+      return buildUnionIfNullable(schema);
+    }
+  }
+
+  @Log
+  public static class RecordConverter extends ComplexConverter {
+    private static final Logger LOG = LoggerFactory.getLogger(RecordConverter.class);
+    private HashMap<String, JsonElementConverter> converters = new HashMap<>();
+    private Schema _schema;
+    private long numFailedConversion = 0;
+    private State workUnit;
+
+    public RecordConverter(JsonSchema schema, WorkUnitState state, String namespace)
+        throws UnsupportedDateTypeException {
+      super(schema);
+      workUnit = state;
+      String name = schema.isRoot() ? schema.getColumnName() : schema.getName();
+      _schema = buildRecordSchema(schema.getValuesWithinDataType(), state, name, namespace);
+    }
+
+    private Schema buildRecordSchema(JsonSchema schema, WorkUnitState workUnit, String name, String namespace) {
+      List<Schema.Field> fields = new ArrayList<>();
+      for (int i = 0; i < schema.fieldsCount(); i++) {
+        JsonSchema map = schema.getFieldSchemaAt(i);
+        String childNamespace = buildNamespace(namespace, name);
+        JsonElementConverter converter;
+        String sourceType;
+        Schema fldSchema;
+        try {
+          sourceType = map.isType(UNION) ? UNION.toString().toLowerCase() : map.getType().toString().toLowerCase();
+          converter = getConvertor(map, childNamespace, workUnit);
+          this.converters.put(map.getColumnName(), converter);
+          fldSchema = converter.schema();
+        } catch (UnsupportedDateTypeException e) {
+          throw new UnsupportedOperationException(e);
+        }
+
+        Schema.Field fld = new Schema.Field(map.getColumnName(), fldSchema, map.getComment(),
+            map.isNullable() ? JsonNodeFactory.instance.nullNode() : null);
+        fld.addProp(SOURCE_TYPE, sourceType);
+        fields.add(fld);
+      }
+      Schema avroSchema = Schema.createRecord(name.isEmpty() ? null : name, "", namespace, false);
+      avroSchema.setFields(fields);
+
+      return avroSchema;
+    }
+
+    @Override
+    Object convertField(JsonElement value) {
+      GenericRecord avroRecord = new GenericData.Record(_schema);
+      long maxFailedConversions = this.workUnit.getPropAsLong(ConfigurationKeys.CONVERTER_AVRO_MAX_CONVERSION_FAILURES,
+          ConfigurationKeys.DEFAULT_CONVERTER_AVRO_MAX_CONVERSION_FAILURES);
+      for (Map.Entry<String, JsonElement> entry : ((JsonObject) value).entrySet()) {
+        try {
+          avroRecord.put(entry.getKey(), this.converters.get(entry.getKey()).convert(entry.getValue()));
+        } catch (Exception e) {
+          this.numFailedConversion++;
+          if (this.numFailedConversion < maxFailedConversions) {
+            LOG.error("Dropping record " + value + " because it cannot be converted to Avro", e);
+            return new EmptyIterable<>();
+          }
+          throw new RuntimeException(
+              "Unable to convert field:" + entry.getKey() + " for value:" + entry.getValue() + " for record: " + value,
+              e);
+        }
+      }
+      return avroRecord;
+    }
+
+    @Override
+    public org.apache.avro.Schema.Type getTargetType() {
+      return Schema.Type.RECORD;
+    }
+
+    @Override
+    public Schema schema() {
+      Schema schema = _schema;
+      schema.addProp(SOURCE_TYPE, RECORD.toString().toLowerCase());
+      return buildUnionIfNullable(schema);
     }
   }
 
   public static class EnumConverter extends JsonElementConverter {
     String enumName;
+    String namespace;
     List<String> enumSet = new ArrayList<>();
     Schema schema;
 
-    public EnumConverter(String fieldName, boolean nullable, String sourceType, JsonObject schemaNode) {
-      super(fieldName, nullable, sourceType);
+    public EnumConverter(JsonSchema schema, String namespace) {
+      super(schema);
 
-      for (JsonElement elem : schemaNode.get("dataType").getAsJsonObject().get("symbols").getAsJsonArray()) {
+      JsonObject dataType = schema.getDataType();
+      for (JsonElement elem : dataType.get(ENUM_SYMBOLS_KEY).getAsJsonArray()) {
         this.enumSet.add(elem.getAsString());
       }
-      this.enumName = schemaNode.get("dataType").getAsJsonObject().get("name").getAsString();
+      String enumName = schema.getName();
+      this.enumName = enumName.isEmpty() ? null : enumName;
+      this.namespace = namespace;
     }
 
     @Override
@@ -528,9 +694,69 @@ public class JsonElementConversionFactory {
 
     @Override
     public Schema schema() {
-      this.schema = Schema.createEnum(this.enumName, "", "", this.enumSet);
-      this.schema.addProp("source.type", "enum");
-      return this.schema;
+      this.schema = Schema.createEnum(this.enumName, "", namespace, this.enumSet);
+      this.schema.addProp(SOURCE_TYPE, ENUM.toString().toLowerCase());
+      return buildUnionIfNullable(this.schema);
+    }
+  }
+
+  public static class NullConverter extends JsonElementConverter {
+
+    public NullConverter(JsonSchema schema) {
+      super(schema);
+    }
+
+    @Override
+    Object convertField(JsonElement value) {
+      return value.getAsJsonNull();
+    }
+
+    @Override
+    public org.apache.avro.Schema.Type getTargetType() {
+      return Schema.Type.NULL;
+    }
+  }
+
+  public static class UnionConverter extends JsonElementConverter {
+    private final Schema firstSchema;
+    private final Schema secondSchema;
+    private final JsonElementConverter firstConverter;
+    private final JsonElementConverter secondConverter;
+
+    public UnionConverter(JsonSchema schemaNode, WorkUnitState state) {
+      super(schemaNode);
+      List<JsonSchema> types = schemaNode.getDataTypes();
+      firstConverter = getConverter(types.get(0), state);
+      secondConverter = getConverter(types.get(1), state);
+      firstSchema = firstConverter.schema();
+      secondSchema = secondConverter.schema();
+    }
+
+    private JsonElementConverter getConverter(JsonSchema schemaElement, WorkUnitState state) {
+      try {
+        return JsonElementConversionFactory.getConvertor(schemaElement, null, state);
+      } catch (UnsupportedDateTypeException e) {
+        throw new UnsupportedOperationException(e);
+      }
+    }
+
+    @Override
+    Object convertField(JsonElement value) {
+      try {
+        return firstConverter.convert(value);
+      } catch (Exception e) {
+        return secondConverter.convert(value);
+      }
+    }
+
+    @Override
+    public Schema.Type getTargetType() {
+      return Schema.Type.UNION;
+    }
+
+    @Override
+    protected Schema schema() {
+      return Schema.createUnion(firstSchema, secondSchema);
     }
   }
 }
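
Since UNION is a newly recognized type in this factory, a small illustrative sketch of a
two-branch union column follows. The column definition is made up for this example; the array
form of dataType.type is the representation parsed by the new JsonSchema class, and
UnionConverter tries the first branch and falls back to the second.

import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.avro.JsonElementConversionFactory;
import org.apache.gobblin.converter.avro.JsonElementConversionFactory.JsonElementConverter;
import org.apache.gobblin.converter.json.JsonSchema;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class UnionColumnSketch {
  public static void main(String[] args) throws Exception {
    // Illustrative column: a union of int and string declared as an array under dataType.type.
    JsonObject column = new JsonParser().parse(
        "{\"columnName\": \"id\", \"dataType\": {\"type\": [\"int\", \"string\"]}}").getAsJsonObject();

    JsonElementConverter converter =
        JsonElementConversionFactory.getConvertor(new JsonSchema(column), null, new WorkUnitState());

    System.out.println(converter.convert(new JsonParser().parse("42")));      // handled by the int branch
    System.out.println(converter.convert(new JsonParser().parse("\"n/a\""))); // falls back to the string branch
  }
}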

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
index ca5d88d..1da8d31 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
@@ -17,17 +17,19 @@
 
 package org.apache.gobblin.converter.avro;
 
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.gobblin.configuration.WorkUnitState;
 
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
 
 /**
  * Creates a converter for Json types to Avro types. Overrides {@link ArrayConverter}, {@link MapConverter},
@@ -41,7 +43,8 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
    */
 
   public static JsonElementConverter getConvertor(String fieldName, String fieldType, Schema schemaNode,
-      WorkUnitState state, boolean nullable) throws UnsupportedDateTypeException {
+      WorkUnitState state, boolean nullable)
+      throws UnsupportedDateTypeException {
 
     Type type;
     try {
@@ -52,27 +55,30 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
 
     switch (type) {
       case ARRAY:
-        return new JsonElementConversionWithAvroSchemaFactory.ArrayConverter(fieldName, nullable, type.toString(), schemaNode, state);
+        return new JsonElementConversionWithAvroSchemaFactory.ArrayConverter(fieldName, nullable, type.toString(),
+            schemaNode, state);
 
       case MAP:
-        return new JsonElementConversionWithAvroSchemaFactory.MapConverter(fieldName, nullable, type.toString(), schemaNode, state);
+        return new JsonElementConversionWithAvroSchemaFactory.MapConverter(fieldName, nullable, type.toString(),
+            schemaNode, state);
 
       case ENUM:
-        return new JsonElementConversionWithAvroSchemaFactory.EnumConverter(fieldName, nullable, type.toString(), schemaNode);
+        return new JsonElementConversionWithAvroSchemaFactory.EnumConverter(fieldName, nullable, type.toString(),
+            schemaNode);
 
       default:
-        return JsonElementConversionFactory.getConvertor(fieldName, fieldType, null, state, nullable);
+        return JsonElementConversionFactory.getConvertor(fieldName, fieldType, new JsonObject(), state, nullable);
     }
   }
 
   public static class ArrayConverter extends ComplexConverter {
 
-    public ArrayConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode,
-        WorkUnitState state) throws UnsupportedDateTypeException {
+    public ArrayConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, WorkUnitState state)
+        throws UnsupportedDateTypeException {
       super(fieldName, nullable, sourceType);
       super.setElementConverter(
-          getConvertor(fieldName, schemaNode.getElementType().getType().getName(),
-              schemaNode.getElementType(), state, isNullable()));
+          getConvertor(fieldName, schemaNode.getElementType().getType().getName(), schemaNode.getElementType(), state,
+              isNullable()));
     }
 
     @Override
@@ -101,12 +107,12 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve
 
   public static class MapConverter extends ComplexConverter {
 
-    public MapConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode,
-        WorkUnitState state) throws UnsupportedDateTypeException {
+    public MapConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, WorkUnitState state)
+        throws UnsupportedDateTypeException {
       super(fieldName, nullable, sourceType);
       super.setElementConverter(
-          getConvertor(fieldName, schemaNode.getValueType().getType().getName(),
-              schemaNode.getValueType(), state, isNullable()));
+          getConvertor(fieldName, schemaNode.getValueType().getType().getName(), schemaNode.getValueType(), state,
+              isNullable()));
     }
 
     @Override
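
One behavioral detail worth noting in this factory: the default case now passes an empty
JsonObject instead of null, because the backward-compatible overload fills the supplied node
with columnName, dataType and isNullable before wrapping it in a JsonSchema. A minimal sketch
of that legacy entry point (the field name and type are illustrative):

import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.avro.JsonElementConversionFactory;
import org.apache.gobblin.converter.avro.JsonElementConversionFactory.JsonElementConverter;

import com.google.gson.JsonObject;

public class LegacyOverloadSketch {
  public static void main(String[] args) throws Exception {
    // The pre-existing (fieldName, fieldType, schemaNode, state, nullable) signature still works;
    // internally it now builds a JsonSchema and delegates to the new overload.
    JsonElementConverter converter = JsonElementConversionFactory.getConvertor(
        "age", "int", new JsonObject(), new WorkUnitState(), false);
    System.out.println(converter.getSchema()); // Avro int schema carrying a source.type property
  }
}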

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverter.java
index 0735c03..5b1810b 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverter.java
@@ -18,34 +18,26 @@
 package org.apache.gobblin.converter.avro;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
 import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.codehaus.jackson.node.JsonNodeFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.converter.DataConversionException;
-import org.apache.gobblin.converter.EmptyIterable;
 import org.apache.gobblin.converter.SchemaConversionException;
 import org.apache.gobblin.converter.SingleRecordIterable;
 import org.apache.gobblin.converter.ToAvroConverterBase;
+import org.apache.gobblin.converter.avro.JsonElementConversionFactory.RecordConverter;
+import org.apache.gobblin.converter.json.JsonSchema;
 import org.apache.gobblin.util.AvroUtils;
 import org.apache.gobblin.util.WriterUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
 
 
 /**
@@ -55,76 +47,37 @@ import org.apache.gobblin.util.WriterUtils;
  *
  */
 public class JsonIntermediateToAvroConverter extends ToAvroConverterBase<JsonArray, JsonObject> {
-  private Map<String, JsonElementConversionFactory.JsonElementConverter> converters = new HashMap<>();
   private static final Logger LOG = LoggerFactory.getLogger(JsonIntermediateToAvroConverter.class);
   private static final String CONVERTER_AVRO_NULLIFY_FIELDS_ENABLED = "converter.avro.nullify.fields.enabled";
   private static final boolean DEFAULT_CONVERTER_AVRO_NULLIFY_FIELDS_ENABLED = Boolean.FALSE;
   private static final String CONVERTER_AVRO_NULLIFY_FIELDS_ORIGINAL_SCHEMA_PATH =
       "converter.avro.nullify.fields.original.schema.path";
 
-  private long numFailedConversion = 0;
+  private RecordConverter recordConverter;
 
   @Override
-  public Schema convertSchema(JsonArray schema, WorkUnitState workUnit) throws SchemaConversionException {
-    List<Schema.Field> fields = new ArrayList<>();
-
-    for (JsonElement elem : schema) {
-      JsonObject map = (JsonObject) elem;
-
-      String columnName = map.get("columnName").getAsString();
-      String comment = map.has("comment") ? map.get("comment").getAsString() : "";
-      boolean nullable = map.has("isNullable") ? map.get("isNullable").getAsBoolean() : false;
-      Schema fldSchema;
-
-      try {
-        JsonElementConversionFactory.JsonElementConverter converter = JsonElementConversionFactory.getConvertor(
-            columnName, map.get("dataType").getAsJsonObject().get("type").getAsString(), map, workUnit, nullable);
-        this.converters.put(columnName, converter);
-        fldSchema = converter.getSchema();
-      } catch (UnsupportedDateTypeException e) {
-        throw new SchemaConversionException(e);
-      }
-
-      Field fld = new Field(columnName, fldSchema, comment, nullable ? JsonNodeFactory.instance.nullNode() : null);
-      fld.addProp("source.type", map.get("dataType").getAsJsonObject().get("type").getAsString());
-      fields.add(fld);
+  public Schema convertSchema(JsonArray schema, WorkUnitState workUnit)
+      throws SchemaConversionException {
+    try {
+      JsonSchema jsonSchema = new JsonSchema(schema);
+      jsonSchema.setColumnName(workUnit.getExtract().getTable());
+      recordConverter = new RecordConverter(jsonSchema, workUnit, workUnit.getExtract().getNamespace());
+    } catch (UnsupportedDateTypeException e) {
+      throw new SchemaConversionException(e);
     }
-
-    Schema avroSchema =
-        Schema.createRecord(workUnit.getExtract().getTable(), "", workUnit.getExtract().getNamespace(), false);
-    avroSchema.setFields(fields);
-
-    if (workUnit.getPropAsBoolean(CONVERTER_AVRO_NULLIFY_FIELDS_ENABLED,
-        DEFAULT_CONVERTER_AVRO_NULLIFY_FIELDS_ENABLED)) {
-      return this.generateSchemaWithNullifiedField(workUnit, avroSchema);
+    Schema recordSchema = recordConverter.schema();
+    if (workUnit
+        .getPropAsBoolean(CONVERTER_AVRO_NULLIFY_FIELDS_ENABLED, DEFAULT_CONVERTER_AVRO_NULLIFY_FIELDS_ENABLED)) {
+      return this.generateSchemaWithNullifiedField(workUnit, recordSchema);
     }
-
-    return avroSchema;
+    return recordSchema;
   }
 
   @Override
   public Iterable<GenericRecord> convertRecord(Schema outputSchema, JsonObject inputRecord, WorkUnitState workUnit)
       throws DataConversionException {
 
-    GenericRecord avroRecord = new GenericData.Record(outputSchema);
-    long maxFailedConversions = workUnit.getPropAsLong(ConfigurationKeys.CONVERTER_AVRO_MAX_CONVERSION_FAILURES,
-        ConfigurationKeys.DEFAULT_CONVERTER_AVRO_MAX_CONVERSION_FAILURES);
-
-    for (Map.Entry<String, JsonElement> entry : inputRecord.entrySet()) {
-      try {
-        avroRecord.put(entry.getKey(), this.converters.get(entry.getKey()).convert(entry.getValue()));
-      } catch (Exception e) {
-        this.numFailedConversion++;
-        if (this.numFailedConversion < maxFailedConversions) {
-          LOG.error("Dropping record " + inputRecord + " because it cannot be converted to Avro", e);
-          return new EmptyIterable<>();
-        }
-        throw new DataConversionException("Unable to convert field:" + entry.getKey() + " for value:" + entry.getValue()
-            + " for record: " + inputRecord, e);
-      }
-    }
-
-    return new SingleRecordIterable<>(avroRecord);
+    return new SingleRecordIterable<>((GenericRecord) recordConverter.convert(inputRecord));
   }
 
   /**
@@ -151,8 +104,7 @@ public class JsonIntermediateToAvroConverter extends ToAvroConverterBase<JsonArr
           + "is not specified. Trying to get the orignal schema from previous avro files.");
       originalSchemaPath = WriterUtils
           .getDataPublisherFinalDir(workUnitState, workUnitState.getPropAsInt(ConfigurationKeys.FORK_BRANCHES_KEY, 1),
-              workUnitState.getPropAsInt(ConfigurationKeys.FORK_BRANCH_ID_KEY, 0))
-          .getParent();
+              workUnitState.getPropAsInt(ConfigurationKeys.FORK_BRANCH_ID_KEY, 0)).getParent();
     }
     try {
       Schema prevSchema = AvroUtils.getDirectorySchema(originalSchemaPath, conf, false);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
index c495bae..97f0121 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.converter.avro;
 
 import java.util.List;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -27,6 +28,8 @@ import org.apache.gobblin.converter.DataConversionException;
 import org.apache.gobblin.converter.SchemaConversionException;
 import org.apache.gobblin.converter.SingleRecordIterable;
 import org.apache.gobblin.converter.ToAvroConverterBase;
+
+import com.google.common.base.Preconditions;
 import com.google.gson.JsonObject;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonSchema.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonSchema.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonSchema.java
new file mode 100644
index 0000000..21bfeaf
--- /dev/null
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonSchema.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter.json;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type;
+import org.apache.gobblin.source.extractor.schema.Schema;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.ENUM;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.FIXED;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.RECORD;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.UNION;
+import static org.apache.gobblin.converter.json.JsonSchema.SchemaType.CHILD;
+import static org.apache.gobblin.converter.json.JsonSchema.SchemaType.ROOT;
+
+
+/**
+ * Represents a source schema declared in the configuration with {@link ConfigurationKeys#SOURCE_SCHEMA}.
+ * The source schema is represented by a {@link JsonArray}.
+ * @author tilakpatidar
+ */
+public class JsonSchema extends Schema {
+  public static final String RECORD_FIELDS_KEY = "values";
+  public static final String TYPE_KEY = "type";
+  public static final String NAME_KEY = "name";
+  public static final String SIZE_KEY = "size";
+  public static final String ENUM_SYMBOLS_KEY = "symbols";
+  public static final String COLUMN_NAME_KEY = "columnName";
+  public static final String DATA_TYPE_KEY = "dataType";
+  public static final String COMMENT_KEY = "comment";
+  public static final String DEFAULT_VALUE_KEY = "defaultValue";
+  public static final String IS_NULLABLE_KEY = "isNullable";
+  public static final String DEFAULT_RECORD_COLUMN_NAME = "root";
+  public static final String DEFAULT_VALUE_FOR_OPTIONAL_PROPERTY = "";
+  public static final String ARRAY_ITEMS_KEY = "items";
+  public static final String MAP_ITEMS_KEY = "values";
+  public static final String SOURCE_TYPE = "source.type";
+  private final Type type;
+  private final JsonObject json;
+  private final SchemaType schemaNestedLevel;
+  private JsonSchema secondType;
+  private JsonSchema firstType;
+  private JsonArray jsonArray;
+
+  public enum SchemaType {
+    ROOT, CHILD
+  }
+
+  /**
+   * Build a {@link JsonSchema} using {@link JsonArray}
+   * This will create a {@link SchemaType} of {@link SchemaType#ROOT}
+   * @param jsonArray
+   */
+  public JsonSchema(JsonArray jsonArray) {
+    JsonObject jsonObject = new JsonObject();
+    JsonObject dataType = new JsonObject();
+    jsonObject.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME);
+    dataType.addProperty(TYPE_KEY, RECORD.toString());
+    dataType.add(RECORD_FIELDS_KEY, jsonArray);
+    jsonObject.add(DATA_TYPE_KEY, dataType);
+    setJsonSchemaProperties(jsonObject);
+    this.type = RECORD;
+    this.json = jsonObject;
+    this.jsonArray = jsonArray;
+    this.schemaNestedLevel = ROOT;
+  }
+
+  /**
+   * Build a {@link JsonSchema} using {@link JsonArray}
+   * This will create a {@link SchemaType} of {@link SchemaType#CHILD}
+   * @param jsonObject
+   */
+  public JsonSchema(JsonObject jsonObject) {
+    JsonObject root = new JsonObject();
+    if (!jsonObject.has(COLUMN_NAME_KEY) && !jsonObject.has(DATA_TYPE_KEY)) {
+      root.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME);
+      root.add(DATA_TYPE_KEY, jsonObject);
+      jsonObject = root;
+    }
+    if (!jsonObject.has(COLUMN_NAME_KEY) && jsonObject.has(DATA_TYPE_KEY)) {
+      jsonObject.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME);
+    }
+    setJsonSchemaProperties(jsonObject);
+    JsonElement typeElement = getDataType().get(TYPE_KEY);
+    if (typeElement.isJsonPrimitive()) {
+      this.type = Type.valueOf(typeElement.getAsString().toUpperCase());
+    } else if (typeElement.isJsonArray()) {
+      JsonArray jsonArray = typeElement.getAsJsonArray();
+      if (jsonArray.size() != 2) {
+        throw new RuntimeException("Invalid " + TYPE_KEY + "property in schema for union types");
+      }
+      this.type = UNION;
+      JsonElement type1 = jsonArray.get(0);
+      JsonElement type2 = jsonArray.get(1);
+      if (type1.isJsonPrimitive()) {
+        this.firstType = buildBaseSchema(Type.valueOf(type1.getAsString().toUpperCase()));
+      }
+      if (type2.isJsonPrimitive()) {
+        this.secondType = buildBaseSchema(Type.valueOf(type2.getAsString().toUpperCase()));
+      }
+      if (type1.isJsonObject()) {
+        this.firstType = buildBaseSchema(type1.getAsJsonObject());
+      }
+      if (type2.isJsonObject()) {
+        this.secondType = buildBaseSchema(type2.getAsJsonObject());
+      }
+    } else {
+      throw new RuntimeException("Invalid " + TYPE_KEY + "property in schema");
+    }
+    this.json = jsonObject;
+    JsonArray jsonArray = new JsonArray();
+    jsonArray.add(jsonObject);
+    this.jsonArray = jsonArray;
+    this.schemaNestedLevel = CHILD;
+  }
+
+  /**
+   * Get symbols for a {@link Type#ENUM} type.
+   * @return
+   */
+  public JsonArray getSymbols() {
+    if (this.type.equals(ENUM)) {
+      return getDataType().get(ENUM_SYMBOLS_KEY).getAsJsonArray();
+    }
+    return new JsonArray();
+  }
+
+  /**
+   * Get {@link Type} for this {@link JsonSchema}.
+   * @return
+   */
+  public Type getType() {
+    return type;
+  }
+
+  /**
+   * Builds a {@link JsonSchema} object for a given {@link Type} object.
+   * @param type
+   * @return
+   */
+  public static JsonSchema buildBaseSchema(Type type) {
+    JsonObject jsonObject = new JsonObject();
+    JsonObject dataType = new JsonObject();
+    jsonObject.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME);
+    dataType.addProperty(TYPE_KEY, type.toString());
+    jsonObject.add(DATA_TYPE_KEY, dataType);
+    return new JsonSchema(jsonObject);
+  }
+
+  /**
+   * Builds a {@link JsonSchema} object for a given {@link Type} object.
+   * @return
+   */
+  public static JsonSchema buildBaseSchema(JsonObject root) {
+    root.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME);
+    return new JsonSchema(root);
+  }
+
+  /**
+   * Get optional property from a {@link JsonObject} for a {@link String} key.
+   * If key does'nt exists returns {@link #DEFAULT_VALUE_FOR_OPTIONAL_PROPERTY}.
+   * @param jsonObject
+   * @param key
+   * @return
+   */
+  public static String getOptionalProperty(JsonObject jsonObject, String key) {
+    return jsonObject.has(key) ? jsonObject.get(key).getAsString() : DEFAULT_VALUE_FOR_OPTIONAL_PROPERTY;
+  }
+
+  /**
+   * Fetches dataType.values from the JsonObject
+   * @return
+   */
+  public JsonSchema getValuesWithinDataType() {
+    JsonElement element = this.getDataType().get(MAP_ITEMS_KEY);
+    if (element.isJsonObject()) {
+      return new JsonSchema(element.getAsJsonObject());
+    }
+    if (element.isJsonArray()) {
+      return new JsonSchema(element.getAsJsonArray());
+    }
+    if (element.isJsonPrimitive()) {
+      return buildBaseSchema(Type.valueOf(element.getAsString().toUpperCase()));
+    }
+    throw new UnsupportedOperationException(
+        "Map values can only be defined using JsonObject, JsonArray or JsonPrimitive.");
+  }
+
+  /**
+   * Gets size for fixed type viz dataType.size from the JsonObject
+   * @return
+   */
+  public int getSizeOfFixedData() {
+    if (this.type.equals(FIXED)) {
+      return this.getDataType().get(SIZE_KEY).getAsInt();
+    }
+    return 0;
+  }
+
+  public boolean isType(Type type) {
+    return this.type.equals(type);
+  }
+
+  /**
+   * Fetches the nested or primitive array items type from schema.
+   * @return
+   * @throws DataConversionException
+   */
+  public Type getTypeOfArrayItems()
+      throws DataConversionException {
+    JsonSchema arrayValues = getItemsWithinDataType();
+    if (arrayValues == null) {
+      throw new DataConversionException("Array types only allow values as primitive, null or JsonObject");
+    }
+    return arrayValues.getType();
+  }
+
+  public JsonSchema getItemsWithinDataType() {
+    JsonElement element = this.getDataType().get(ARRAY_ITEMS_KEY);
+    if (element.isJsonObject()) {
+      return new JsonSchema(element.getAsJsonObject());
+    }
+    if (element.isJsonPrimitive()) {
+      return buildBaseSchema(Type.valueOf(element.getAsString().toUpperCase()));
+    }
+    throw new UnsupportedOperationException("Array items can only be defined using JsonObject or JsonPrimitive.");
+  }
+
+  public JsonSchema getFirstTypeSchema() {
+    return this.firstType;
+  }
+
+  public JsonSchema getSecondTypeSchema() {
+    return this.secondType;
+  }
+
+  public int fieldsCount() {
+    return this.jsonArray.size();
+  }
+
+  public JsonSchema getFieldSchemaAt(int i) {
+    if (i >= this.jsonArray.size()) {
+      return new JsonSchema(this.json);
+    }
+    return new JsonSchema(this.jsonArray.get(i).getAsJsonObject());
+  }
+
+  public List<JsonSchema> getDataTypes() {
+    if (firstType != null && secondType != null) {
+      return Arrays.asList(firstType, secondType);
+    }
+    return Collections.singletonList(this);
+  }
+
+  public boolean isRoot() {
+    return this.schemaNestedLevel.equals(ROOT);
+  }
+
+  public String getName() {
+    return getOptionalProperty(this.getDataType(), NAME_KEY);
+  }
+
+  /**
+   * Set properties for {@link JsonSchema} from a {@link JsonObject}.
+   * @param jsonObject
+   */
+  private void setJsonSchemaProperties(JsonObject jsonObject) {
+    setColumnName(jsonObject.get(COLUMN_NAME_KEY).getAsString());
+    setDataType(jsonObject.get(DATA_TYPE_KEY).getAsJsonObject());
+    setNullable(jsonObject.has(IS_NULLABLE_KEY) && jsonObject.get(IS_NULLABLE_KEY).getAsBoolean());
+    setComment(getOptionalProperty(jsonObject, COMMENT_KEY));
+    setDefaultValue(getOptionalProperty(jsonObject, DEFAULT_VALUE_KEY));
+  }
+}
\ No newline at end of file
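
A short sketch of the new JsonSchema wrapper in isolation. The single-column definition below is
illustrative; the key names follow the constants declared in the class (columnName, dataType,
type, items, isNullable).

import org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type;
import org.apache.gobblin.converter.json.JsonSchema;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class JsonSchemaSketch {
  public static void main(String[] args) {
    // Illustrative nullable array column whose items are strings.
    JsonObject column = new JsonParser().parse(
        "{\"columnName\": \"tags\", \"isNullable\": true,"
            + " \"dataType\": {\"type\": \"array\", \"items\": \"string\"}}").getAsJsonObject();

    JsonSchema schema = new JsonSchema(column);                      // CHILD-level schema
    System.out.println(schema.getColumnName());                      // tags
    System.out.println(schema.isNullable());                         // true
    System.out.println(schema.isType(Type.ARRAY));                   // true
    System.out.println(schema.getItemsWithinDataType().getType());   // STRING
  }
}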

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverter.java
index b1cd467..4cb8d84 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverter.java
@@ -17,25 +17,30 @@
 
 package org.apache.gobblin.converter.json;
 
+import java.io.IOException;
+import java.util.Map.Entry;
+
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.converter.Converter;
 import org.apache.gobblin.converter.DataConversionException;
 import org.apache.gobblin.converter.SchemaConversionException;
 import org.apache.gobblin.converter.SingleRecordIterable;
-
-import java.io.IOException;
-import java.util.Map;
-
+import org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonNull;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.FIXED;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.MAP;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.NULL;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.RECORD;
+import static org.apache.gobblin.converter.json.JsonSchema.DEFAULT_RECORD_COLUMN_NAME;
+
 
 /**
  * Converts a json string to a {@link JsonObject}.
@@ -44,7 +49,9 @@ public class JsonStringToJsonIntermediateConverter extends Converter<String, Jso
 
   private final static Logger log = LoggerFactory.getLogger(JsonStringToJsonIntermediateConverter.class);
 
-  private static final String UNPACK_COMPLEX_SCHEMAS_KEY = "gobblin.converter.jsonStringToJsonIntermediate.unpackComplexSchemas";
+  private static final String UNPACK_COMPLEX_SCHEMAS_KEY =
+      "gobblin.converter.jsonStringToJsonIntermediate.unpackComplexSchemas";
+  public static final boolean DEFAULT_UNPACK_COMPLEX_SCHEMAS_KEY = Boolean.TRUE;
 
   private boolean unpackComplexSchemas;
 
@@ -53,8 +60,10 @@ public class JsonStringToJsonIntermediateConverter extends Converter<String, Jso
    * @return a JsonArray representation of the schema
    */
   @Override
-  public JsonArray convertSchema(String inputSchema, WorkUnitState workUnit) throws SchemaConversionException {
-    this.unpackComplexSchemas = workUnit.getPropAsBoolean(UNPACK_COMPLEX_SCHEMAS_KEY, true);
+  public JsonArray convertSchema(String inputSchema, WorkUnitState workUnit)
+      throws SchemaConversionException {
+    this.unpackComplexSchemas =
+        workUnit.getPropAsBoolean(UNPACK_COMPLEX_SCHEMAS_KEY, DEFAULT_UNPACK_COMPLEX_SCHEMAS_KEY);
 
     JsonParser jsonParser = new JsonParser();
     log.info("Schema: " + inputSchema);
@@ -76,40 +85,172 @@ public class JsonStringToJsonIntermediateConverter extends Converter<String, Jso
     if (!this.unpackComplexSchemas) {
       return new SingleRecordIterable<>(inputRecord);
     }
+    JsonSchema schema = new JsonSchema(outputSchema);
+    JsonObject rec = parse(inputRecord, schema);
+    return new SingleRecordIterable(rec);
+  }
 
-    JsonObject outputRecord = new JsonObject();
-
-    for (int i = 0; i < outputSchema.size(); i++) {
-      String expectedColumnName = outputSchema.get(i).getAsJsonObject().get("columnName").getAsString();
+  /**
+   * Parses a provided JsonObject input using the provided JsonArray schema into
+   * a JsonObject.
+   * @param element
+   * @param schema
+   * @return
+   * @throws DataConversionException
+   */
+  private JsonElement parse(JsonElement element, JsonSchema schema)
+      throws DataConversionException {
+    JsonObject root = new JsonObject();
+    root.add(DEFAULT_RECORD_COLUMN_NAME, element);
+    JsonObject jsonObject = parse(root, schema);
+    return jsonObject.get(DEFAULT_RECORD_COLUMN_NAME);
+  }
 
-      if (inputRecord.has(expectedColumnName)) {
-        //As currently org.apache.gobblin.converter.avro.JsonIntermediateToAvroConverter is not able to handle complex schema's so storing it as string
+  /**
+   * Parses a provided JsonObject input using the provided JsonArray schema into
+   * a JsonObject.
+   * @param record
+   * @param schema
+   * @return
+   * @throws DataConversionException
+   */
+  private JsonObject parse(JsonObject record, JsonSchema schema)
+      throws DataConversionException {
+      JsonObject output = new JsonObject();
+      for (int i = 0; i < schema.fieldsCount(); i++) {
+        JsonSchema schemaElement = schema.getFieldSchemaAt(i);
+        String columnKey = schemaElement.getColumnName();
+        JsonElement parsed;
+        if (!record.has(columnKey)) {
+          output.add(columnKey, JsonNull.INSTANCE);
+          continue;
+        }
 
-        if (inputRecord.get(expectedColumnName).isJsonArray()) {
-          outputRecord.addProperty(expectedColumnName, inputRecord.get(expectedColumnName).toString());
-        } else if (inputRecord.get(expectedColumnName).isJsonObject()) {
-          //To check if internally in an JsonObject there is multiple hierarchy
-          boolean isMultiHierarchyInsideJsonObject = false;
-          for (Map.Entry<String, JsonElement> entry : ((JsonObject) inputRecord.get(expectedColumnName)).entrySet()) {
-            if (entry.getValue().isJsonArray() || entry.getValue().isJsonObject()) {
-              isMultiHierarchyInsideJsonObject = true;
-              break;
+        JsonElement columnValue = record.get(columnKey);
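+        // Unions and enums are dispatched on the declared schema type; for all other types the
+        // branch is chosen by inspecting the value itself (array, object or primitive).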
+        switch (schemaElement.getType()) {
+          case UNION:
+            parsed = parseUnionType(schemaElement, columnValue);
+            break;
+          case ENUM:
+            parsed = parseEnumType(schemaElement, columnValue);
+            break;
+          default:
+            if (columnValue.isJsonArray()) {
+              parsed = parseJsonArrayType(schemaElement, columnValue);
+            } else if (columnValue.isJsonObject()) {
+              parsed = parseJsonObjectType(schemaElement, columnValue);
+            } else {
+              parsed = parsePrimitiveType(schemaElement, columnValue);
             }
-          }
-          if (isMultiHierarchyInsideJsonObject) {
-            outputRecord.addProperty(expectedColumnName, inputRecord.get(expectedColumnName).toString());
-          } else {
-            outputRecord.add(expectedColumnName, inputRecord.get(expectedColumnName));
-          }
-
-        } else {
-          outputRecord.add(expectedColumnName, inputRecord.get(expectedColumnName));
         }
-      } else {
-        outputRecord.add(expectedColumnName, JsonNull.INSTANCE);
+        output.add(columnKey, parsed);
+      }
+      return output;
+  }
+
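+  /**
+   * Parses a union type value by first attempting the union's first member schema and, if that
+   * conversion fails, falling back to the second member schema.
+   * @param schemaElement the union schema
+   * @param columnValue the value to convert
+   * @return the parsed value
+   * @throws DataConversionException if the value matches neither member schema
+   */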
+  private JsonElement parseUnionType(JsonSchema schemaElement, JsonElement columnValue)
+      throws DataConversionException {
+    try {
+      return parse(columnValue, schemaElement.getFirstTypeSchema());
+    } catch (DataConversionException e) {
+      return parse(columnValue, schemaElement.getSecondTypeSchema());
+    }
+  }
+
+  /**
+   * Parses an enum type value.
+   * @param schema the enum schema carrying the allowed symbols
+   * @param value the value to validate
+   * @return the value, if it is one of the declared symbols
+   * @throws DataConversionException if the value is not a declared symbol
+   */
+  private JsonElement parseEnumType(JsonSchema schema, JsonElement value)
+      throws DataConversionException {
+    if (schema.getSymbols().contains(value)) {
+      return value;
+    }
+    throw new DataConversionException(
+        "Invalid symbol: " + value.getAsString() + " allowed values: " + schema.getSymbols().toString());
+  }
+
+  /**
+   * Parses a JSON array value. Arrays of primitive items are returned as-is; arrays of complex
+   * items are parsed element by element against the item schema.
+   * @param schema the array schema
+   * @param value the array value to convert
+   * @return the parsed array
+   * @throws DataConversionException if an item does not conform to the item schema
+   */
+  private JsonElement parseJsonArrayType(JsonSchema schema, JsonElement value)
+      throws DataConversionException {
+    Type arrayType = schema.getTypeOfArrayItems();
+    JsonArray tempArray = new JsonArray();
+    if (Type.isPrimitive(arrayType)) {
+      return value;
+    }
+    JsonSchema nestedSchema = schema.getItemsWithinDataType();
+    for (JsonElement v : value.getAsJsonArray()) {
+      tempArray.add(parse(v, nestedSchema));
+    }
+    return tempArray;
+  }
+
+  /**
+   * Parses a JSON object value as either a map or a record, depending on the schema type.
+   * @param schema the map or record schema
+   * @param value the object value to convert
+   * @return the parsed object, or JSON null if the schema is neither a map nor a record
+   * @throws DataConversionException if a nested value does not conform to its schema
+   */
+  private JsonElement parseJsonObjectType(JsonSchema schema, JsonElement value)
+      throws DataConversionException {
+    JsonSchema valuesWithinDataType = schema.getValuesWithinDataType();
+    if (schema.isType(MAP)) {
+      if (Type.isPrimitive(valuesWithinDataType.getType())) {
+        return value;
       }
 
+      JsonObject map = new JsonObject();
+      for (Entry<String, JsonElement> mapEntry : value.getAsJsonObject().entrySet()) {
+        JsonElement mapValue = mapEntry.getValue();
+        map.add(mapEntry.getKey(), parse(mapValue, valuesWithinDataType));
+      }
+      return map;
+    } else if (schema.isType(RECORD)) {
+      JsonSchema schemaArray = valuesWithinDataType.getValuesWithinDataType();
+      return parse((JsonObject) value, schemaArray);
+    } else {
+      return JsonNull.INSTANCE;
+    }
+  }
+
+  /**
+   * Parses primitive type values, including null and fixed.
+   * @param schema the primitive schema
+   * @param value the value to validate
+   * @return the validated value
+   * @throws DataConversionException on a type mismatch or a fixed-size violation
+   */
+  private JsonElement parsePrimitiveType(JsonSchema schema, JsonElement value)
+      throws DataConversionException {
+
+    if ((schema.isType(NULL) || schema.isNullable()) && value.isJsonNull()) {
+      return JsonNull.INSTANCE;
+    }
+
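+    // A declared NULL type with a non-null value, or a non-NULL type with a null value, is a type mismatch.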
+    if ((schema.isType(NULL) && !value.isJsonNull()) || (!schema.isType(NULL) && value.isJsonNull())) {
+      throw new DataConversionException(
+          "Type mismatch for " + value.toString() + " of type " + schema.getDataTypes().toString());
+    }
+
+    if (schema.isType(FIXED)) {
+      int expectedSize = schema.getSizeOfFixedData();
+      if (value.getAsString().length() == expectedSize) {
+        return value;
+      } else {
+        throw new DataConversionException(
+            "Fixed type value is not same as defined value expected fieldsCount: " + expectedSize);
+      }
+    } else {
+      return value;
     }
-    return new SingleRecordIterable<>(outputRecord);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonElementConversionFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonElementConversionFactoryTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonElementConversionFactoryTest.java
new file mode 100644
index 0000000..7f39d30
--- /dev/null
+++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonElementConversionFactoryTest.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter.avro;
+
+import java.io.InputStreamReader;
+import java.lang.reflect.Type;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SchemaConversionException;
+import org.apache.gobblin.converter.avro.JsonElementConversionFactory.EnumConverter;
+import org.apache.gobblin.converter.avro.JsonElementConversionFactory.MapConverter;
+import org.apache.gobblin.converter.avro.JsonElementConversionFactory.NullConverter;
+import org.apache.gobblin.converter.avro.JsonElementConversionFactory.RecordConverter;
+import org.apache.gobblin.converter.avro.JsonElementConversionFactory.StringConverter;
+import org.apache.gobblin.converter.json.JsonSchema;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.reflect.TypeToken;
+
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.ArrayConverter;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.JsonElementConverter;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.JsonElementConverter.buildNamespace;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.NULL;
+import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.UnionConverter;
+
+
+/**
+ * Unit test for {@link JsonElementConversionFactory}
+ *
+ * @author Tilak Patidar
+ */
+@Test(groups = {"gobblin.converter"})
+public class JsonElementConversionFactoryTest {
+
+  private static WorkUnitState state;
+  private static JsonObject testData;
+  private static JsonParser jsonParser = new JsonParser();
+
+  @BeforeClass
+  public static void setUp() {
+    WorkUnit workUnit = new WorkUnit(new SourceState(),
+        new Extract(new SourceState(), Extract.TableType.SNAPSHOT_ONLY, "namespace", "dummy_table"));
+    state = new WorkUnitState(workUnit);
+    Type listType = new TypeToken<JsonObject>() {
+    }.getType();
+    Gson gson = new Gson();
+    testData = gson.fromJson(new InputStreamReader(
+            JsonElementConversionFactoryTest.class.getResourceAsStream("/converter/JsonElementConversionFactoryTest.json")),
+        listType);
+  }
+
+  @Test
+  public void schemaWithArrayOfMaps()
+      throws Exception {
+    String testName = "schemaWithArrayOfMaps";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonObject expected = getExpectedSchema(testName).getAsJsonObject();
+    JsonSchema jsonSchema = new JsonSchema(schema);
+    jsonSchema.setColumnName("dummy");
+
+    ArrayConverter converter = new ArrayConverter(jsonSchema, state);
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+  }
+
+  @Test
+  public void schemaWithArrayOfRecords()
+      throws Exception {
+    String testName = "schemaWithArrayOfRecords";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonObject expected = getExpectedSchema(testName).getAsJsonObject();
+    JsonSchema jsonSchema = new JsonSchema(schema);
+    jsonSchema.setColumnName("dummy1");
+
+    ArrayConverter converter = new ArrayConverter(jsonSchema, state);
+
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+  }
+
+  @Test
+  public void schemaWithRecord()
+      throws DataConversionException, SchemaConversionException, UnsupportedDateTypeException {
+    String testName = "schemaWithRecord";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonObject expected = getExpectedSchema(testName).getAsJsonObject();
+    JsonSchema jsonSchema = new JsonSchema(schema);
+    jsonSchema.setColumnName("dummy1");
+
+    RecordConverter converter =
+        new RecordConverter(jsonSchema, state, buildNamespace(state.getExtract().getNamespace(), "something"));
+
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+  }
+
+  @Test
+  public void schemaWithArrayOfInts()
+      throws Exception {
+    String testName = "schemaWithArrayOfInts";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonObject expected = getExpectedSchema(testName).getAsJsonObject();
+
+    ArrayConverter converter = new ArrayConverter(new JsonSchema(schema), state);
+
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+  }
+
+  @Test
+  public void schemaWithNullType() {
+    NullConverter nullConverter = new NullConverter(JsonSchema.buildBaseSchema(NULL));
+    JsonObject expected = new JsonObject();
+    expected.addProperty("type", "null");
+    expected.addProperty("source.type", "null");
+
+    Assert.assertEquals(avroSchemaToJsonElement(nullConverter), expected);
+  }
+
+  @Test
+  public void schemaWithArrayOfEnums()
+      throws Exception {
+    String testName = "schemaWithArrayOfEnums";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonObject expected = getExpectedSchema(testName).getAsJsonObject();
+
+    ArrayConverter converter = new ArrayConverter(new JsonSchema(schema), state);
+
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+  }
+
+  @Test
+  public void schemaWithMap()
+      throws Exception {
+    String testName = "schemaWithMap";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonObject expected = getExpectedSchema(testName).getAsJsonObject();
+
+    MapConverter converter = new MapConverter(new JsonSchema(schema), state);
+
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+  }
+
+  @Test
+  public void schemaWithMapOfRecords()
+      throws Exception {
+    String testName = "schemaWithMapOfRecords";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonObject expected = getExpectedSchema(testName).getAsJsonObject();
+
+    MapConverter converter = new MapConverter(new JsonSchema(schema), state);
+
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+  }
+
+  @Test
+  public void schemaWithMapOfArrays()
+      throws Exception {
+    String testName = "schemaWithMapOfArrays";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonObject expected = getExpectedSchema(testName).getAsJsonObject();
+
+    MapConverter converter = new MapConverter(new JsonSchema(schema), state);
+
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+  }
+
+  @Test
+  public void schemaWithMapOfEnum()
+      throws Exception {
+    String testName = "schemaWithMapOfEnum";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonObject expected = getExpectedSchema(testName).getAsJsonObject();
+
+    MapConverter converter = new MapConverter(new JsonSchema(schema), state);
+
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+  }
+
+  @Test
+  public void schemaWithRecordOfMap()
+      throws Exception {
+    String testName = "schemaWithRecordOfMap";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonObject expected = getExpectedSchema(testName).getAsJsonObject();
+
+    RecordConverter converter = new RecordConverter(new JsonSchema(schema), state,
+        buildNamespace(state.getExtract().getNamespace(), "something"));
+
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+  }
+
+  @Test
+  public void schemaWithRecordOfArray()
+      throws Exception {
+    String testName = "schemaWithRecordOfArray";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonObject expected = getExpectedSchema(testName).getAsJsonObject();
+
+    RecordConverter converter = new RecordConverter(new JsonSchema(schema), state,
+        buildNamespace(state.getExtract().getNamespace(), "something"));
+
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+  }
+
+  @Test
+  public void schemaWithRecordOfEnum()
+      throws Exception {
+    String testName = "schemaWithRecordOfEnum";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonObject expected = getExpectedSchema(testName).getAsJsonObject();
+
+    RecordConverter converter = new RecordConverter(new JsonSchema(schema), state,
+        buildNamespace(state.getExtract().getNamespace(), "something"));
+
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class)
+  public void schemaWithMapValuesAsJsonArray()
+      throws Exception {
+    String testName = "schemaWithMapValuesAsJsonArray";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+
+    new RecordConverter(new JsonSchema(schema), state, buildNamespace(state.getExtract().getNamespace(), "something"));
+  }
+
+  @Test(expectedExceptions = UnsupportedOperationException.class)
+  public void schemaWithMapValuesAsJsonNull()
+      throws Exception {
+    String testName = "schemaWithMapValuesAsJsonNull";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+
+    new RecordConverter(new JsonSchema(schema), state, buildNamespace(state.getExtract().getNamespace(), "something"));
+  }
+
+  @Test
+  public void schemaWithRecordOfRecord()
+      throws Exception {
+    String testName = "schemaWithRecordOfRecord";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonObject expected = getExpectedSchema(testName).getAsJsonObject();
+
+    RecordConverter converter = new RecordConverter(new JsonSchema(schema), state,
+        buildNamespace(state.getExtract().getNamespace(), "something"));
+
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+  }
+
+  @Test
+  public void schemaWithRecordOfRecordCheckNamespace()
+      throws Exception {
+    String testName = "schemaWithRecordOfRecordCheckNamespace";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonObject expected = getExpectedSchema(testName).getAsJsonObject();
+
+    RecordConverter converter =
+        new RecordConverter(new JsonSchema(schema), state, buildNamespace(state.getExtract().getNamespace(), "person"));
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+    Assert.assertEquals(converter.schema().getField("someperson").schema().getNamespace(), "namespace.person.myrecord");
+    Assert.assertEquals(converter.schema().getNamespace(), "namespace.person");
+  }
+
+  @Test
+  public void schemaWithRecordOfEnumCheckNamespace()
+      throws Exception {
+    String testName = "schemaWithRecordOfEnumCheckNamespace";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonObject expected = getExpectedSchema(testName).getAsJsonObject();
+
+    RecordConverter converter = new RecordConverter(new JsonSchema(schema), state,
+        buildNamespace(state.getExtract().getNamespace(), "something"));
+
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+    Assert.assertEquals(converter.schema().getField("someperson").schema().getNamespace(),
+        "namespace.something.myrecord");
+    Assert.assertEquals(converter.schema().getNamespace(), "namespace.something");
+  }
+
+  @Test
+  public void schemaWithUnion()
+      throws Exception {
+    String testName = "schemaWithUnion";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonArray expected = getExpectedSchema(testName).getAsJsonArray();
+
+    UnionConverter converter = new UnionConverter(new JsonSchema(schema), state);
+
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+  }
+
+  @Test
+  public void schemaWithComplexUnion()
+      throws Exception {
+    String testName = "schemaWithComplexUnion";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonArray expected = getExpectedSchema(testName).getAsJsonArray();
+
+    UnionConverter converter = new UnionConverter(new JsonSchema(schema), state);
+
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+  }
+
+  @Test
+  public void schemaWithIsNullable()
+      throws Exception {
+    String testName = "schemaWithIsNullable";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonArray expected = getExpectedSchema(testName).getAsJsonArray();
+
+    StringConverter converter = new StringConverter(new JsonSchema(schema));
+
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+  }
+
+  @Test
+  public void schemaWithRecordIsNullable()
+      throws Exception {
+    String testName = "schemaWithRecordIsNullable";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonArray expected = getExpectedSchema(testName).getAsJsonArray();
+
+    RecordConverter converter = new RecordConverter(new JsonSchema(schema), state,
+        buildNamespace(state.getExtract().getNamespace(), "something"));
+
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+  }
+
+  @Test
+  public void schemaWithMapIsNullable()
+      throws Exception {
+    String testName = "schemaWithMapIsNullable";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonArray expected = getExpectedSchema(testName).getAsJsonArray();
+
+    MapConverter converter = new MapConverter(new JsonSchema(schema), state);
+
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+  }
+
+  @Test
+  public void schemaWithEnumIsNullable()
+      throws Exception {
+    String testName = "schemaWithEnumIsNullable";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonArray expected = getExpectedSchema(testName).getAsJsonArray();
+
+    EnumConverter converter = new EnumConverter(new JsonSchema(schema), "something");
+
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+  }
+
+  @Test
+  public void schemaWithArrayIsNullable()
+      throws Exception {
+    String testName = "schemaWithArrayIsNullable";
+    JsonObject schema = getSchemaData(testName).getAsJsonObject();
+    JsonArray expected = getExpectedSchema(testName).getAsJsonArray();
+
+    ArrayConverter converter = new ArrayConverter(new JsonSchema(schema), state);
+
+    Assert.assertEquals(avroSchemaToJsonElement(converter), expected);
+  }
+
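+  /** Renders the converter's Avro schema as a {@link JsonElement} so it can be compared in assertions. */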
+  private JsonElement avroSchemaToJsonElement(JsonElementConverter converter) {
+    return jsonParser.parse(converter.schema().toString());
+  }
+
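+  /**
+   * Each entry in JsonElementConversionFactoryTest.json is a two-element array: index 0 holds the
+   * input schema and index 1 the expected Avro schema.
+   */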
+  private JsonElement getExpectedSchema(String methodName) {
+    return testData.get(methodName).getAsJsonArray().get(1);
+  }
+
+  private JsonElement getSchemaData(String methodName) {
+    return testData.get(methodName).getAsJsonArray().get(0);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverterTest.java
index 558c1f6..be2f417 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverterTest.java
@@ -26,22 +26,25 @@ import java.util.TimeZone;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SchemaConversionException;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
 import com.google.gson.reflect.TypeToken;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.SourceState;
-import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.source.workunit.Extract.TableType;
 
 
 /**
@@ -55,28 +58,48 @@ public class JsonIntermediateToAvroConverterTest {
   private JsonObject jsonRecord;
   private WorkUnitState state;
 
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    Type listType = new TypeToken<JsonArray>() {
+  /**
+   * Runs a schema and record conversion test against the expectations stored in the given
+   * resource file.
+   * @param resourceFilePath path to the test resource holding the schema, record and expected outputs
+   * @throws SchemaConversionException if the schema cannot be converted
+   * @throws DataConversionException if the record cannot be converted
+   */
+  private void complexSchemaTest(String resourceFilePath)
+      throws SchemaConversionException, DataConversionException {
+    JsonObject testData = initResources(resourceFilePath);
+
+    JsonIntermediateToAvroConverter converter = new JsonIntermediateToAvroConverter();
+
+    Schema avroSchema = converter.convertSchema(jsonSchema, state);
+    GenericRecord genericRecord = converter.convertRecord(avroSchema, jsonRecord, state).iterator().next();
+    JsonParser parser = new JsonParser();
+    Assert.assertEquals(parser.parse(avroSchema.toString()).getAsJsonObject(),
+        testData.get("expectedSchema").getAsJsonObject());
+    Assert.assertEquals(parser.parse(genericRecord.toString()), testData.get("expectedRecord").getAsJsonObject());
+  }
+
+  private JsonObject initResources(String resourceFilePath) {
+    Type listType = new TypeToken<JsonObject>() {
     }.getType();
     Gson gson = new Gson();
-    jsonSchema = gson.fromJson(new InputStreamReader(this.getClass().getResourceAsStream("/converter/schema.json")), listType);
+    JsonObject testData =
+        gson.fromJson(new InputStreamReader(this.getClass().getResourceAsStream(resourceFilePath)), listType);
 
-    listType = new TypeToken<JsonObject>() {
-    }.getType();
-    jsonRecord = gson.fromJson(new InputStreamReader(this.getClass().getResourceAsStream("/converter/record.json")), listType);
+    jsonRecord = testData.get("record").getAsJsonObject();
+    jsonSchema = testData.get("schema").getAsJsonArray();
 
-    SourceState source = new SourceState();
-    state = new WorkUnitState(
-        source.createWorkUnit(source.createExtract(TableType.SNAPSHOT_ONLY, "test_table", "test_namespace")));
+    WorkUnit workUnit = new WorkUnit(new SourceState(),
+        new Extract(new SourceState(), Extract.TableType.SNAPSHOT_ONLY, "namespace", "dummy_table"));
+    state = new WorkUnitState(workUnit);
     state.setProp(ConfigurationKeys.CONVERTER_AVRO_TIME_FORMAT, "HH:mm:ss");
     state.setProp(ConfigurationKeys.CONVERTER_AVRO_DATE_TIMEZONE, "PST");
+    return testData;
   }
 
   @Test
   public void testConverter()
       throws Exception {
+    initResources("/converter/schema.json");
     JsonIntermediateToAvroConverter converter = new JsonIntermediateToAvroConverter();
 
     Schema avroSchema = converter.convertSchema(jsonSchema, state);
@@ -121,4 +144,22 @@ public class JsonIntermediateToAvroConverterTest {
 
     Assert.assertNotEquals(record.get("LastModifiedDate"), record2.get("LastModifiedDate"));
   }
+
+  @Test
+  public void testComplexSchema1()
+      throws Exception {
+    complexSchemaTest("/converter/complex1.json");
+  }
+
+  @Test
+  public void testComplexSchema2()
+      throws Exception {
+    complexSchemaTest("/converter/complex2.json");
+  }
+
+  @Test
+  public void testComplexSchema3()
+      throws Exception {
+    complexSchemaTest("/converter/complex3.json");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/test/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverterTest.java
new file mode 100644
index 0000000..305113b
--- /dev/null
+++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverterTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter.json;
+
+import java.io.InputStreamReader;
+import java.lang.reflect.Type;
+import java.util.Map;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SchemaConversionException;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.reflect.TypeToken;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Unit test for {@link JsonStringToJsonIntermediateConverter}
+ *
+ * @author Tilak Patidar
+ */
+@Test(groups = {"gobblin.converter"})
+public class JsonStringToJsonIntermediateConverterTest {
+
+  private static JsonStringToJsonIntermediateConverter converter;
+  private static JsonObject testJsonData;
+
+  @BeforeClass
+  public static void setUp()
+      throws SchemaConversionException {
+    converter = new JsonStringToJsonIntermediateConverter();
+    WorkUnitState workUnit = new WorkUnitState();
+    workUnit.setProp("gobblin.converter.jsonStringToJsonIntermediate.unpackComplexSchemas", true);
+    converter.convertSchema("[]", workUnit);
+    Type jsonType = new TypeToken<JsonObject>() {
+    }.getType();
+    Gson gson = new Gson();
+    testJsonData = gson.fromJson(new InputStreamReader(JsonStringToJsonIntermediateConverterTest.class
+        .getResourceAsStream("/converter/JsonStringToJsonIntermediateConverter.json")), jsonType);
+  }
+
+  private JsonObject parseJsonObject(JsonObject json, JsonArray record)
+      throws DataConversionException {
+    return converter.convertRecord(record, json.toString(), new WorkUnitState()).iterator().next();
+  }
+
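+  /**
+   * Runs every entry in JsonStringToJsonIntermediateConverter.json; each entry is a three-element
+   * array of [input record, schema, expected output].
+   */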
+  @Test
+  public void testAllCases()
+      throws DataConversionException {
+    for (Map.Entry<String, JsonElement> keyset : testJsonData.entrySet()) {
+      JsonArray testData = keyset.getValue().getAsJsonArray();
+      JsonObject json = testData.get(0).getAsJsonObject();
+      JsonArray schema = testData.get(1).getAsJsonArray();
+      JsonObject expected = testData.get(2).getAsJsonObject();
+      JsonObject result = null;
+      try {
+        result = parseJsonObject(json, schema);
+      } catch (Exception e) {
+        e.printStackTrace();
+        assertEquals("Test case failed : " + keyset.getKey(), "No exception", e.getMessage());
+      }
+      assertEquals("Test case failed : " + keyset.getKey(), expected, result);
+    }
+  }
+
+  @Test(expectedExceptions = DataConversionException.class, expectedExceptionsMessageRegExp = "Invalid symbol.*")
+  public void jsonWithInvalidEnumEntry()
+      throws DataConversionException {
+    String jsonStr = "{\"a\":\"somename\", \"b\":\"TROLL\"}";
+    String schemaStr =
+        "    [{\"columnName\":\"a\", \"dataType\":{\"type\":\"string\"}},{\"columnName\":\"b\", \"dataType\":{\"type\":\"enum\", \"symbols\":[\"HELL\",\"BELLS\"]}}]";
+
+    parseJsonObject(buildJsonObject(jsonStr), buildJsonArray(schemaStr));
+  }
+
+  @Test(expectedExceptions = UnsupportedOperationException.class, expectedExceptionsMessageRegExp = "Array items can only be defined using JsonObject or JsonPrimitive.")
+  public void jsonWithArrayOfMapContainingRecordWithWrongSchema()
+      throws DataConversionException {
+    String jsonStr = "{\"a\":\"somename\", \"b\":[{\"d\":{\"age\":\"10\"}},{\"d\":{\"age\":\"1\"}}]}";
+    String schemaStr =
+        "[{\"columnName\":\"a\", \"dataType\":{\"type\":\"string\"}},{\"columnName\":\"b\", \"dataType\":{\"type\":\"array\", \"items\":[{\"dataType\":{\"type\":\"map\", \"values\":{\"dataType\":{\"type\":\"record\",\"values\":[{\"columnName\":\"age\", \"dataType\":{\"type\":\"int\"}}]}}}}]}}]";
+
+    parseJsonObject(buildJsonObject(jsonStr), buildJsonArray(schemaStr));
+  }
+
+  private JsonObject buildJsonObject(String s) {
+    JsonParser parser = new JsonParser();
+    return (JsonObject) parser.parse(s);
+  }
+
+  private JsonArray buildJsonArray(String schemaStr) {
+    JsonParser parser = new JsonParser();
+    return parser.parse(schemaStr).getAsJsonArray();
+  }
+}
\ No newline at end of file