You are viewing a plain-text version of this content; the canonical HTML version is available from the Apache mailing-list archive (the hyperlink was lost in this plain-text capture).
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/01/23 18:17:17 UTC

incubator-gobblin git commit: [GOBBLIN-361] Support Nested nullable Record type for JDBCWriter

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master ec8529885 -> c6b3824aa


[GOBBLIN-361] Support Nested nullable Record type for JDBCWriter

Closes #2233 from jinhyukchang/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/c6b3824a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/c6b3824a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/c6b3824a

Branch: refs/heads/master
Commit: c6b3824aac01a61cb492a5cd5b672fc2fea1b09f
Parents: ec85298
Author: Jin Hyuk Chang <jn...@linkedin.com>
Authored: Tue Jan 23 10:17:10 2018 -0800
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Tue Jan 23 10:17:10 2018 -0800

----------------------------------------------------------------------
 .../filter/AvroFieldsPickConverter.java         |  55 +++++-
 .../filter/AvroFieldsPickConverterTest.java     |  37 +++-
 .../converted_pickfields_nested_with_union.avro | Bin 0 -> 678 bytes
 .../converted_pickfields_nested_with_union.avsc |  47 +++++
 .../converter/pickfields_nested_with_union.avro | Bin 0 -> 1264 bytes
 .../converter/pickfields_nested_with_union.avsc |  33 ++++
 .../jdbc/AvroToJdbcEntryConverter.java          | 150 ++++++++------
 .../jdbc/AvroToJdbcEntryConverterTest.java      | 121 ++++++++++++
 .../converter/pickfields_nested_with_union.avro | Bin 0 -> 1264 bytes
 .../converter/pickfields_nested_with_union.avsc |  33 ++++
 .../converter/pickfields_nested_with_union.json | 194 +++++++++++++++++++
 11 files changed, 608 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
index c7e2db5..74ed3f3 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
@@ -21,8 +21,10 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,6 +33,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.converter.AvroToAvroConverterBase;
@@ -133,29 +136,71 @@ public class AvroFieldsPickConverter extends AvroToAvroConverterBase {
     return createSchemaHelper(schema, root);
   }
 
-  private static Schema createSchemaHelper(Schema inputSchema, TrieNode node) {
-    Schema newRecord = Schema.createRecord(inputSchema.getName(), inputSchema.getDoc(), inputSchema.getNamespace(),
-        inputSchema.isError());
+  private static Schema createSchemaHelper(final Schema inputSchema, TrieNode node) {
     List<Field> newFields = Lists.newArrayList();
     for (TrieNode child : node.children.values()) {
-      Field innerSrcField = inputSchema.getField(child.val);
-      Preconditions.checkNotNull(innerSrcField, child.val + " does not exist under " + inputSchema);
+      Schema recordSchema = getActualRecord(inputSchema);
+      Field innerSrcField = recordSchema.getField(child.val);
+      Preconditions.checkNotNull(innerSrcField, child.val + " does not exist under " + recordSchema);
 
       if (child.children.isEmpty()) { //Leaf
         newFields.add(
             new Field(innerSrcField.name(), innerSrcField.schema(), innerSrcField.doc(), innerSrcField.defaultValue()));
       } else {
         Schema innerSrcSchema = innerSrcField.schema();
+
         Schema innerDestSchema = createSchemaHelper(innerSrcSchema, child); //Recurse of schema
         Field innerDestField =
             new Field(innerSrcField.name(), innerDestSchema, innerSrcField.doc(), innerSrcField.defaultValue());
         newFields.add(innerDestField);
       }
     }
+
+    if (Type.UNION.equals(inputSchema.getType())) {
+      Preconditions.checkArgument(inputSchema.getTypes().size() <= 2,
+          "For union type in nested record, it should only have NULL and Record type");
+
+      Schema recordSchema = getActualRecord(inputSchema);
+      Schema newRecord = Schema.createRecord(recordSchema.getName(), recordSchema.getDoc(), recordSchema.getNamespace(),
+          recordSchema.isError());
+      newRecord.setFields(newFields);
+      if (inputSchema.getTypes().size() == 1) {
+        return Schema.createUnion(newRecord);
+      }
+      return Schema.createUnion(Lists.newArrayList(Schema.create(Type.NULL), newRecord));
+    }
+
+    Schema newRecord = Schema.createRecord(inputSchema.getName(), inputSchema.getDoc(), inputSchema.getNamespace(),
+        inputSchema.isError());
     newRecord.setFields(newFields);
     return newRecord;
   }
 
+  /**
+   * For the schema that is a UNION type with NULL and Record type, it provides Records type.
+   * @param inputSchema
+   * @return
+   */
+  private static Schema getActualRecord(Schema inputSchema) {
+    if (Type.RECORD.equals(inputSchema.getType())) {
+      return inputSchema;
+    }
+
+    Preconditions.checkArgument(Type.UNION.equals(inputSchema.getType()), "Nested schema is only support with either record or union type of null with record");
+    Preconditions.checkArgument(inputSchema.getTypes().size() <= 2,
+        "For union type in nested record, it should only have NULL and Record type");
+
+    for (Schema inner : inputSchema.getTypes()) {
+      if (Type.NULL.equals(inner.getType())) {
+        continue;
+      }
+      Preconditions.checkArgument(Type.RECORD.equals(inner.getType()), "For union type in nested record, it should only have NULL and Record type");
+      return inner;
+
+    }
+    throw new IllegalArgumentException(inputSchema + " is not supported.");
+  }
+
   private static TrieNode buildTrie(List<String> fqns) {
     TrieNode root = new TrieNode(null);
     for (String fqn : fqns) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java
index 009bcc7..a2f71f5 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java
@@ -17,12 +17,18 @@
 
 package org.apache.gobblin.converter.filter;
 
+import java.io.File;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.converter.SchemaConversionException;
-
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.FileUtils;
 import org.skyscreamer.jsonassert.JSONAssert;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 @Test(groups = { "gobblin.converter.filter" })
@@ -59,4 +65,33 @@ public class AvroFieldsPickConverterTest {
       JSONAssert.assertEquals(expected.toString(), converted.toString(), false);
     }
   }
+
+  @Test
+  public void testFieldsPickWithNestedRecord() throws Exception {
+    Schema inputSchema = new Schema.Parser().parse(getClass().getResourceAsStream("/converter/pickfields_nested_with_union.avsc"));
+
+    WorkUnitState workUnitState = new WorkUnitState();
+    workUnitState.setProp(ConfigurationKeys.CONVERTER_AVRO_FIELD_PICK_FIELDS, "name,favorite_number,nested1.nested1_string,nested1.nested2_union.nested2_string");
+
+    try (AvroFieldsPickConverter converter = new AvroFieldsPickConverter()) {
+      Schema convertedSchema = converter.convertSchema(inputSchema, workUnitState);
+      Schema expectedSchema = new Schema.Parser().parse(getClass().getResourceAsStream("/converter/converted_pickfields_nested_with_union.avsc"));
+      JSONAssert.assertEquals(expectedSchema.toString(), convertedSchema.toString(), false);
+
+      try (DataFileReader<GenericRecord> srcDataFileReader = new DataFileReader<GenericRecord>(
+              new File(getClass().getResource("/converter/pickfields_nested_with_union.avro").toURI()),
+                  new GenericDatumReader<GenericRecord>(inputSchema));
+          DataFileReader<GenericRecord> expectedDataFileReader = new DataFileReader<GenericRecord>(
+              new File(getClass().getResource("/converter/converted_pickfields_nested_with_union.avro").toURI()),
+                  new GenericDatumReader<GenericRecord>(expectedSchema));) {
+
+        while (expectedDataFileReader.hasNext()) {
+          GenericRecord expected = expectedDataFileReader.next();
+          GenericRecord actual = converter.convertRecord(convertedSchema, srcDataFileReader.next(), workUnitState).iterator().next();
+          Assert.assertEquals(actual, expected);
+        }
+        Assert.assertTrue(!srcDataFileReader.hasNext());
+      }
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avro
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avro b/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avro
new file mode 100644
index 0000000..5d63a1a
Binary files /dev/null and b/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avro differ

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avsc
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avsc b/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avsc
new file mode 100644
index 0000000..cdfc283
--- /dev/null
+++ b/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avsc
@@ -0,0 +1,47 @@
+{
+  "type": "record",
+  "name": "User",
+  "namespace": "example.avro",
+  "fields": [
+    {
+      "name": "name",
+      "type": "string"
+    },
+    {
+      "name": "favorite_number",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "nested1",
+      "type": {
+        "type": "record",
+        "name": "dummy_nested1",
+        "fields": [
+          {
+            "name": "nested1_string",
+            "type": "string"
+          },
+          {
+            "name": "nested2_union",
+            "type": [
+              "null",
+              {
+                "type": "record",
+                "name": "dummy_nested2",
+                "fields": [
+                  {
+                    "name": "nested2_string",
+                    "type": "string"
+                  }
+                ]
+              }
+            ]
+          }
+        ]
+      }
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avro
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avro b/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avro
new file mode 100644
index 0000000..b6e8d63
Binary files /dev/null and b/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avro differ

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avsc
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avsc b/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avsc
new file mode 100644
index 0000000..bbe402d
--- /dev/null
+++ b/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avsc
@@ -0,0 +1,33 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "date_of_birth", "type": "long"},
+     {"name": "last_modified", "type": "long"},
+     {"name": "created", "type": "long"},
+     {"name": "nested1",
+      "type" : {
+                   "type": "record",
+                   "name": "dummy_nested1",
+                   "fields": [
+                       {"name": "nested1_string", "type": "string"},
+                       {"name": "nested1_int",  "type": ["int", "null"]},
+                       {"name": "nested2_union", "type": ["null", {
+                                                               "type" : "record",
+                                                               "name" : "dummy_nested2",
+                                                               "fields": [
+                                                                   {"name": "nested2_string", "type": "string"},
+                                                                   {"name": "nested2_int",  "type": ["int", "null"]}
+                                                               ]
+                                                           }
+                                                       ]
+                       }
+                   ]
+               }
+     }
+ ]
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverter.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverter.java
index b787de6..5b7e89d 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverter.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverter.java
@@ -20,9 +20,12 @@ package org.apache.gobblin.converter.jdbc;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.avro.Schema;
@@ -36,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -68,6 +72,11 @@ import org.apache.gobblin.writer.commands.JdbcWriterCommandsFactory;
 public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema, GenericRecord, JdbcEntryData> {
 
   public static final String CONVERTER_AVRO_JDBC_DATE_FIELDS = "converter.avro.jdbc.date_fields";
+  private static final String AVRO_NESTED_COLUMN_DELIMITER = ".";
+  private static final String JDBC_FLATTENED_COLUMN_DELIMITER = "_";
+  private static final String AVRO_NESTED_COLUMN_DELIMITER_REGEX_COMPATIBLE = "\\.";
+  private static final Splitter AVRO_RECORD_LEVEL_SPLITTER = Splitter.on(AVRO_NESTED_COLUMN_DELIMITER).omitEmptyStrings();
+
 
   private static final Logger LOG = LoggerFactory.getLogger(AvroToJdbcEntryConverter.class);
   private static final Map<Type, JdbcType> AVRO_TYPE_JDBC_TYPE_MAPPING =
@@ -83,6 +92,7 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
       ImmutableSet.<Type> builder()
         .addAll(AVRO_TYPE_JDBC_TYPE_MAPPING.keySet())
         .add(Type.UNION)
+        .add(Type.RECORD)
         .build();
   private static final Set<JdbcType> JDBC_SUPPORTED_TYPES =
       ImmutableSet.<JdbcType> builder()
@@ -93,7 +103,7 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
         .build();
 
   private Optional<Map<String, String>> avroToJdbcColPairs = Optional.absent();
-  private Optional<Map<String, String>> jdbcToAvroColPairs = Optional.absent();
+  private Map<String, String> jdbcToAvroColPairs = new HashMap<>();
 
   public AvroToJdbcEntryConverter() {
     super();
@@ -128,7 +138,6 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
           jdbcToAvroBuilder.put(entry.getValue().getAsString(), entry.getKey());
         }
         this.avroToJdbcColPairs = Optional.of((Map<String, String>) avroToJdbcBuilder.build());
-        this.jdbcToAvroColPairs = Optional.of((Map<String, String>) jdbcToAvroBuilder.build());
       }
     }
     return this;
@@ -139,7 +148,7 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
    *
    * Few precondition to the Avro schema
    * 1. Avro schema should have one entry type record at first depth.
-   * 2. Avro schema can recurse by having record inside record. As RDBMS structure is not recursive, this is not allowed.
+   * 2. Avro schema can recurse by having record inside record.
    * 3. Supported Avro primitive types and conversion
    *  boolean --> java.lang.Boolean
    *  int --> java.lang.Integer
@@ -150,9 +159,9 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
    *  string --> java.lang.String
    *  null: only allowed if it's within union (see complex types for more details)
    * 4. Supported Avro complex types
-   *  Records: Only first level depth can have Records type. Basically converter will peel out Records type and start with 2nd level.
+   *  Records: Supports nested record type as well.
    *  Enum --> java.lang.String
-   *  Unions --> Only allowed if it have one primitive type in it or null type with one primitive type where null will be ignored.
+   *  Unions --> Only allowed if it have one primitive type in it, along with Record type, or null type with one primitive type where null will be ignored.
    *  Once Union is narrowed down to one primitive type, it will follow conversion of primitive type above.
    * {@inheritDoc}
    *
@@ -167,6 +176,10 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
   @Override
   public JdbcEntrySchema convertSchema(Schema inputSchema, WorkUnitState workUnit) throws SchemaConversionException {
     LOG.info("Converting schema " + inputSchema);
+    Preconditions.checkArgument(Type.RECORD.equals(inputSchema.getType()),
+        "%s is expected for the first level element in Avro schema %s",
+        Type.RECORD, inputSchema);
+
     Map<String, Type> avroColumnType = flatten(inputSchema);
     String jsonStr = Preconditions.checkNotNull(workUnit.getProp(CONVERTER_AVRO_JDBC_DATE_FIELDS));
     java.lang.reflect.Type typeOfMap = new TypeToken<Map<String, JdbcType>>() {}.getType();
@@ -175,7 +188,8 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
 
     List<JdbcEntryMetaDatum> jdbcEntryMetaData = Lists.newArrayList();
     for (Map.Entry<String, Type> avroEntry : avroColumnType.entrySet()) {
-      String colName = tryConvertColumn(avroEntry.getKey(), this.avroToJdbcColPairs);
+      String colName = tryConvertAvroColNameToJdbcColName(avroEntry.getKey());
+
       JdbcType JdbcType = dateColumnMapping.get(colName);
       if (JdbcType == null) {
         JdbcType = AVRO_TYPE_JDBC_TYPE_MAPPING.get(avroEntry.getValue());
@@ -190,15 +204,36 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
     return converted;
   }
 
-  private static String tryConvertColumn(String key, Optional<Map<String, String>> mapping) {
-    if (!mapping.isPresent()) {
-      return key;
+  /**
+   * Convert Avro column name to JDBC column name. If name mapping is defined, follow it. Otherwise, just return avro column name,
+   * while replacing nested column delimiter, dot, to underscore.
+   * This method also updates, mapping from JDBC column name to Avro column name for reverse look up.
+   * @param avroColName
+   * @return
+   */
+  private String tryConvertAvroColNameToJdbcColName(String avroColName) {
+    if (!avroToJdbcColPairs.isPresent()) {
+      String converted = avroColName.replaceAll(AVRO_NESTED_COLUMN_DELIMITER_REGEX_COMPATIBLE, JDBC_FLATTENED_COLUMN_DELIMITER);
+      jdbcToAvroColPairs.put(converted, avroColName);
+      return converted;
     }
 
-    String converted = mapping.get().get(key);
-    return converted != null ? converted : key;
+    String converted = avroToJdbcColPairs.get().get(avroColName);
+    converted = converted != null ? converted : avroColName;
+    jdbcToAvroColPairs.put(converted, avroColName);
+    return converted;
+  }
+
+  /**
+   * Provides JDBC column name based on Avro column name. It's a one liner method but contains knowledge on where the mapping is.
+   * @param colName
+   * @return
+   */
+  private String convertJdbcColNameToAvroColName(String colName) {
+    return Preconditions.checkNotNull(jdbcToAvroColPairs.get(colName));
   }
 
+
   /**
    * Flattens Avro's (possibly recursive) structure and provides field name and type.
    * It assumes that the leaf level field name has unique name.
@@ -208,41 +243,44 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
    */
   private static Map<String, Type> flatten(Schema schema) throws SchemaConversionException {
     Map<String, Type> flattened = new LinkedHashMap<>();
-    if (!Type.RECORD.equals(schema.getType())) {
-      throw new SchemaConversionException(
-          Type.RECORD + " is expected for the first level element in Avro schema " + schema);
-    }
+    Schema recordSchema = determineType(schema);
 
-    for (Field f : schema.getFields()) {
-      produceFlattenedHelper(f.schema(), f, flattened);
+    Preconditions.checkArgument(Type.RECORD.equals(recordSchema.getType()), "%s is expected. Schema: %s",
+        Type.RECORD, recordSchema);
+
+    for (Field f : recordSchema.getFields()) {
+      produceFlattenedHelper(f, flattened);
     }
     return flattened;
   }
 
-  private static void produceFlattenedHelper(Schema schema, Field field, Map<String, Type> flattened)
+  private static void produceFlattenedHelper(Field field, Map<String, Type> flattened)
       throws SchemaConversionException {
-    if (Type.RECORD.equals(schema.getType())) {
-      throw new SchemaConversionException(Type.RECORD + " is only allowed for first level.");
+    Schema actualSchema = determineType(field.schema());
+    if (Type.RECORD.equals(actualSchema.getType())) {
+      Map<String, Type> map = flatten(actualSchema);
+      for (Entry<String, Type> entry : map.entrySet()) {
+        String key = String.format("%s" + AVRO_NESTED_COLUMN_DELIMITER + "%s", field.name(), entry.getKey());
+        Type existing = flattened.put(key, entry.getValue());
+        Preconditions.checkArgument(existing == null, "Duplicate name detected in Avro schema. Field: " + key);
+      }
+      return;
     }
 
-    Type t = determineType(schema);
-    if (field == null) {
-      throw new IllegalArgumentException("Invalid Avro schema, no name has been assigned to " + schema);
-    }
-    Type existing = flattened.put(field.name(), t);
+    Type existing = flattened.put(field.name(), actualSchema.getType());
     if (existing != null) {
       //No duplicate name allowed when flattening (not considering name space we don't have any assumption between namespace and actual database field name)
       throw new SchemaConversionException("Duplicate name detected in Avro schema. " + field.name());
     }
   }
 
-  private static Type determineType(Schema schema) throws SchemaConversionException {
+  private static Schema determineType(Schema schema) throws SchemaConversionException {
     if (!AVRO_SUPPORTED_TYPES.contains(schema.getType())) {
       throw new SchemaConversionException(schema.getType() + " is not supported");
     }
 
     if (!Type.UNION.equals(schema.getType())) {
-      return schema.getType();
+      return schema;
     }
 
     //For UNION, only supported avro type with NULL is allowed.
@@ -251,20 +289,13 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
       throw new SchemaConversionException("More than two types are not supported " + schemas);
     }
 
-    Type t = null;
     for (Schema s : schemas) {
       if (Type.NULL.equals(s.getType())) {
         continue;
       }
-      if (t == null) {
-        t = s.getType();
-      } else {
-        throw new SchemaConversionException("Union type of " + schemas + " is not supported.");
-      }
-    }
-    if (t != null) {
-      return t;
+      return s;
     }
+
     throw new SchemaConversionException("Cannot determine type of " + schema);
   }
 
@@ -276,12 +307,14 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
     }
     List<JdbcEntryDatum> jdbcEntryData = Lists.newArrayList();
     for (JdbcEntryMetaDatum entry : outputSchema) {
-      final String colName = entry.getColumnName();
+      final String jdbcColName = entry.getColumnName();
       final JdbcType jdbcType = entry.getJdbcType();
-      final Object val = record.get(tryConvertColumn(colName, this.jdbcToAvroColPairs));
+
+      String avroColName = convertJdbcColNameToAvroColName(jdbcColName);
+      final Object val = avroRecordValueGet(record, AVRO_RECORD_LEVEL_SPLITTER.split(avroColName).iterator());
 
       if (val == null) {
-        jdbcEntryData.add(new JdbcEntryDatum(colName, null));
+        jdbcEntryData.add(new JdbcEntryDatum(jdbcColName, null));
         continue;
       }
 
@@ -291,35 +324,23 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
 
       switch (jdbcType) {
         case VARCHAR:
-          jdbcEntryData.add(new JdbcEntryDatum(colName, val.toString()));
+          jdbcEntryData.add(new JdbcEntryDatum(jdbcColName, val.toString()));
           continue;
         case INTEGER:
         case BOOLEAN:
         case BIGINT:
         case FLOAT:
         case DOUBLE:
-          jdbcEntryData.add(new JdbcEntryDatum(colName, val));
+          jdbcEntryData.add(new JdbcEntryDatum(jdbcColName, val));
           continue;
-        //        case BOOLEAN:
-        //          jdbcEntryData.add(new JdbcEntryDatum(colName, Boolean.valueOf((boolean) val)));
-        //          continue;
-        //        case BIGINT:
-        //          jdbcEntryData.add(new JdbcEntryDatum(colName, Long.valueOf((long) val)));
-        //          continue;
-        //        case FLOAT:
-        //          jdbcEntryData.add(new JdbcEntryDatum(colName, Float.valueOf((float) val)));
-        //          continue;
-        //        case DOUBLE:
-        //          jdbcEntryData.add(new JdbcEntryDatum(colName, Double.valueOf((double) val)));
-        //          continue;
         case DATE:
-          jdbcEntryData.add(new JdbcEntryDatum(colName, new Date((long) val)));
+          jdbcEntryData.add(new JdbcEntryDatum(jdbcColName, new Date((long) val)));
           continue;
         case TIME:
-          jdbcEntryData.add(new JdbcEntryDatum(colName, new Time((long) val)));
+          jdbcEntryData.add(new JdbcEntryDatum(jdbcColName, new Time((long) val)));
           continue;
         case TIMESTAMP:
-          jdbcEntryData.add(new JdbcEntryDatum(colName, new Timestamp((long) val)));
+          jdbcEntryData.add(new JdbcEntryDatum(jdbcColName, new Timestamp((long) val)));
           continue;
         default:
           throw new DataConversionException(jdbcType + " is not supported");
@@ -332,6 +353,23 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
     return new SingleRecordIterable<>(converted);
   }
 
+  private Object avroRecordValueGet(GenericRecord record, Iterator<String> recordNameIterator) {
+    String name = recordNameIterator.next();
+    Object val = record.get(name);
+    if (val == null) {
+      //Either leaf value is null or nested Record (represented as UNION) is null
+      return null;
+    }
+    if (!recordNameIterator.hasNext()) {
+      //Leaf
+      return val;
+    }
+
+    //Recurse
+    return avroRecordValueGet((GenericRecord) val, recordNameIterator);
+  }
+
+  @Override
   public ConverterInitializer getInitializer(State state, WorkUnitStream workUnits, int branches, int branchId) {
     JdbcWriterCommandsFactory factory = new JdbcWriterCommandsFactory();
     if (workUnits.isSafeToMaterialize()) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverterTest.java b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverterTest.java
index de1f0a3..a835d89 100644
--- a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverterTest.java
+++ b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverterTest.java
@@ -18,29 +18,50 @@
 package org.apache.gobblin.converter.jdbc;
 
 import static org.mockito.Mockito.*;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
 import org.apache.gobblin.converter.SchemaConversionException;
 import org.apache.gobblin.publisher.JdbcPublisher;
 import org.apache.gobblin.writer.Destination.DestinationType;
 import org.apache.gobblin.writer.commands.JdbcWriterCommands;
 import org.apache.gobblin.writer.commands.JdbcWriterCommandsFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.Type;
+import java.net.URISyntaxException;
 import java.sql.Connection;
+import java.sql.Date;
 import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
 
 @Test(groups = {"gobblin.converter"})
 public class AvroToJdbcEntryConverterTest {
@@ -130,4 +151,104 @@ public class AvroToJdbcEntryConverterTest {
 
     Assert.assertEquals(expected, actual);
   }
+
+  @Test
+  public void testFlattening() throws IOException, SchemaConversionException, SQLException, URISyntaxException, DataConversionException {
+    final String db = "db";
+    final String table = "users";
+    Map<String, JdbcType> dateColums = new HashMap<>();
+    dateColums.put("date_of_birth", JdbcType.DATE);
+    dateColums.put("last_modified", JdbcType.TIME);
+    dateColums.put("created", JdbcType.TIMESTAMP);
+
+    JdbcWriterCommands mockWriterCommands = mock(JdbcWriterCommands.class);
+    when(mockWriterCommands.retrieveDateColumns(db, table)).thenReturn(dateColums);
+
+    JdbcWriterCommandsFactory factory = mock(JdbcWriterCommandsFactory.class);
+    when(factory.newInstance(any(State.class), any(Connection.class))).thenReturn(mockWriterCommands);
+
+    List<JdbcEntryMetaDatum> jdbcEntryMetaData = new ArrayList<>();
+    jdbcEntryMetaData.add(new JdbcEntryMetaDatum("name", JdbcType.VARCHAR));
+    jdbcEntryMetaData.add(new JdbcEntryMetaDatum("favorite_number", JdbcType.VARCHAR));
+    jdbcEntryMetaData.add(new JdbcEntryMetaDatum("favorite_color", JdbcType.VARCHAR));
+    jdbcEntryMetaData.add(new JdbcEntryMetaDatum("date_of_birth", JdbcType.DATE));
+    jdbcEntryMetaData.add(new JdbcEntryMetaDatum("last_modified", JdbcType.TIME));
+    jdbcEntryMetaData.add(new JdbcEntryMetaDatum("created", JdbcType.TIMESTAMP));
+    jdbcEntryMetaData.add(new JdbcEntryMetaDatum("nested1_nested1_string", JdbcType.VARCHAR));
+    jdbcEntryMetaData.add(new JdbcEntryMetaDatum("nested1_nested1_int", JdbcType.INTEGER));
+    jdbcEntryMetaData.add(new JdbcEntryMetaDatum("nested1_nested2_union_nested2_string", JdbcType.VARCHAR));
+    jdbcEntryMetaData.add(new JdbcEntryMetaDatum("nested1_nested2_union_nested2_int", JdbcType.INTEGER));
+    JdbcEntrySchema expected = new JdbcEntrySchema(jdbcEntryMetaData);
+
+    Schema inputSchema = new Schema.Parser().parse(getClass().getResourceAsStream("/converter/pickfields_nested_with_union.avsc"));
+    WorkUnitState workUnitState = new WorkUnitState();
+    workUnitState.appendToListProp(JdbcPublisher.JDBC_PUBLISHER_FINAL_TABLE_NAME, table);
+    AvroToJdbcEntryConverter converter = new AvroToJdbcEntryConverter(workUnitState);
+
+    Map<String, JdbcType> dateColumnMapping = Maps.newHashMap();
+    dateColumnMapping.put("date_of_birth", JdbcType.DATE);
+    dateColumnMapping.put("last_modified", JdbcType.TIME);
+    dateColumnMapping.put("created", JdbcType.TIMESTAMP);
+    workUnitState.appendToListProp(AvroToJdbcEntryConverter.CONVERTER_AVRO_JDBC_DATE_FIELDS,
+                                   new Gson().toJson(dateColumnMapping));
+
+    JdbcEntrySchema actualSchema = converter.convertSchema(inputSchema, workUnitState);
+    Assert.assertEquals(expected, actualSchema);
+
+    try (
+        DataFileReader<GenericRecord> srcDataFileReader =
+            new DataFileReader<GenericRecord>(new File(getClass().getResource(
+                "/converter/pickfields_nested_with_union.avro").toURI()), new GenericDatumReader<GenericRecord>(
+                inputSchema))) {
+
+      List<JdbcEntryData> entries = new ArrayList<>();
+      while (srcDataFileReader.hasNext()) {
+        JdbcEntryData actualData = converter.convertRecord(actualSchema, srcDataFileReader.next(), workUnitState).iterator().next();
+        entries.add(actualData);
+      }
+
+      final JsonSerializer<JdbcEntryDatum> datumSer = new JsonSerializer<JdbcEntryDatum>() {
+        @Override
+        public JsonElement serialize(JdbcEntryDatum datum, Type typeOfSrc, JsonSerializationContext context) {
+          JsonObject jso = new JsonObject();
+          if (datum.getVal() == null) {
+            jso.add(datum.getColumnName(), null);
+            return jso;
+          }
+
+          if (datum.getVal() instanceof Date) {
+            jso.addProperty(datum.getColumnName(), ((Date) datum.getVal()).getTime());
+          } else if (datum.getVal() instanceof Timestamp) {
+            jso.addProperty(datum.getColumnName(), ((Timestamp) datum.getVal()).getTime());
+          } else if (datum.getVal() instanceof Time) {
+            jso.addProperty(datum.getColumnName(), ((Time) datum.getVal()).getTime());
+          } else {
+            jso.addProperty(datum.getColumnName(), datum.getVal().toString());
+          }
+          return jso;
+        }
+      };
+
+      JsonSerializer<JdbcEntryData> serializer = new JsonSerializer<JdbcEntryData>() {
+        @Override
+        public JsonElement serialize(JdbcEntryData src, Type typeOfSrc, JsonSerializationContext context) {
+          JsonArray arr = new JsonArray();
+          for (JdbcEntryDatum datum : src) {
+            arr.add(datumSer.serialize(datum, datum.getClass(), context));
+          }
+          return arr;
+        }
+      };
+
+      Gson gson = new GsonBuilder().registerTypeAdapter(JdbcEntryData.class, serializer).serializeNulls().create();
+
+      JsonElement actualSerialized = gson.toJsonTree(entries);
+      JsonElement expectedSerialized =
+          new JsonParser().parse(new InputStreamReader(getClass().getResourceAsStream("/converter/pickfields_nested_with_union.json")));
+
+      Assert.assertEquals(actualSerialized, expectedSerialized);
+    }
+
+    converter.close();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avro
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avro b/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avro
new file mode 100644
index 0000000..b6e8d63
Binary files /dev/null and b/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avro differ

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avsc
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avsc b/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avsc
new file mode 100644
index 0000000..bbe402d
--- /dev/null
+++ b/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avsc
@@ -0,0 +1,33 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "date_of_birth", "type": "long"},
+     {"name": "last_modified", "type": "long"},
+     {"name": "created", "type": "long"},
+     {"name": "nested1",
+      "type" : {
+                   "type": "record",
+                   "name": "dummy_nested1",
+                   "fields": [
+                       {"name": "nested1_string", "type": "string"},
+                       {"name": "nested1_int",  "type": ["int", "null"]},
+                       {"name": "nested2_union", "type": ["null", {
+                                                               "type" : "record",
+                                                               "name" : "dummy_nested2",
+                                                               "fields": [
+                                                                   {"name": "nested2_string", "type": "string"},
+                                                                   {"name": "nested2_int",  "type": ["int", "null"]}
+                                                               ]
+                                                           }
+                                                       ]
+                       }
+                   ]
+               }
+     }
+ ]
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.json
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.json b/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.json
new file mode 100644
index 0000000..42db8d6
--- /dev/null
+++ b/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.json
@@ -0,0 +1,194 @@
+[
+   [
+      {
+         "created":-8205952289873597770
+      },
+      {
+         "date_of_birth":4924540963523509391
+      },
+      {
+         "favorite_color":"�hxqpwvxrf"
+      },
+      {
+         "favorite_number":"826032918"
+      },
+      {
+         "last_modified":5532738755424028468
+      },
+      {
+         "name":"btnzlrfptk"
+      },
+      {
+         "nested1_nested1_int":"-36788251"
+      },
+      {
+         "nested1_nested1_string":"gji\u001a\u0000sujkj"
+      },
+      {
+         "nested1_nested2_union_nested2_int":"1026040670"
+      },
+      {
+         "nested1_nested2_union_nested2_string":"yobzdadkgk"
+      }
+   ],
+   [
+      {
+         "created":-1393624224378683129
+      },
+      {
+         "date_of_birth":-1175814878817216371
+      },
+      {
+         "favorite_color":"x\n\tjhrpgyd"
+      },
+      {
+         "favorite_number":"1816171539"
+      },
+      {
+         "last_modified":8404219756951923781
+      },
+      {
+         "name":"\u000f\rpkgrlgio"
+      },
+      {
+         "nested1_nested1_int":"-50507635"
+      },
+      {
+         "nested1_nested1_string":"q\u000f?uspbscf"
+      },
+      {
+         "nested1_nested2_union_nested2_int":null
+      },
+      {
+         "nested1_nested2_union_nested2_string":null
+      }
+   ],
+   [
+      {
+         "created":-7739579554682470032
+      },
+      {
+         "date_of_birth":-607816151590576707
+      },
+      {
+         "favorite_color":"lyuuuympyg"
+      },
+      {
+         "favorite_number":"1866476467"
+      },
+      {
+         "last_modified":-941580389512399179
+      },
+      {
+         "name":"g?pbkpjrxh"
+      },
+      {
+         "nested1_nested1_int":"1327904823"
+      },
+      {
+         "nested1_nested1_string":"kxqmrenntu"
+      },
+      {
+         "nested1_nested2_union_nested2_int":null
+      },
+      {
+         "nested1_nested2_union_nested2_string":null
+      }
+   ],
+   [
+      {
+         "created":-6056615843637407776
+      },
+      {
+         "date_of_birth":-1429852167829293715
+      },
+      {
+         "favorite_color":"�dgasutgtx"
+      },
+      {
+         "favorite_number":"-1553608691"
+      },
+      {
+         "last_modified":450932180461066816
+      },
+      {
+         "name":"gqmcmimbhp"
+      },
+      {
+         "nested1_nested1_int":"-351781782"
+      },
+      {
+         "nested1_nested1_string":"o\u001ac\u0000bmefwh"
+      },
+      {
+         "nested1_nested2_union_nested2_int":"-1596923241"
+      },
+      {
+         "nested1_nested2_union_nested2_string":"dbcczapozw"
+      }
+   ],
+   [
+      {
+         "created":-4666560421015124885
+      },
+      {
+         "date_of_birth":-8070729844977385755
+      },
+      {
+         "favorite_color":"pfzharskmy"
+      },
+      {
+         "favorite_number":"-170051651"
+      },
+      {
+         "last_modified":-7703151747036814734
+      },
+      {
+         "name":"f\u000fwszbxhzm"
+      },
+      {
+         "nested1_nested1_int":"-1126087353"
+      },
+      {
+         "nested1_nested1_string":"mjwmnevxer"
+      },
+      {
+         "nested1_nested2_union_nested2_int":"-1722304492"
+      },
+      {
+         "nested1_nested2_union_nested2_string":"h\rwdawizsu"
+      }
+   ],
+   [
+      {
+         "created":6548727010966246856
+      },
+      {
+         "date_of_birth":8554093846897734514
+      },
+      {
+         "favorite_color":"\u000fcgsqjdabu"
+      },
+      {
+         "favorite_number":"-2132346518"
+      },
+      {
+         "last_modified":3298280474340398482
+      },
+      {
+         "name":"k\n\tngmvhpe"
+      },
+      {
+         "nested1_nested1_int":"-1330607161"
+      },
+      {
+         "nested1_nested1_string":"ubbhpssdeh"
+      },
+      {
+         "nested1_nested2_union_nested2_int":"992907867"
+      },
+      {
+         "nested1_nested2_union_nested2_string":"jk?jknvxkw"
+      }
+   ]
+]
\ No newline at end of file