You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/01/16 05:50:35 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1024]
Supporting Avro logical type recognition in Avro-to-ORC transformation
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8df87fa [GOBBLIN-1024] Supporting Avro logical type recognition in Avro-to-ORC transformation
8df87fa is described below
commit 8df87fa2bbbee452c5cd8bc6a8c6140945f5a696
Author: autumnust <le...@linkedin.com>
AuthorDate: Wed Jan 15 21:50:27 2020 -0800
[GOBBLIN-1024] Supporting Avro logical type recognition in Avro-to-ORC transformation
Closes #2867 from
autumnust/supportAvroLogicalTypes
---
.../converter/filter/AvroSchemaFieldRemover.java | 6 +-
.../hive/query/HiveAvroORCQueryGenerator.java | 80 +++++++++++++++++++++-
.../hive/util/HiveAvroORCQueryGeneratorTest.java | 29 ++++++++
.../schemaWithLogicalField.ddl | 17 +++++
.../schemaWithLogicalField.json | 47 +++++++++++++
.../java/org/apache/gobblin/util/AvroUtils.java | 11 +++
.../apache/gobblin/util/HiveAvroTypeConstants.java | 10 +++
7 files changed, 195 insertions(+), 5 deletions(-)
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
index 22f4d80..87d4101 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
@@ -29,6 +29,8 @@ import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import static org.apache.gobblin.util.AvroUtils.convertFieldToSchemaWithProps;
+
/**
* A class that removes specific fields from a (possibly recursive) Avro schema.
@@ -106,9 +108,7 @@ public class AvroSchemaFieldRemover {
private Schema removeFieldsFromRecords(Schema schema, Map<String, Schema> schemaMap) {
Schema newRecord = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
- for (Map.Entry<String, JsonNode> stringJsonNodeEntry : schema.getJsonProps().entrySet()) {
- newRecord.addProp(stringJsonNodeEntry.getKey(), stringJsonNodeEntry.getValue());
- }
+ convertFieldToSchemaWithProps(schema.getJsonProps(), newRecord);
// Put an incomplete schema into schemaMap to avoid re-processing a recursive field.
// The fields in the incomplete schema will be populated once the current schema is completely processed.
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
index 9ebf59f..012863d 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
@@ -24,6 +24,8 @@ import java.util.Map;
import java.util.Properties;
import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
@@ -35,6 +37,8 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
@@ -57,6 +61,7 @@ import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import static org.apache.gobblin.data.management.conversion.hive.entities.StageableTableMetadata.SCHEMA_SOURCE_OF_TRUTH;
+import static org.apache.gobblin.util.AvroUtils.convertFieldToSchemaWithProps;
import static org.apache.gobblin.util.AvroUtils.sanitizeSchemaString;
@@ -394,6 +399,7 @@ public class HiveAvroORCQueryGenerator {
} else {
columns.append(", \n");
}
+ convertFieldToSchemaWithProps(field.getJsonProps(), field.schema());
String type = generateAvroToHiveColumnMapping(field.schema(), hiveColumns, false, datasetName);
if (hiveColumns.isPresent()) {
hiveColumns.get().put(field.name(), type);
@@ -402,7 +408,8 @@ public class HiveAvroORCQueryGenerator {
if (StringUtils.isBlank(flattenSource)) {
flattenSource = field.name();
}
- columns.append(String.format(" `%s` %s COMMENT 'from flatten_source %s'", field.name(), type,flattenSource));
+ columns.append(
+ String.format(" `%s` %s COMMENT 'from flatten_source %s'", field.name(), type, flattenSource));
}
} else {
columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
@@ -412,6 +419,7 @@ public class HiveAvroORCQueryGenerator {
} else {
columns.append(",");
}
+ convertFieldToSchemaWithProps(field.getJsonProps(), field.schema());
String type = generateAvroToHiveColumnMapping(field.schema(), hiveColumns, false, datasetName);
columns.append("`").append(field.name()).append("`").append(":").append(type);
}
@@ -462,7 +470,48 @@ public class HiveAvroORCQueryGenerator {
case LONG:
case STRING:
case BOOLEAN:
- columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType()));
+ // Handling Avro Logical Types which should always sit in leaf-level.
+ boolean isLogicalTypeSet = false;
+ try {
+ String hiveSpecificLogicalType = generateHiveSpecificLogicalType(schema);
+ if (StringUtils.isNotEmpty(hiveSpecificLogicalType)) {
+ columns.append(hiveSpecificLogicalType);
+ break;
+ }
+ } catch (AvroSerdeException ae) {
+ log.error("Failed to generate logical type string for field " + schema.getName() + " due to:", ae);
+ }
+
+ LogicalType logicalType = LogicalTypes.fromSchemaIgnoreInvalid(schema);
+ if (logicalType != null) {
+ switch (logicalType.getName().toLowerCase()) {
+ case HiveAvroTypeConstants.DATE:
+ LogicalTypes.Date dateType = (LogicalTypes.Date) logicalType;
+ dateType.validate(schema);
+ columns.append("date");
+ isLogicalTypeSet = true;
+ break;
+ case HiveAvroTypeConstants.DECIMAL:
+ LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
+ decimalType.validate(schema);
+ columns.append(String.format("decimal(%s, %s)", decimalType.getPrecision(), decimalType.getScale()));
+ isLogicalTypeSet = true;
+ break;
+ case HiveAvroTypeConstants.TIME_MILLIS:
+ LogicalTypes.TimeMillis timeMillsType = (LogicalTypes.TimeMillis) logicalType;
+ timeMillsType.validate(schema);
+ columns.append("timestamp");
+ isLogicalTypeSet = true;
+ break;
+ default:
+ // Log via the resolved logicalType: schema.getLogicalType() may be null if the prop was set through addProp.
+ log.error("Unsupported logical type " + logicalType.getName() + ", fallback to physical type");
+ }
+ }
+
+ if (!isLogicalTypeSet) {
+ columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType()));
+ }
+ break;
default:
String exceptionMessage =
@@ -474,6 +523,33 @@ public class HiveAvroORCQueryGenerator {
return columns.toString();
}
+ /**
+ * Maps logical types that are supported by Hive but not by Avro (e.g. VARCHAR) to a Hive type string, following
+ * org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo#generateTypeInfo(org.apache.avro.Schema).
+ *
+ * Only a STRING schema whose logical-type property equals AvroSerDe.VARCHAR_TYPE_NAME (ignoring case) is handled.
+ * @param schema Avro schema
+ * @return "varchar(maxLength)" for such a schema, otherwise an empty string
+ * @throws AvroSerdeException if reading the maxLength property of a varchar schema fails
+ */
+ public static String generateHiveSpecificLogicalType(Schema schema) throws AvroSerdeException {
+ // Decimal, date and time-millis are not handled in this method; only STRING + varchar is inspected.
+ Schema.Type type = schema.getType();
+
+ if (type == Schema.Type.STRING && AvroSerDe.VARCHAR_TYPE_NAME
+ .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+ int maxLength = 0;
+ try {
+ maxLength = schema.getJsonProp(AvroSerDe.AVRO_PROP_MAX_LENGTH).getValueAsInt();
+ } catch (Exception ex) {
+ throw new AvroSerdeException("Failed to obtain maxLength value from file schema: " + schema, ex);
+ }
+ return String.format("varchar(%s)", maxLength);
+ } else {
+ return StringUtils.EMPTY;
+ }
+ }
+
/***
* Use destination table schema to generate column mapping
* @param hiveColumns Optional Map to populate with the generated hive columns for reference of caller
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/util/HiveAvroORCQueryGeneratorTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/util/HiveAvroORCQueryGeneratorTest.java
index fadff0f..c11acbf 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/util/HiveAvroORCQueryGeneratorTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/util/HiveAvroORCQueryGeneratorTest.java
@@ -45,6 +45,35 @@ public class HiveAvroORCQueryGeneratorTest {
private static boolean isEvolutionEnabled = true;
private static Optional<Integer> rowLimit = Optional.absent();
+ /**
+ * Tests DDL generation for a schema containing logical types (schemaWithLogicalField.json).
+ * All whitespace is stripped from both the generated and the expected DDL before they are compared.
+ * @throws Exception if the schema or the expected DDL cannot be read, or DDL generation fails
+ */
+ public void testLogicalTypeResolutionWithDDL() throws Exception {
+ String schemaName = "schemaWithLogicalFieldDDL";
+ Schema schema = ConversionHiveTestUtils.readSchemaFromJsonFile(resourceDir,
+ "schemaWithLogicalField.json");
+
+ String q = HiveAvroORCQueryGenerator
+ .generateCreateTableDDL(schema, schemaName, "file:/user/hive/warehouse/" + schemaName,
+ Optional.<String>absent(), Optional.<Map<String, String>>absent(), Optional.<List<String>>absent(),
+ Optional.<Map<String, HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(), Optional.<Integer>absent(),
+ Optional.<String>absent(), Optional.<String>absent(), Optional.<String>absent(),
+ null, isEvolutionEnabled, true, destinationTableMeta,
+ new HashMap<String, String>());
+
+ /**
+ * This test has a known flaw: because hive-1.0.1 does not support "date" as a logical type, Hive's library
+ * does not recognize it when translating the Avro schema to TypeInfo (a TypeDescription equivalent).
+ * Therefore, in schemaWithLogicalField.ddl, the `columns.types` table property lists `nestedLogicalFieldDate`
+ * as "int" (the physical type of date) instead of "date", while the column definition itself uses "date".
+ */
+ Assert.assertEquals(q.trim().replaceAll("\\s+",""),
+ ConversionHiveTestUtils.readQueryFromFile(resourceDir, "schemaWithLogicalField.ddl").trim().replaceAll("\\s+",""));
+ }
+
+
/***
* Test DDL generation for schema structured as: Array within record within array within record
* @throws IOException
diff --git a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/schemaWithLogicalField.ddl b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/schemaWithLogicalField.ddl
new file mode 100644
index 0000000..54ddf9d
--- /dev/null
+++ b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/schemaWithLogicalField.ddl
@@ -0,0 +1,17 @@
+CREATE EXTERNAL TABLE IF NOT EXISTS `default`.`schemaWithLogicalFieldDDL` (
+ `parentFieldRecord` struct<`nestedFieldString`:string,`nestedLogicalFieldDecimal`:decimal(4, 2),`nestedLogicalFieldDate`:date> COMMENT 'from flatten_source parentFieldRecord',
+ `parentFieldInt` int COMMENT 'from flatten_source parentFieldInt',
+ `parentFieldLogicalVarchar` varchar(256) COMMENT 'from flatten_source parentFieldLogicalVarchar')
+ROW FORMAT SERDE
+ 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
+STORED AS INPUTFORMAT
+ 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
+OUTPUTFORMAT
+ 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
+LOCATION
+ 'file:/user/hive/warehouse/schemaWithLogicalFieldDDL'
+TBLPROPERTIES (
+ 'columns'='parentFieldRecord,parentFieldInt,parentFieldLogicalVarchar',
+ 'orc.compress'='ZLIB',
+ 'columns.types'='struct<nestedFieldString:string,nestedLogicalFieldDecimal:decimal(4,2),nestedLogicalFieldDate:int>,int,varchar(256)',
+ 'orc.row.index.stride'='268435456')
\ No newline at end of file
diff --git a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/schemaWithLogicalField.json b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/schemaWithLogicalField.json
new file mode 100644
index 0000000..7ec9b1a
--- /dev/null
+++ b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/schemaWithLogicalField.json
@@ -0,0 +1,47 @@
+{
+ "type": "record",
+ "name": "SchemaWithLogicalType",
+ "fields": [
+ {
+ "name": "parentFieldRecord",
+ "type": {
+ "type": "record",
+ "name": "nestedRecordName",
+ "fields": [
+ {
+ "name": "nestedFieldString",
+ "type": "string"
+ },
+ {
+ "name": "nestedLogicalFieldDecimal",
+ "type": {
+ "type": "bytes",
+ "logicalType": "decimal",
+ "precision": 4,
+ "scale": 2
+ }
+ },
+ {
+ "name": "nestedLogicalFieldDate",
+ "type": {
+ "type": "int",
+ "logicalType": "date"
+ }
+ }
+ ]
+ }
+ },
+ {
+ "name": "parentFieldInt",
+ "type": "int"
+ },
+ {
+ "name": "parentFieldLogicalVarchar",
+ "type": {
+ "type": "string",
+ "logicalType": "varchar",
+ "maxLength": "256"
+ }
+ }
+ ]
+}
\ No newline at end of file
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index fceaaa6..78b6152 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -126,6 +126,17 @@ public class AvroUtils {
.collect(Collectors.toList());
}
+ /**
+ * Copies every entry of {@code fieldProps} (e.g. the JSON props of a {@link Schema.Field}) onto {@code targetSchemaObj}
+ * via {@code addProp}. The target {@link Schema} is mutated in place; no new object is created or returned.
+ * Typical use: while traversing a {@link Schema} into nested levels, make field-level properties visible on the
+ * non-root {@link Schema}. NOTE(review): addProp may reject a key that already holds a different value; confirm.
+ */
+ public static void convertFieldToSchemaWithProps(Map<String,JsonNode> fieldProps, Schema targetSchemaObj) {
+ for (Map.Entry<String, JsonNode> stringJsonNodeEntry : fieldProps.entrySet()) {
+ targetSchemaObj.addProp(stringJsonNodeEntry.getKey(), stringJsonNodeEntry.getValue());
+ }
+ }
+
public static class AvroPathFilter implements PathFilter {
@Override
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/HiveAvroTypeConstants.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/HiveAvroTypeConstants.java
index 8a86840..0456678 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/HiveAvroTypeConstants.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/HiveAvroTypeConstants.java
@@ -66,4 +66,14 @@ public class HiveAvroTypeConstants {
.put("date", ImmutableSet.<String>builder().add("string", "varchar").build())
.put("binary", Sets.<String>newHashSet())
.put("boolean", Sets.<String>newHashSet()).build();
+
+ /**
+ * The following Avro logical types are supported and mapped to the corresponding Hive types:
+ * decimal -> "decimal"
+ * date -> "date"
+ * time-millis -> "timestamp"
+ */
+ public static final String DECIMAL = "decimal";
+ public static final String DATE = "date";
+ public static final String TIME_MILLIS = "time-millis";
}