You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/01 21:26:25 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-941] Enhance DDL to add column and column.types with case-preserving schema

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 94a508b  [GOBBLIN-941] Enhance DDL to add column and column.types with case-preserving schema
94a508b is described below

commit 94a508b38ec8bd879614f2d9bf0eeb96513ca7cf
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Fri Nov 1 14:26:17 2019 -0700

    [GOBBLIN-941] Enhance DDL to add column and column.types with case-preserving schema
    
    Closes #2791 from ZihanLi58/GOBBLIN-941
---
 .../hive/converter/AbstractAvroToOrcConverter.java |  3 +++
 .../HiveMaterializerFromEntityQueryGenerator.java  |  9 ++++++++
 .../hive/query/HiveAvroORCQueryGenerator.java      | 18 ++++++++++++++-
 .../conversion/hive/task/HiveConverterUtils.java   | 27 ++++++++++++++++++++++
 ...yWithinRecordWithinArrayWithinRecord_nested.ddl |  2 ++
 .../optionWithinOptionWithinRecord_nested.ddl      |  2 ++
 .../recordWithinOptionWithinRecord_nested.ddl      |  2 ++
 .../recordWithinRecordWithinRecord_flattened.ddl   |  2 ++
 .../recordWithinRecordWithinRecord_nested.ddl      |  2 ++
 .../avroToOrcQueryUtilsTest/testMultiPartition.ddl |  2 ++
 .../source_schema_evolution_enabled.ddl            |  2 ++
 11 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
index 732e149..7de06c6 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
 import org.apache.thrift.TException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -85,6 +86,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
    * Subdirectory within destination ORC table directory to publish data
    */
   private static final String PUBLISHED_TABLE_SUBDIRECTORY = "final";
+  public static final String OUTPUT_AVRO_SCHEMA_KEY = "output.avro.schema";
 
   private static final String ORC_FORMAT = "orc";
 
@@ -326,6 +328,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
     conversionEntity.getQueries().add(String
         .format("SET %s=%s", GOBBLIN_WORKUNIT_CREATE_TIME_KEY,
             workUnit.getWorkunit().getProp(SlaEventKeys.ORIGIN_TS_IN_MILLI_SECS_KEY)));
+    workUnit.setProp(OUTPUT_AVRO_SCHEMA_KEY, outputAvroSchema.toString());
 
     // Create DDL statement for table
     Map<String, String> hiveColumns = new LinkedHashMap<>();
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerFromEntityQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerFromEntityQueryGenerator.java
index 872a3f4..87d78bd 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerFromEntityQueryGenerator.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerFromEntityQueryGenerator.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.avro.Schema;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.converter.DataConversionException;
 import org.apache.gobblin.data.management.conversion.hive.converter.AbstractAvroToOrcConverter;
@@ -118,11 +119,19 @@ public abstract class HiveMaterializerFromEntityQueryGenerator extends HiveMater
     Map<String, String> publishDirectories = publishEntity.getPublishDirectories();
     List<String> cleanupQueries = publishEntity.getCleanupQueries();
     List<String> cleanupDirectories = publishEntity.getCleanupDirectories();
+    Optional<Schema> avroSchema = Optional.absent();
+    if(workUnitState.contains(AbstractAvroToOrcConverter.OUTPUT_AVRO_SCHEMA_KEY)) {
+      avroSchema = Optional.fromNullable(new Schema.Parser().parse(workUnitState.getProp(AbstractAvroToOrcConverter.OUTPUT_AVRO_SCHEMA_KEY)));
+    }
 
     String createFinalTableDDL =
         HiveConverterUtils.generateCreateDuplicateTableDDL(outputDatabaseName, stagingTableName, outputTableName,
             outputDataLocation, Optional.of(outputDatabaseName));
     publishQueries.add(createFinalTableDDL);
+    if(avroSchema.isPresent()) {
+      String alterSchemaDml = HiveConverterUtils.generateAlterTblPropsDML(outputTableName, Optional.of(outputDatabaseName), avroSchema.get());
+      publishQueries.add(alterSchemaDml);
+    }
     log.debug("Create final table DDL:\n" + createFinalTableDDL);
 
     if (!this.supportTargetPartitioning || partitionsDDLInfo.size() == 0) {
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
index 4fa2dff..7f8dacc 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import java.util.stream.Collectors;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 
@@ -33,6 +34,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
@@ -203,6 +205,19 @@ public class HiveAvroORCQueryGenerator {
     if (isEvolutionEnabled || !destinationTableMeta.isPresent()) {
       log.info("Generating DDL using source schema");
       ddl.append(generateAvroToHiveColumnMapping(schema, Optional.of(hiveColumns), true, dbName + "." + tblName));
+      try {
+        AvroObjectInspectorGenerator objectInspectorGenerator = new AvroObjectInspectorGenerator(schema);
+        String columns = Joiner.on(",").join(objectInspectorGenerator.getColumnNames());
+        String columnTypes = Joiner.on(",").join(
+            objectInspectorGenerator.getColumnTypes().stream().map(x -> x.getTypeName())
+                .collect(Collectors.toList()));
+        tableProperties.setProperty("columns", columns);
+        tableProperties.setProperty("columns.types", columnTypes);
+
+      } catch (Exception e) {
+        log.error("Cannot generate add partition DDL due to ", e);
+        throw new RuntimeException(e);
+      }
     } else {
       log.info("Generating DDL using destination schema");
       ddl.append(generateDestinationToHiveColumnMapping(Optional.of(hiveColumns), destinationTableMeta.get()));
@@ -309,7 +324,7 @@ public class HiveAvroORCQueryGenerator {
 
   private static Properties getTableProperties(Properties tableProperties) {
     if (null == tableProperties || tableProperties.size() == 0) {
-      return DEFAULT_TBL_PROPERTIES;
+      return (Properties) DEFAULT_TBL_PROPERTIES.clone();
     }
 
     for (String property : DEFAULT_TBL_PROPERTIES.stringPropertyNames()) {
@@ -366,6 +381,7 @@ public class HiveAvroORCQueryGenerator {
           partitionLocation));
     }
 
+
     return ddls;
   }
 
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
index bbad3a4..ad01b35 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
@@ -25,6 +25,8 @@ import java.util.Properties;
 import java.util.Random;
 import static java.util.stream.Collectors.joining;
 
+import java.util.stream.Collectors;
+import org.apache.avro.Schema;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -38,6 +40,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
 import org.apache.thrift.TException;
 
 import com.google.common.base.Joiner;
@@ -151,6 +154,30 @@ public class HiveConverterUtils {
         dbName, tblName, inputDbName, inputTblName, tblLocation);
   }
 
+  /**
+   * Generates an ALTER TABLE statement that sets the {@code columns} and {@code columns.types} table
+   * properties to the column names and Hive type names derived from the given Avro schema.
+   * @param tableName table to alter; must not be blank.
+   * @param optionalDbName database containing the table; "default" is used when absent.
+   * @param schema Avro schema to derive columns from; must not be null.
+   */
+  public static String generateAlterTblPropsDML(String tableName, Optional<String> optionalDbName, Schema schema) {
+    Preconditions.checkArgument(StringUtils.isNotBlank(tableName));
+    Preconditions.checkArgument(schema != null);
+    String dbName = optionalDbName.isPresent() ? optionalDbName.get() : "default";
+    try {
+      AvroObjectInspectorGenerator objectInspectorGenerator = new AvroObjectInspectorGenerator(schema);
+      String columns = Joiner.on(",").join(objectInspectorGenerator.getColumnNames());
+      String columnTypes = Joiner.on(",").join(objectInspectorGenerator.getColumnTypes().stream()
+          .map(x -> x.getTypeName()).collect(Collectors.toList()));
+      return String.format("ALTER TABLE `%s`.`%s` SET TBLPROPERTIES ('columns'='%s', 'columns.types'='%s')",
+          dbName, tableName, columns, columnTypes);
+    } catch (Exception e) {
+      log.error("Cannot generate alter table properties statement due to ", e);
+      throw new RuntimeException(e);
+    }
+  }
+
   /**
    * Generates a CTAS statement to dump the contents of a table / partition into a new table.
    * @param outputDbAndTable output db and table where contents should be written.
diff --git a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/arrayWithinRecordWithinArrayWithinRecord_nested.ddl b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/arrayWithinRecordWithinArrayWithinRecord_nested.ddl
index b66e4f3..3328e0d 100644
--- a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/arrayWithinRecordWithinArrayWithinRecord_nested.ddl
+++ b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/arrayWithinRecordWithinArrayWithinRecord_nested.ddl
@@ -9,5 +9,7 @@ OUTPUTFORMAT
 LOCATION 
   'file:/user/hive/warehouse/testArrayWithinRecordWithinArrayWithinRecordDDL' 
 TBLPROPERTIES ( 
+  'columns'='parentRecordFieldName', 
   'orc.compress'='ZLIB', 
+  'columns.types'='array<struct<nestedRecordFieldName:array<string>>>', 
   'orc.row.index.stride'='268435456') 
diff --git a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/optionWithinOptionWithinRecord_nested.ddl b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/optionWithinOptionWithinRecord_nested.ddl
index c0141d9..6ee68f0 100644
--- a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/optionWithinOptionWithinRecord_nested.ddl
+++ b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/optionWithinOptionWithinRecord_nested.ddl
@@ -10,5 +10,7 @@ OUTPUTFORMAT
 LOCATION 
   'file:/user/hive/warehouse/testOptionWithinOptionWithinRecordDDL' 
 TBLPROPERTIES ( 
+  'columns'='parentFieldUnion,parentFieldInt', 
   'orc.compress'='ZLIB', 
+  'columns.types'='struct<unionRecordMemberFieldUnion:struct<superNestedFieldString1:string,superNestedFieldString2:string>,unionRecordMemberFieldString:string>,int', 
   'orc.row.index.stride'='268435456') 
diff --git a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinOptionWithinRecord_nested.ddl b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinOptionWithinRecord_nested.ddl
index da1f112..c4c8578 100644
--- a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinOptionWithinRecord_nested.ddl
+++ b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinOptionWithinRecord_nested.ddl
@@ -10,5 +10,7 @@ OUTPUTFORMAT
 LOCATION 
   'file:/user/hive/warehouse/testRecordWithinOptionWithinRecordDDL' 
 TBLPROPERTIES ( 
+  'columns'='parentFieldUnion,parentFieldInt', 
   'orc.compress'='ZLIB', 
+  'columns.types'='struct<unionRecordMemberFieldLong:bigint,unionRecordMemberFieldString:string>,int', 
   'orc.row.index.stride'='268435456')
\ No newline at end of file
diff --git a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinRecordWithinRecord_flattened.ddl b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinRecordWithinRecord_flattened.ddl
index 4429f7b..1baea1b 100644
--- a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinRecordWithinRecord_flattened.ddl
+++ b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinRecordWithinRecord_flattened.ddl
@@ -13,5 +13,7 @@ OUTPUTFORMAT
 LOCATION 
   'file:/user/hive/warehouse/testRecordWithinRecordWithinRecordDDL' 
 TBLPROPERTIES ( 
+  'columns'='parentFieldRecord__nestedFieldRecord__superNestedFieldString,parentFieldRecord__nestedFieldRecord__superNestedFieldInt,parentFieldRecord__nestedFieldString,parentFieldRecord__nestedFieldInt,parentFieldInt', 
   'orc.compress'='ZLIB', 
+  'columns.types'='string,int,string,int,int', 
   'orc.row.index.stride'='268435456') 
diff --git a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinRecordWithinRecord_nested.ddl b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinRecordWithinRecord_nested.ddl
index b963e5c..8409207 100644
--- a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinRecordWithinRecord_nested.ddl
+++ b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinRecordWithinRecord_nested.ddl
@@ -10,5 +10,7 @@ OUTPUTFORMAT
 LOCATION 
   'file:/user/hive/warehouse/testRecordWithinRecordWithinRecordDDL' 
 TBLPROPERTIES ( 
+  'columns'='parentFieldRecord,parentFieldInt', 
   'orc.compress'='ZLIB', 
+  'columns.types'='struct<nestedFieldRecord:struct<superNestedFieldString:string,superNestedFieldInt:int>,nestedFieldString:string,nestedFieldInt:int>,int', 
   'orc.row.index.stride'='268435456')
\ No newline at end of file
diff --git a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/testMultiPartition.ddl b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/testMultiPartition.ddl
index 731e02a..0213c0f 100644
--- a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/testMultiPartition.ddl
+++ b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/testMultiPartition.ddl
@@ -14,5 +14,7 @@ OUTPUTFORMAT
 LOCATION 
   'file:/user/hive/warehouse/testMultiPartitionDDL' 
 TBLPROPERTIES ( 
+  'columns'='parentFieldRecord__nestedFieldRecord__superNestedFieldString,parentFieldRecord__nestedFieldRecord__superNestedFieldInt,parentFieldRecord__nestedFieldString,parentFieldRecord__nestedFieldInt,parentFieldInt', 
   'orc.compress'='ZLIB', 
+  'columns.types'='string,int,string,int,int', 
   'orc.row.index.stride'='268435456') 
diff --git a/gobblin-data-management/src/test/resources/avroToOrcSchemaEvolutionTest/source_schema_evolution_enabled.ddl b/gobblin-data-management/src/test/resources/avroToOrcSchemaEvolutionTest/source_schema_evolution_enabled.ddl
index 918315e..72f1a4c 100644
--- a/gobblin-data-management/src/test/resources/avroToOrcSchemaEvolutionTest/source_schema_evolution_enabled.ddl
+++ b/gobblin-data-management/src/test/resources/avroToOrcSchemaEvolutionTest/source_schema_evolution_enabled.ddl
@@ -13,5 +13,7 @@ OUTPUTFORMAT
 LOCATION 
   'file:/user/hive/warehouse/sourceSchema' 
 TBLPROPERTIES ( 
+  'columns'='parentFieldRecord__nestedFieldRecord__superNestedFieldString,parentFieldRecord__nestedFieldRecord__superNestedFieldInt,parentFieldRecord__nestedFieldString,parentFieldRecord__nestedFieldInt,parentFieldInt', 
   'orc.compress'='ZLIB', 
+  'columns.types'='string,int,string,int,int', 
   'orc.row.index.stride'='268435456')