You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/01/23 18:17:17 UTC
incubator-gobblin git commit: [GOBBLIN-361] Support Nested nullable
Record type for JDBCWriter
Repository: incubator-gobblin
Updated Branches:
refs/heads/master ec8529885 -> c6b3824aa
[GOBBLIN-361] Support Nested nullable Record type for JDBCWriter
Closes #2233 from jinhyukchang/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/c6b3824a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/c6b3824a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/c6b3824a
Branch: refs/heads/master
Commit: c6b3824aac01a61cb492a5cd5b672fc2fea1b09f
Parents: ec85298
Author: Jin Hyuk Chang <jn...@linkedin.com>
Authored: Tue Jan 23 10:17:10 2018 -0800
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Tue Jan 23 10:17:10 2018 -0800
----------------------------------------------------------------------
.../filter/AvroFieldsPickConverter.java | 55 +++++-
.../filter/AvroFieldsPickConverterTest.java | 37 +++-
.../converted_pickfields_nested_with_union.avro | Bin 0 -> 678 bytes
.../converted_pickfields_nested_with_union.avsc | 47 +++++
.../converter/pickfields_nested_with_union.avro | Bin 0 -> 1264 bytes
.../converter/pickfields_nested_with_union.avsc | 33 ++++
.../jdbc/AvroToJdbcEntryConverter.java | 150 ++++++++------
.../jdbc/AvroToJdbcEntryConverterTest.java | 121 ++++++++++++
.../converter/pickfields_nested_with_union.avro | Bin 0 -> 1264 bytes
.../converter/pickfields_nested_with_union.avsc | 33 ++++
.../converter/pickfields_nested_with_union.json | 194 +++++++++++++++++++
11 files changed, 608 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
index c7e2db5..74ed3f3 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
@@ -21,8 +21,10 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +33,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.AvroToAvroConverterBase;
@@ -133,29 +136,71 @@ public class AvroFieldsPickConverter extends AvroToAvroConverterBase {
return createSchemaHelper(schema, root);
}
- private static Schema createSchemaHelper(Schema inputSchema, TrieNode node) {
- Schema newRecord = Schema.createRecord(inputSchema.getName(), inputSchema.getDoc(), inputSchema.getNamespace(),
- inputSchema.isError());
+ private static Schema createSchemaHelper(final Schema inputSchema, TrieNode node) {
List<Field> newFields = Lists.newArrayList();
for (TrieNode child : node.children.values()) {
- Field innerSrcField = inputSchema.getField(child.val);
- Preconditions.checkNotNull(innerSrcField, child.val + " does not exist under " + inputSchema);
+ Schema recordSchema = getActualRecord(inputSchema);
+ Field innerSrcField = recordSchema.getField(child.val);
+ Preconditions.checkNotNull(innerSrcField, child.val + " does not exist under " + recordSchema);
if (child.children.isEmpty()) { //Leaf
newFields.add(
new Field(innerSrcField.name(), innerSrcField.schema(), innerSrcField.doc(), innerSrcField.defaultValue()));
} else {
Schema innerSrcSchema = innerSrcField.schema();
+
Schema innerDestSchema = createSchemaHelper(innerSrcSchema, child); //Recurse of schema
Field innerDestField =
new Field(innerSrcField.name(), innerDestSchema, innerSrcField.doc(), innerSrcField.defaultValue());
newFields.add(innerDestField);
}
}
+
+ if (Type.UNION.equals(inputSchema.getType())) {
+ Preconditions.checkArgument(inputSchema.getTypes().size() <= 2,
+ "For union type in nested record, it should only have NULL and Record type");
+
+ Schema recordSchema = getActualRecord(inputSchema);
+ Schema newRecord = Schema.createRecord(recordSchema.getName(), recordSchema.getDoc(), recordSchema.getNamespace(),
+ recordSchema.isError());
+ newRecord.setFields(newFields);
+ if (inputSchema.getTypes().size() == 1) {
+ return Schema.createUnion(newRecord);
+ }
+ return Schema.createUnion(Lists.newArrayList(Schema.create(Type.NULL), newRecord));
+ }
+
+ Schema newRecord = Schema.createRecord(inputSchema.getName(), inputSchema.getDoc(), inputSchema.getNamespace(),
+ inputSchema.isError());
newRecord.setFields(newFields);
return newRecord;
}
+ /**
+ * For a schema that is either a RECORD, or a UNION of NULL and a RECORD, it returns the RECORD schema.
+ * @param inputSchema
+ * @return
+ */
+ private static Schema getActualRecord(Schema inputSchema) {
+ if (Type.RECORD.equals(inputSchema.getType())) {
+ return inputSchema;
+ }
+
+ Preconditions.checkArgument(Type.UNION.equals(inputSchema.getType()), "Nested schema is only support with either record or union type of null with record");
+ Preconditions.checkArgument(inputSchema.getTypes().size() <= 2,
+ "For union type in nested record, it should only have NULL and Record type");
+
+ for (Schema inner : inputSchema.getTypes()) {
+ if (Type.NULL.equals(inner.getType())) {
+ continue;
+ }
+ Preconditions.checkArgument(Type.RECORD.equals(inner.getType()), "For union type in nested record, it should only have NULL and Record type");
+ return inner;
+
+ }
+ throw new IllegalArgumentException(inputSchema + " is not supported.");
+ }
+
private static TrieNode buildTrie(List<String> fqns) {
TrieNode root = new TrieNode(null);
for (String fqn : fqns) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java
index 009bcc7..a2f71f5 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java
@@ -17,12 +17,18 @@
package org.apache.gobblin.converter.filter;
+import java.io.File;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.SchemaConversionException;
-
import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.FileUtils;
import org.skyscreamer.jsonassert.JSONAssert;
+import org.testng.Assert;
import org.testng.annotations.Test;
@Test(groups = { "gobblin.converter.filter" })
@@ -59,4 +65,33 @@ public class AvroFieldsPickConverterTest {
JSONAssert.assertEquals(expected.toString(), converted.toString(), false);
}
}
+
+ @Test
+ public void testFieldsPickWithNestedRecord() throws Exception {
+ Schema inputSchema = new Schema.Parser().parse(getClass().getResourceAsStream("/converter/pickfields_nested_with_union.avsc"));
+
+ WorkUnitState workUnitState = new WorkUnitState();
+ workUnitState.setProp(ConfigurationKeys.CONVERTER_AVRO_FIELD_PICK_FIELDS, "name,favorite_number,nested1.nested1_string,nested1.nested2_union.nested2_string");
+
+ try (AvroFieldsPickConverter converter = new AvroFieldsPickConverter()) {
+ Schema convertedSchema = converter.convertSchema(inputSchema, workUnitState);
+ Schema expectedSchema = new Schema.Parser().parse(getClass().getResourceAsStream("/converter/converted_pickfields_nested_with_union.avsc"));
+ JSONAssert.assertEquals(expectedSchema.toString(), convertedSchema.toString(), false);
+
+ try (DataFileReader<GenericRecord> srcDataFileReader = new DataFileReader<GenericRecord>(
+ new File(getClass().getResource("/converter/pickfields_nested_with_union.avro").toURI()),
+ new GenericDatumReader<GenericRecord>(inputSchema));
+ DataFileReader<GenericRecord> expectedDataFileReader = new DataFileReader<GenericRecord>(
+ new File(getClass().getResource("/converter/converted_pickfields_nested_with_union.avro").toURI()),
+ new GenericDatumReader<GenericRecord>(expectedSchema));) {
+
+ while (expectedDataFileReader.hasNext()) {
+ GenericRecord expected = expectedDataFileReader.next();
+ GenericRecord actual = converter.convertRecord(convertedSchema, srcDataFileReader.next(), workUnitState).iterator().next();
+ Assert.assertEquals(actual, expected);
+ }
+ Assert.assertTrue(!srcDataFileReader.hasNext());
+ }
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avro
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avro b/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avro
new file mode 100644
index 0000000..5d63a1a
Binary files /dev/null and b/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avro differ
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avsc
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avsc b/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avsc
new file mode 100644
index 0000000..cdfc283
--- /dev/null
+++ b/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avsc
@@ -0,0 +1,47 @@
+{
+ "type": "record",
+ "name": "User",
+ "namespace": "example.avro",
+ "fields": [
+ {
+ "name": "name",
+ "type": "string"
+ },
+ {
+ "name": "favorite_number",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "nested1",
+ "type": {
+ "type": "record",
+ "name": "dummy_nested1",
+ "fields": [
+ {
+ "name": "nested1_string",
+ "type": "string"
+ },
+ {
+ "name": "nested2_union",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "dummy_nested2",
+ "fields": [
+ {
+ "name": "nested2_string",
+ "type": "string"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ }
+ ]
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avro
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avro b/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avro
new file mode 100644
index 0000000..b6e8d63
Binary files /dev/null and b/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avro differ
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avsc
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avsc b/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avsc
new file mode 100644
index 0000000..bbe402d
--- /dev/null
+++ b/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avsc
@@ -0,0 +1,33 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_color", "type": ["string", "null"]},
+ {"name": "date_of_birth", "type": "long"},
+ {"name": "last_modified", "type": "long"},
+ {"name": "created", "type": "long"},
+ {"name": "nested1",
+ "type" : {
+ "type": "record",
+ "name": "dummy_nested1",
+ "fields": [
+ {"name": "nested1_string", "type": "string"},
+ {"name": "nested1_int", "type": ["int", "null"]},
+ {"name": "nested2_union", "type": ["null", {
+ "type" : "record",
+ "name" : "dummy_nested2",
+ "fields": [
+ {"name": "nested2_string", "type": "string"},
+ {"name": "nested2_int", "type": ["int", "null"]}
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ }
+ ]
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverter.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverter.java
index b787de6..5b7e89d 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverter.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverter.java
@@ -20,9 +20,12 @@ package org.apache.gobblin.converter.jdbc;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import org.apache.avro.Schema;
@@ -36,6 +39,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
@@ -68,6 +72,11 @@ import org.apache.gobblin.writer.commands.JdbcWriterCommandsFactory;
public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema, GenericRecord, JdbcEntryData> {
public static final String CONVERTER_AVRO_JDBC_DATE_FIELDS = "converter.avro.jdbc.date_fields";
+ private static final String AVRO_NESTED_COLUMN_DELIMITER = ".";
+ private static final String JDBC_FLATTENED_COLUMN_DELIMITER = "_";
+ private static final String AVRO_NESTED_COLUMN_DELIMITER_REGEX_COMPATIBLE = "\\.";
+ private static final Splitter AVRO_RECORD_LEVEL_SPLITTER = Splitter.on(AVRO_NESTED_COLUMN_DELIMITER).omitEmptyStrings();
+
private static final Logger LOG = LoggerFactory.getLogger(AvroToJdbcEntryConverter.class);
private static final Map<Type, JdbcType> AVRO_TYPE_JDBC_TYPE_MAPPING =
@@ -83,6 +92,7 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
ImmutableSet.<Type> builder()
.addAll(AVRO_TYPE_JDBC_TYPE_MAPPING.keySet())
.add(Type.UNION)
+ .add(Type.RECORD)
.build();
private static final Set<JdbcType> JDBC_SUPPORTED_TYPES =
ImmutableSet.<JdbcType> builder()
@@ -93,7 +103,7 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
.build();
private Optional<Map<String, String>> avroToJdbcColPairs = Optional.absent();
- private Optional<Map<String, String>> jdbcToAvroColPairs = Optional.absent();
+ private Map<String, String> jdbcToAvroColPairs = new HashMap<>();
public AvroToJdbcEntryConverter() {
super();
@@ -128,7 +138,6 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
jdbcToAvroBuilder.put(entry.getValue().getAsString(), entry.getKey());
}
this.avroToJdbcColPairs = Optional.of((Map<String, String>) avroToJdbcBuilder.build());
- this.jdbcToAvroColPairs = Optional.of((Map<String, String>) jdbcToAvroBuilder.build());
}
}
return this;
@@ -139,7 +148,7 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
*
* Few precondition to the Avro schema
* 1. Avro schema should have one entry type record at first depth.
- * 2. Avro schema can recurse by having record inside record. As RDBMS structure is not recursive, this is not allowed.
+ * 2. Avro schema can recurse by having record inside record.
* 3. Supported Avro primitive types and conversion
* boolean --> java.lang.Boolean
* int --> java.lang.Integer
@@ -150,9 +159,9 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
* string --> java.lang.String
* null: only allowed if it's within union (see complex types for more details)
* 4. Supported Avro complex types
- * Records: Only first level depth can have Records type. Basically converter will peel out Records type and start with 2nd level.
+ * Records: Supports nested record type as well.
* Enum --> java.lang.String
- * Unions --> Only allowed if it have one primitive type in it or null type with one primitive type where null will be ignored.
+ * Unions --> Only allowed if it has one primitive type in it, along with Record type, or null type with one primitive type where null will be ignored.
* Once Union is narrowed down to one primitive type, it will follow conversion of primitive type above.
* {@inheritDoc}
*
@@ -167,6 +176,10 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
@Override
public JdbcEntrySchema convertSchema(Schema inputSchema, WorkUnitState workUnit) throws SchemaConversionException {
LOG.info("Converting schema " + inputSchema);
+ Preconditions.checkArgument(Type.RECORD.equals(inputSchema.getType()),
+ "%s is expected for the first level element in Avro schema %s",
+ Type.RECORD, inputSchema);
+
Map<String, Type> avroColumnType = flatten(inputSchema);
String jsonStr = Preconditions.checkNotNull(workUnit.getProp(CONVERTER_AVRO_JDBC_DATE_FIELDS));
java.lang.reflect.Type typeOfMap = new TypeToken<Map<String, JdbcType>>() {}.getType();
@@ -175,7 +188,8 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
List<JdbcEntryMetaDatum> jdbcEntryMetaData = Lists.newArrayList();
for (Map.Entry<String, Type> avroEntry : avroColumnType.entrySet()) {
- String colName = tryConvertColumn(avroEntry.getKey(), this.avroToJdbcColPairs);
+ String colName = tryConvertAvroColNameToJdbcColName(avroEntry.getKey());
+
JdbcType JdbcType = dateColumnMapping.get(colName);
if (JdbcType == null) {
JdbcType = AVRO_TYPE_JDBC_TYPE_MAPPING.get(avroEntry.getValue());
@@ -190,15 +204,36 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
return converted;
}
- private static String tryConvertColumn(String key, Optional<Map<String, String>> mapping) {
- if (!mapping.isPresent()) {
- return key;
+ /**
+ * Convert Avro column name to JDBC column name. If name mapping is defined, follow it. Otherwise, just return avro column name,
+ * while replacing nested column delimiter, dot, to underscore.
+ * This method also updates the mapping from JDBC column name to Avro column name for reverse lookup.
+ * @param avroColName
+ * @return
+ */
+ private String tryConvertAvroColNameToJdbcColName(String avroColName) {
+ if (!avroToJdbcColPairs.isPresent()) {
+ String converted = avroColName.replaceAll(AVRO_NESTED_COLUMN_DELIMITER_REGEX_COMPATIBLE, JDBC_FLATTENED_COLUMN_DELIMITER);
+ jdbcToAvroColPairs.put(converted, avroColName);
+ return converted;
}
- String converted = mapping.get().get(key);
- return converted != null ? converted : key;
+ String converted = avroToJdbcColPairs.get().get(avroColName);
+ converted = converted != null ? converted : avroColName;
+ jdbcToAvroColPairs.put(converted, avroColName);
+ return converted;
+ }
+
+ /**
+ * Provides Avro column name based on JDBC column name. It's a one-liner method but encapsulates the knowledge of where the mapping is.
+ * @param colName
+ * @return
+ */
+ private String convertJdbcColNameToAvroColName(String colName) {
+ return Preconditions.checkNotNull(jdbcToAvroColPairs.get(colName));
}
+
/**
* Flattens Avro's (possibly recursive) structure and provides field name and type.
* It assumes that the leaf level field name has unique name.
@@ -208,41 +243,44 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
*/
private static Map<String, Type> flatten(Schema schema) throws SchemaConversionException {
Map<String, Type> flattened = new LinkedHashMap<>();
- if (!Type.RECORD.equals(schema.getType())) {
- throw new SchemaConversionException(
- Type.RECORD + " is expected for the first level element in Avro schema " + schema);
- }
+ Schema recordSchema = determineType(schema);
- for (Field f : schema.getFields()) {
- produceFlattenedHelper(f.schema(), f, flattened);
+ Preconditions.checkArgument(Type.RECORD.equals(recordSchema.getType()), "%s is expected. Schema: %s",
+ Type.RECORD, recordSchema);
+
+ for (Field f : recordSchema.getFields()) {
+ produceFlattenedHelper(f, flattened);
}
return flattened;
}
- private static void produceFlattenedHelper(Schema schema, Field field, Map<String, Type> flattened)
+ private static void produceFlattenedHelper(Field field, Map<String, Type> flattened)
throws SchemaConversionException {
- if (Type.RECORD.equals(schema.getType())) {
- throw new SchemaConversionException(Type.RECORD + " is only allowed for first level.");
+ Schema actualSchema = determineType(field.schema());
+ if (Type.RECORD.equals(actualSchema.getType())) {
+ Map<String, Type> map = flatten(actualSchema);
+ for (Entry<String, Type> entry : map.entrySet()) {
+ String key = String.format("%s" + AVRO_NESTED_COLUMN_DELIMITER + "%s", field.name(), entry.getKey());
+ Type existing = flattened.put(key, entry.getValue());
+ Preconditions.checkArgument(existing == null, "Duplicate name detected in Avro schema. Field: " + key);
+ }
+ return;
}
- Type t = determineType(schema);
- if (field == null) {
- throw new IllegalArgumentException("Invalid Avro schema, no name has been assigned to " + schema);
- }
- Type existing = flattened.put(field.name(), t);
+ Type existing = flattened.put(field.name(), actualSchema.getType());
if (existing != null) {
//No duplicate name allowed when flattening (not considering name space we don't have any assumption between namespace and actual database field name)
throw new SchemaConversionException("Duplicate name detected in Avro schema. " + field.name());
}
}
- private static Type determineType(Schema schema) throws SchemaConversionException {
+ private static Schema determineType(Schema schema) throws SchemaConversionException {
if (!AVRO_SUPPORTED_TYPES.contains(schema.getType())) {
throw new SchemaConversionException(schema.getType() + " is not supported");
}
if (!Type.UNION.equals(schema.getType())) {
- return schema.getType();
+ return schema;
}
//For UNION, only supported avro type with NULL is allowed.
@@ -251,20 +289,13 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
throw new SchemaConversionException("More than two types are not supported " + schemas);
}
- Type t = null;
for (Schema s : schemas) {
if (Type.NULL.equals(s.getType())) {
continue;
}
- if (t == null) {
- t = s.getType();
- } else {
- throw new SchemaConversionException("Union type of " + schemas + " is not supported.");
- }
- }
- if (t != null) {
- return t;
+ return s;
}
+
throw new SchemaConversionException("Cannot determine type of " + schema);
}
@@ -276,12 +307,14 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
}
List<JdbcEntryDatum> jdbcEntryData = Lists.newArrayList();
for (JdbcEntryMetaDatum entry : outputSchema) {
- final String colName = entry.getColumnName();
+ final String jdbcColName = entry.getColumnName();
final JdbcType jdbcType = entry.getJdbcType();
- final Object val = record.get(tryConvertColumn(colName, this.jdbcToAvroColPairs));
+
+ String avroColName = convertJdbcColNameToAvroColName(jdbcColName);
+ final Object val = avroRecordValueGet(record, AVRO_RECORD_LEVEL_SPLITTER.split(avroColName).iterator());
if (val == null) {
- jdbcEntryData.add(new JdbcEntryDatum(colName, null));
+ jdbcEntryData.add(new JdbcEntryDatum(jdbcColName, null));
continue;
}
@@ -291,35 +324,23 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
switch (jdbcType) {
case VARCHAR:
- jdbcEntryData.add(new JdbcEntryDatum(colName, val.toString()));
+ jdbcEntryData.add(new JdbcEntryDatum(jdbcColName, val.toString()));
continue;
case INTEGER:
case BOOLEAN:
case BIGINT:
case FLOAT:
case DOUBLE:
- jdbcEntryData.add(new JdbcEntryDatum(colName, val));
+ jdbcEntryData.add(new JdbcEntryDatum(jdbcColName, val));
continue;
- // case BOOLEAN:
- // jdbcEntryData.add(new JdbcEntryDatum(colName, Boolean.valueOf((boolean) val)));
- // continue;
- // case BIGINT:
- // jdbcEntryData.add(new JdbcEntryDatum(colName, Long.valueOf((long) val)));
- // continue;
- // case FLOAT:
- // jdbcEntryData.add(new JdbcEntryDatum(colName, Float.valueOf((float) val)));
- // continue;
- // case DOUBLE:
- // jdbcEntryData.add(new JdbcEntryDatum(colName, Double.valueOf((double) val)));
- // continue;
case DATE:
- jdbcEntryData.add(new JdbcEntryDatum(colName, new Date((long) val)));
+ jdbcEntryData.add(new JdbcEntryDatum(jdbcColName, new Date((long) val)));
continue;
case TIME:
- jdbcEntryData.add(new JdbcEntryDatum(colName, new Time((long) val)));
+ jdbcEntryData.add(new JdbcEntryDatum(jdbcColName, new Time((long) val)));
continue;
case TIMESTAMP:
- jdbcEntryData.add(new JdbcEntryDatum(colName, new Timestamp((long) val)));
+ jdbcEntryData.add(new JdbcEntryDatum(jdbcColName, new Timestamp((long) val)));
continue;
default:
throw new DataConversionException(jdbcType + " is not supported");
@@ -332,6 +353,23 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema,
return new SingleRecordIterable<>(converted);
}
+ private Object avroRecordValueGet(GenericRecord record, Iterator<String> recordNameIterator) {
+ String name = recordNameIterator.next();
+ Object val = record.get(name);
+ if (val == null) {
+ //Either leaf value is null or nested Record (represented as UNION) is null
+ return null;
+ }
+ if (!recordNameIterator.hasNext()) {
+ //Leaf
+ return val;
+ }
+
+ //Recurse
+ return avroRecordValueGet((GenericRecord) val, recordNameIterator);
+ }
+
+ @Override
public ConverterInitializer getInitializer(State state, WorkUnitStream workUnits, int branches, int branchId) {
JdbcWriterCommandsFactory factory = new JdbcWriterCommandsFactory();
if (workUnits.isSafeToMaterialize()) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverterTest.java b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverterTest.java
index de1f0a3..a835d89 100644
--- a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverterTest.java
+++ b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverterTest.java
@@ -18,29 +18,50 @@
package org.apache.gobblin.converter.jdbc;
import static org.mockito.Mockito.*;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
import org.apache.gobblin.converter.SchemaConversionException;
import org.apache.gobblin.publisher.JdbcPublisher;
import org.apache.gobblin.writer.Destination.DestinationType;
import org.apache.gobblin.writer.commands.JdbcWriterCommands;
import org.apache.gobblin.writer.commands.JdbcWriterCommandsFactory;
+import java.io.File;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.Type;
+import java.net.URISyntaxException;
import java.sql.Connection;
+import java.sql.Date;
import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
import org.testng.Assert;
import org.testng.annotations.Test;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
@Test(groups = {"gobblin.converter"})
public class AvroToJdbcEntryConverterTest {
@@ -130,4 +151,104 @@ public class AvroToJdbcEntryConverterTest {
Assert.assertEquals(expected, actual);
}
+
+  /**
+   * Verifies that a record schema with a nested record and a nested nullable (union) record is
+   * flattened into underscore-joined JDBC column names, and that every record of
+   * {@code /converter/pickfields_nested_with_union.avro} converts to the row stored in
+   * {@code /converter/pickfields_nested_with_union.json}.
+   *
+   * <p>The JSON fixture holds one array per record; each array element is a single-key object
+   * mapping a column name to its value (epoch milliseconds for date/time/timestamp values, the
+   * {@code toString()} form for everything else, {@code null} for absent values).
+   *
+   * @throws IOException if a test resource cannot be read or the converter fails to close
+   * @throws SchemaConversionException if the Avro schema cannot be converted
+   * @throws SQLException declared for signature compatibility
+   * @throws URISyntaxException if the Avro resource URL cannot be turned into a file URI
+   * @throws DataConversionException if a record cannot be converted
+   */
+  @Test
+  public void testFlattening() throws IOException, SchemaConversionException, SQLException, URISyntaxException, DataConversionException {
+    final String table = "users";
+
+    // Expected schema: top-level fields keep their names, nested fields are prefixed with the
+    // names of their enclosing fields joined by '_'.
+    List<JdbcEntryMetaDatum> jdbcEntryMetaData = new ArrayList<>();
+    jdbcEntryMetaData.add(new JdbcEntryMetaDatum("name", JdbcType.VARCHAR));
+    jdbcEntryMetaData.add(new JdbcEntryMetaDatum("favorite_number", JdbcType.VARCHAR));
+    jdbcEntryMetaData.add(new JdbcEntryMetaDatum("favorite_color", JdbcType.VARCHAR));
+    jdbcEntryMetaData.add(new JdbcEntryMetaDatum("date_of_birth", JdbcType.DATE));
+    jdbcEntryMetaData.add(new JdbcEntryMetaDatum("last_modified", JdbcType.TIME));
+    jdbcEntryMetaData.add(new JdbcEntryMetaDatum("created", JdbcType.TIMESTAMP));
+    jdbcEntryMetaData.add(new JdbcEntryMetaDatum("nested1_nested1_string", JdbcType.VARCHAR));
+    jdbcEntryMetaData.add(new JdbcEntryMetaDatum("nested1_nested1_int", JdbcType.INTEGER));
+    jdbcEntryMetaData.add(new JdbcEntryMetaDatum("nested1_nested2_union_nested2_string", JdbcType.VARCHAR));
+    jdbcEntryMetaData.add(new JdbcEntryMetaDatum("nested1_nested2_union_nested2_int", JdbcType.INTEGER));
+    JdbcEntrySchema expected = new JdbcEntrySchema(jdbcEntryMetaData);
+
+    Schema inputSchema = new Schema.Parser().parse(getClass().getResourceAsStream("/converter/pickfields_nested_with_union.avsc"));
+    WorkUnitState workUnitState = new WorkUnitState();
+    workUnitState.appendToListProp(JdbcPublisher.JDBC_PUBLISHER_FINAL_TABLE_NAME, table);
+    AvroToJdbcEntryConverter converter = new AvroToJdbcEntryConverter(workUnitState);
+
+    // try/finally so the converter is released even when a conversion or an assertion fails.
+    try {
+      // The date column mapping is supplied through the work unit state as a JSON property.
+      Map<String, JdbcType> dateColumnMapping = Maps.newHashMap();
+      dateColumnMapping.put("date_of_birth", JdbcType.DATE);
+      dateColumnMapping.put("last_modified", JdbcType.TIME);
+      dateColumnMapping.put("created", JdbcType.TIMESTAMP);
+      workUnitState.appendToListProp(AvroToJdbcEntryConverter.CONVERTER_AVRO_JDBC_DATE_FIELDS,
+          new Gson().toJson(dateColumnMapping));
+
+      JdbcEntrySchema actualSchema = converter.convertSchema(inputSchema, workUnitState);
+      // TestNG's argument order is (actual, expected).
+      Assert.assertEquals(actualSchema, expected);
+
+      File avroFile = new File(getClass().getResource("/converter/pickfields_nested_with_union.avro").toURI());
+      try (
+          DataFileReader<GenericRecord> srcDataFileReader =
+              new DataFileReader<GenericRecord>(avroFile, new GenericDatumReader<GenericRecord>(inputSchema))) {
+
+        List<JdbcEntryData> entries = new ArrayList<>();
+        while (srcDataFileReader.hasNext()) {
+          JdbcEntryData actualData = converter.convertRecord(actualSchema, srcDataFileReader.next(), workUnitState).iterator().next();
+          entries.add(actualData);
+        }
+
+        // Serializes one datum as {"<column>": <value>}.
+        final JsonSerializer<JdbcEntryDatum> datumSer = new JsonSerializer<JdbcEntryDatum>() {
+          @Override
+          public JsonElement serialize(JdbcEntryDatum datum, Type typeOfSrc, JsonSerializationContext context) {
+            JsonObject jso = new JsonObject();
+            Object val = datum.getVal();
+            if (val == null) {
+              jso.add(datum.getColumnName(), null);
+            } else if (val instanceof Date || val instanceof Timestamp || val instanceof Time) {
+              // java.sql.Date, Timestamp and Time all extend java.util.Date; compare on epoch millis.
+              jso.addProperty(datum.getColumnName(), ((java.util.Date) val).getTime());
+            } else {
+              jso.addProperty(datum.getColumnName(), val.toString());
+            }
+            return jso;
+          }
+        };
+
+        // Serializes one converted row as an array of single-key datum objects.
+        JsonSerializer<JdbcEntryData> serializer = new JsonSerializer<JdbcEntryData>() {
+          @Override
+          public JsonElement serialize(JdbcEntryData src, Type typeOfSrc, JsonSerializationContext context) {
+            JsonArray arr = new JsonArray();
+            for (JdbcEntryDatum datum : src) {
+              arr.add(datumSer.serialize(datum, datum.getClass(), context));
+            }
+            return arr;
+          }
+        };
+
+        Gson gson = new GsonBuilder().registerTypeAdapter(JdbcEntryData.class, serializer).serializeNulls().create();
+        JsonElement actualSerialized = gson.toJsonTree(entries);
+
+        // The fixture contains non-ASCII characters, so decode it with an explicit charset rather
+        // than the platform default (assumes the fixture is stored as UTF-8), and close the reader.
+        JsonElement expectedSerialized;
+        try (InputStreamReader expectedReader = new InputStreamReader(
+            getClass().getResourceAsStream("/converter/pickfields_nested_with_union.json"),
+            java.nio.charset.StandardCharsets.UTF_8)) {
+          expectedSerialized = new JsonParser().parse(expectedReader);
+        }
+
+        Assert.assertEquals(actualSerialized, expectedSerialized);
+      }
+    } finally {
+      converter.close();
+    }
+  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avro
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avro b/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avro
new file mode 100644
index 0000000..b6e8d63
Binary files /dev/null and b/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avro differ
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avsc
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avsc b/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avsc
new file mode 100644
index 0000000..bbe402d
--- /dev/null
+++ b/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avsc
@@ -0,0 +1,33 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_color", "type": ["string", "null"]},
+ {"name": "date_of_birth", "type": "long"},
+ {"name": "last_modified", "type": "long"},
+ {"name": "created", "type": "long"},
+ {"name": "nested1",
+ "type" : {
+ "type": "record",
+ "name": "dummy_nested1",
+ "fields": [
+ {"name": "nested1_string", "type": "string"},
+ {"name": "nested1_int", "type": ["int", "null"]},
+ {"name": "nested2_union", "type": ["null", {
+ "type" : "record",
+ "name" : "dummy_nested2",
+ "fields": [
+ {"name": "nested2_string", "type": "string"},
+ {"name": "nested2_int", "type": ["int", "null"]}
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ }
+ ]
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.json
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.json b/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.json
new file mode 100644
index 0000000..42db8d6
--- /dev/null
+++ b/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.json
@@ -0,0 +1,194 @@
+[
+ [
+ {
+ "created":-8205952289873597770
+ },
+ {
+ "date_of_birth":4924540963523509391
+ },
+ {
+ "favorite_color":"�hxqpwvxrf"
+ },
+ {
+ "favorite_number":"826032918"
+ },
+ {
+ "last_modified":5532738755424028468
+ },
+ {
+ "name":"btnzlrfptk"
+ },
+ {
+ "nested1_nested1_int":"-36788251"
+ },
+ {
+ "nested1_nested1_string":"gji\u001a\u0000sujkj"
+ },
+ {
+ "nested1_nested2_union_nested2_int":"1026040670"
+ },
+ {
+ "nested1_nested2_union_nested2_string":"yobzdadkgk"
+ }
+ ],
+ [
+ {
+ "created":-1393624224378683129
+ },
+ {
+ "date_of_birth":-1175814878817216371
+ },
+ {
+ "favorite_color":"x\n\tjhrpgyd"
+ },
+ {
+ "favorite_number":"1816171539"
+ },
+ {
+ "last_modified":8404219756951923781
+ },
+ {
+ "name":"\u000f\rpkgrlgio"
+ },
+ {
+ "nested1_nested1_int":"-50507635"
+ },
+ {
+ "nested1_nested1_string":"q\u000f?uspbscf"
+ },
+ {
+ "nested1_nested2_union_nested2_int":null
+ },
+ {
+ "nested1_nested2_union_nested2_string":null
+ }
+ ],
+ [
+ {
+ "created":-7739579554682470032
+ },
+ {
+ "date_of_birth":-607816151590576707
+ },
+ {
+ "favorite_color":"lyuuuympyg"
+ },
+ {
+ "favorite_number":"1866476467"
+ },
+ {
+ "last_modified":-941580389512399179
+ },
+ {
+ "name":"g?pbkpjrxh"
+ },
+ {
+ "nested1_nested1_int":"1327904823"
+ },
+ {
+ "nested1_nested1_string":"kxqmrenntu"
+ },
+ {
+ "nested1_nested2_union_nested2_int":null
+ },
+ {
+ "nested1_nested2_union_nested2_string":null
+ }
+ ],
+ [
+ {
+ "created":-6056615843637407776
+ },
+ {
+ "date_of_birth":-1429852167829293715
+ },
+ {
+ "favorite_color":"�dgasutgtx"
+ },
+ {
+ "favorite_number":"-1553608691"
+ },
+ {
+ "last_modified":450932180461066816
+ },
+ {
+ "name":"gqmcmimbhp"
+ },
+ {
+ "nested1_nested1_int":"-351781782"
+ },
+ {
+ "nested1_nested1_string":"o\u001ac\u0000bmefwh"
+ },
+ {
+ "nested1_nested2_union_nested2_int":"-1596923241"
+ },
+ {
+ "nested1_nested2_union_nested2_string":"dbcczapozw"
+ }
+ ],
+ [
+ {
+ "created":-4666560421015124885
+ },
+ {
+ "date_of_birth":-8070729844977385755
+ },
+ {
+ "favorite_color":"pfzharskmy"
+ },
+ {
+ "favorite_number":"-170051651"
+ },
+ {
+ "last_modified":-7703151747036814734
+ },
+ {
+ "name":"f\u000fwszbxhzm"
+ },
+ {
+ "nested1_nested1_int":"-1126087353"
+ },
+ {
+ "nested1_nested1_string":"mjwmnevxer"
+ },
+ {
+ "nested1_nested2_union_nested2_int":"-1722304492"
+ },
+ {
+ "nested1_nested2_union_nested2_string":"h\rwdawizsu"
+ }
+ ],
+ [
+ {
+ "created":6548727010966246856
+ },
+ {
+ "date_of_birth":8554093846897734514
+ },
+ {
+ "favorite_color":"\u000fcgsqjdabu"
+ },
+ {
+ "favorite_number":"-2132346518"
+ },
+ {
+ "last_modified":3298280474340398482
+ },
+ {
+ "name":"k\n\tngmvhpe"
+ },
+ {
+ "nested1_nested1_int":"-1330607161"
+ },
+ {
+ "nested1_nested1_string":"ubbhpssdeh"
+ },
+ {
+ "nested1_nested2_union_nested2_int":"992907867"
+ },
+ {
+ "nested1_nested2_union_nested2_string":"jk?jknvxkw"
+ }
+ ]
+]
\ No newline at end of file