You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/01/16 05:50:35 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1024] Supporting Avro logical type recognition in Avro-to-ORC transformation

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8df87fa  [GOBBLIN-1024] Supporting Avro logical type recognition in Avro-to-ORC transformation
8df87fa is described below

commit 8df87fa2bbbee452c5cd8bc6a8c6140945f5a696
Author: autumnust <le...@linkedin.com>
AuthorDate: Wed Jan 15 21:50:27 2020 -0800

    [GOBBLIN-1024] Supporting Avro logical type recognition in Avro-to-ORC transformation
    
    Closes #2867 from
    autumnust/supportAvroLogicalTypes
---
 .../converter/filter/AvroSchemaFieldRemover.java   |  6 +-
 .../hive/query/HiveAvroORCQueryGenerator.java      | 80 +++++++++++++++++++++-
 .../hive/util/HiveAvroORCQueryGeneratorTest.java   | 29 ++++++++
 .../schemaWithLogicalField.ddl                     | 17 +++++
 .../schemaWithLogicalField.json                    | 47 +++++++++++++
 .../java/org/apache/gobblin/util/AvroUtils.java    | 11 +++
 .../apache/gobblin/util/HiveAvroTypeConstants.java | 10 +++
 7 files changed, 195 insertions(+), 5 deletions(-)

diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
index 22f4d80..87d4101 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
@@ -29,6 +29,8 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import static org.apache.gobblin.util.AvroUtils.convertFieldToSchemaWithProps;
+
 
 /**
  * A class that removes specific fields from a (possibly recursive) Avro schema.
@@ -106,9 +108,7 @@ public class AvroSchemaFieldRemover {
   private Schema removeFieldsFromRecords(Schema schema, Map<String, Schema> schemaMap) {
 
     Schema newRecord = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
-    for (Map.Entry<String, JsonNode> stringJsonNodeEntry : schema.getJsonProps().entrySet()) {
-      newRecord.addProp(stringJsonNodeEntry.getKey(), stringJsonNodeEntry.getValue());
-    }
+    convertFieldToSchemaWithProps(schema.getJsonProps(), newRecord);
 
     // Put an incomplete schema into schemaMap to avoid re-processing a recursive field.
     // The fields in the incomplete schema will be populated once the current schema is completely processed.
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
index 9ebf59f..012863d 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -35,6 +37,8 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
@@ -57,6 +61,7 @@ import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 
 import static org.apache.gobblin.data.management.conversion.hive.entities.StageableTableMetadata.SCHEMA_SOURCE_OF_TRUTH;
+import static org.apache.gobblin.util.AvroUtils.convertFieldToSchemaWithProps;
 import static org.apache.gobblin.util.AvroUtils.sanitizeSchemaString;
 
 
@@ -394,6 +399,7 @@ public class HiveAvroORCQueryGenerator {
             } else {
               columns.append(", \n");
             }
+            convertFieldToSchemaWithProps(field.getJsonProps(), field.schema());
             String type = generateAvroToHiveColumnMapping(field.schema(), hiveColumns, false, datasetName);
             if (hiveColumns.isPresent()) {
               hiveColumns.get().put(field.name(), type);
@@ -402,7 +408,8 @@ public class HiveAvroORCQueryGenerator {
             if (StringUtils.isBlank(flattenSource)) {
               flattenSource = field.name();
             }
-            columns.append(String.format("  `%s` %s COMMENT 'from flatten_source %s'", field.name(), type,flattenSource));
+            columns.append(
+                String.format("  `%s` %s COMMENT 'from flatten_source %s'", field.name(), type, flattenSource));
           }
         } else {
           columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
@@ -412,6 +419,7 @@ public class HiveAvroORCQueryGenerator {
             } else {
               columns.append(",");
             }
+            convertFieldToSchemaWithProps(field.getJsonProps(), field.schema());
             String type = generateAvroToHiveColumnMapping(field.schema(), hiveColumns, false, datasetName);
             columns.append("`").append(field.name()).append("`").append(":").append(type);
           }
@@ -462,7 +470,48 @@ public class HiveAvroORCQueryGenerator {
       case LONG:
       case STRING:
       case BOOLEAN:
-        columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType()));
+        // Handling Avro Logical Types which should always sit in leaf-level.
+        boolean isLogicalTypeSet = false;
+        try {
+          String hiveSpecificLogicalType = generateHiveSpecificLogicalType(schema);
+          if (StringUtils.isNoneEmpty(hiveSpecificLogicalType)) {
+            isLogicalTypeSet = true;
+            columns.append(hiveSpecificLogicalType);
+            break;
+          }
+        } catch (AvroSerdeException ae) {
+          log.error("Failed to generate logical type string for field" + schema.getName() + " due to:", ae);
+        }
+
+        LogicalType logicalType = LogicalTypes.fromSchemaIgnoreInvalid(schema);
+        if (logicalType != null) {
+          switch (logicalType.getName().toLowerCase()) {
+            case HiveAvroTypeConstants.DATE:
+              LogicalTypes.Date dateType = (LogicalTypes.Date) logicalType;
+              dateType.validate(schema);
+              columns.append("date");
+              isLogicalTypeSet = true;
+              break;
+            case HiveAvroTypeConstants.DECIMAL:
+              LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
+              decimalType.validate(schema);
+              columns.append(String.format("decimal(%s, %s)", decimalType.getPrecision(), decimalType.getScale()));
+              isLogicalTypeSet = true;
+              break;
+            case HiveAvroTypeConstants.TIME_MILLIS:
+              LogicalTypes.TimeMillis timeMillsType = (LogicalTypes.TimeMillis) logicalType;
+              timeMillsType.validate(schema);
+              columns.append("timestamp");
+              isLogicalTypeSet = true;
+              break;
+            default:
+              log.error("Unsupported logical type" + schema.getLogicalType().getName() +  ", fallback to physical type");
+          }
+        }
+
+        if (!isLogicalTypeSet) {
+          columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType()));
+        }
         break;
       default:
         String exceptionMessage =
@@ -474,6 +523,33 @@ public class HiveAvroORCQueryGenerator {
     return columns.toString();
   }
 
+  /**
+   * Referencing org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo#generateTypeInfo(org.apache.avro.Schema) on
+   * how to deal with logical types that supported by Hive but not by Avro(e.g. VARCHAR).
+   *
+   * If unsupported logical types found, return empty string as a result.
+   * @param schema Avro schema
+   * @return
+   * @throws AvroSerdeException
+   */
+  public static String generateHiveSpecificLogicalType(Schema schema) throws AvroSerdeException {
+    // For bytes type, it can be mapped to decimal.
+    Schema.Type type = schema.getType();
+
+    if (type == Schema.Type.STRING && AvroSerDe.VARCHAR_TYPE_NAME
+        .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+      int maxLength = 0;
+      try {
+        maxLength = schema.getJsonProp(AvroSerDe.AVRO_PROP_MAX_LENGTH).getValueAsInt();
+      } catch (Exception ex) {
+        throw new AvroSerdeException("Failed to obtain maxLength value from file schema: " + schema, ex);
+      }
+      return String.format("varchar(%s)", maxLength);
+    } else {
+      return StringUtils.EMPTY;
+    }
+  }
+
   /***
    * Use destination table schema to generate column mapping
    * @param hiveColumns Optional Map to populate with the generated hive columns for reference of caller
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/util/HiveAvroORCQueryGeneratorTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/util/HiveAvroORCQueryGeneratorTest.java
index fadff0f..c11acbf 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/util/HiveAvroORCQueryGeneratorTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/util/HiveAvroORCQueryGeneratorTest.java
@@ -45,6 +45,35 @@ public class HiveAvroORCQueryGeneratorTest {
   private static boolean isEvolutionEnabled = true;
   private static Optional<Integer> rowLimit = Optional.absent();
 
+  /**
+   * Tests DDL generation for a schema containing logical types (decimal, date and Hive's varchar).
+   * The generated and the expected DDL are compared with all whitespace removed.
+   * @throws Exception
+   */
+  public void testLogicalTypeResolutionWithDDL() throws Exception {
+    String schemaName = "schemaWithLogicalFieldDDL";
+    String tableLocation = "file:/user/hive/warehouse/" + schemaName;
+    Schema schema =
+        ConversionHiveTestUtils.readSchemaFromJsonFile(resourceDir, "schemaWithLogicalField.json");
+
+    String actualDdl = HiveAvroORCQueryGenerator.generateCreateTableDDL(schema, schemaName, tableLocation,
+        Optional.<String>absent(), Optional.<Map<String, String>>absent(), Optional.<List<String>>absent(),
+        Optional.<Map<String, HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(), Optional.<Integer>absent(),
+        Optional.<String>absent(), Optional.<String>absent(), Optional.<String>absent(),
+        null, isEvolutionEnabled, true, destinationTableMeta,
+        new HashMap<String, String>());
+    String expectedDdl = ConversionHiveTestUtils.readQueryFromFile(resourceDir, "schemaWithLogicalField.ddl");
+
+    // Known limitation: hive-1.0.1 does not support "date" as a logical type, so Hive's library does not
+    // recognize it when translating the Avro schema to a TypeInfo (a TypeDescription equivalent). Hence the
+    // `columns.types` table property in schemaWithLogicalField.ddl lists `nestedLogicalFieldDate` as "int"
+    // (the physical type of date) rather than "date".
+    Assert.assertEquals(
+        actualDdl.trim().replaceAll("\\s+", ""),
+        expectedDdl.trim().replaceAll("\\s+", ""));
+  }
+
+
   /***
    * Test DDL generation for schema structured as: Array within record within array within record
    * @throws IOException
diff --git a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/schemaWithLogicalField.ddl b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/schemaWithLogicalField.ddl
new file mode 100644
index 0000000..54ddf9d
--- /dev/null
+++ b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/schemaWithLogicalField.ddl
@@ -0,0 +1,17 @@
+CREATE EXTERNAL TABLE IF NOT EXISTS `default`.`schemaWithLogicalFieldDDL` (
+  `parentFieldRecord` struct<`nestedFieldString`:string,`nestedLogicalFieldDecimal`:decimal(4, 2),`nestedLogicalFieldDate`:date> COMMENT 'from flatten_source parentFieldRecord',
+  `parentFieldInt` int COMMENT 'from flatten_source parentFieldInt',
+  `parentFieldLogicalVarchar` varchar(256) COMMENT 'from flatten_source parentFieldLogicalVarchar')
+ROW FORMAT SERDE
+  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
+STORED AS INPUTFORMAT
+  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
+OUTPUTFORMAT
+  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
+LOCATION
+  'file:/user/hive/warehouse/schemaWithLogicalFieldDDL'
+TBLPROPERTIES (
+  'columns'='parentFieldRecord,parentFieldInt,parentFieldLogicalVarchar',
+  'orc.compress'='ZLIB',
+  'columns.types'='struct<nestedFieldString:string,nestedLogicalFieldDecimal:decimal(4,2),nestedLogicalFieldDate:int>,int,varchar(256)',
+  'orc.row.index.stride'='268435456')
\ No newline at end of file
diff --git a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/schemaWithLogicalField.json b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/schemaWithLogicalField.json
new file mode 100644
index 0000000..7ec9b1a
--- /dev/null
+++ b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/schemaWithLogicalField.json
@@ -0,0 +1,47 @@
+{
+  "type": "record",
+  "name": "SchemaWithLogicalType",
+  "fields": [
+    {
+      "name": "parentFieldRecord",
+      "type": {
+        "type": "record",
+        "name": "nestedRecordName",
+        "fields": [
+          {
+            "name": "nestedFieldString",
+            "type": "string"
+          },
+          {
+            "name": "nestedLogicalFieldDecimal",
+            "type": {
+              "type": "bytes",
+              "logicalType": "decimal",
+              "precision": 4,
+              "scale": 2
+            }
+          },
+          {
+            "name": "nestedLogicalFieldDate",
+            "type": {
+              "type": "int",
+              "logicalType": "date"
+            }
+          }
+        ]
+      }
+    },
+    {
+      "name": "parentFieldInt",
+      "type": "int"
+    },
+    {
+      "name": "parentFieldLogicalVarchar",
+      "type": {
+        "type": "string",
+        "logicalType": "varchar",
+        "maxLength": "256"
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index fceaaa6..78b6152 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -126,6 +126,17 @@ public class AvroUtils {
         .collect(Collectors.toList());
   }
 
+  /**
+   * Copies every JSON property in {@code fieldProps} (e.g. a {@link Schema.Field}'s or a {@link Schema}'s props)
+   * onto {@code targetSchemaObj} in place; nothing is returned and no new {@link Schema} is created.
+   * Per Avro's JsonProperties#addProp contract, re-adding an existing key with a different value is rejected.
+   */
+  public static void convertFieldToSchemaWithProps(Map<String,JsonNode> fieldProps, Schema targetSchemaObj) {
+    for (Map.Entry<String, JsonNode> stringJsonNodeEntry : fieldProps.entrySet()) {
+      targetSchemaObj.addProp(stringJsonNodeEntry.getKey(), stringJsonNodeEntry.getValue());
+    }
+  }
+
 
   public static class AvroPathFilter implements PathFilter {
     @Override
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/HiveAvroTypeConstants.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/HiveAvroTypeConstants.java
index 8a86840..0456678 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/HiveAvroTypeConstants.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/HiveAvroTypeConstants.java
@@ -66,4 +66,14 @@ public class HiveAvroTypeConstants {
       .put("date",      ImmutableSet.<String>builder().add("string", "varchar").build())
       .put("binary",    Sets.<String>newHashSet())
       .put("boolean",    Sets.<String>newHashSet()).build();
+
+  /**
+   * Lower-case names of the Avro logical types mapped to dedicated Hive types during Avro-to-ORC DDL generation:
+   * "decimal" -> "decimal(precision, scale)"
+   * "date" -> "date"
+   * "time-millis" -> "timestamp"
+   */
+  public static final String DECIMAL = "decimal";
+  public static final String DATE = "date";
+  public static final String TIME_MILLIS = "time-millis";
 }