You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/01 21:26:25 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-941] Enhance
DDL to add column and column.types with case-preserving schema
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 94a508b [GOBBLIN-941] Enhance DDL to add column and column.types with case-preserving schema
94a508b is described below
commit 94a508b38ec8bd879614f2d9bf0eeb96513ca7cf
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Fri Nov 1 14:26:17 2019 -0700
[GOBBLIN-941] Enhance DDL to add column and column.types with case-preserving schema
Closes #2791 from ZihanLi58/GOBBLIN-941
---
.../hive/converter/AbstractAvroToOrcConverter.java | 3 +++
.../HiveMaterializerFromEntityQueryGenerator.java | 9 ++++++++
.../hive/query/HiveAvroORCQueryGenerator.java | 18 ++++++++++++++-
.../conversion/hive/task/HiveConverterUtils.java | 27 ++++++++++++++++++++++
...yWithinRecordWithinArrayWithinRecord_nested.ddl | 2 ++
.../optionWithinOptionWithinRecord_nested.ddl | 2 ++
.../recordWithinOptionWithinRecord_nested.ddl | 2 ++
.../recordWithinRecordWithinRecord_flattened.ddl | 2 ++
.../recordWithinRecordWithinRecord_nested.ddl | 2 ++
.../avroToOrcQueryUtilsTest/testMultiPartition.ddl | 2 ++
.../source_schema_evolution_enabled.ddl | 2 ++
11 files changed, 70 insertions(+), 1 deletion(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
index 732e149..7de06c6 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
import org.apache.thrift.TException;
import com.google.common.annotations.VisibleForTesting;
@@ -85,6 +86,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
* Subdirectory within destination ORC table directory to publish data
*/
private static final String PUBLISHED_TABLE_SUBDIRECTORY = "final";
+ public static final String OUTPUT_AVRO_SCHEMA_KEY = "output.avro.schema";
private static final String ORC_FORMAT = "orc";
@@ -326,6 +328,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
conversionEntity.getQueries().add(String
.format("SET %s=%s", GOBBLIN_WORKUNIT_CREATE_TIME_KEY,
workUnit.getWorkunit().getProp(SlaEventKeys.ORIGIN_TS_IN_MILLI_SECS_KEY)));
+ workUnit.setProp(OUTPUT_AVRO_SCHEMA_KEY, outputAvroSchema.toString());
// Create DDL statement for table
Map<String, String> hiveColumns = new LinkedHashMap<>();
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerFromEntityQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerFromEntityQueryGenerator.java
index 872a3f4..87d78bd 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerFromEntityQueryGenerator.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerFromEntityQueryGenerator.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
+import org.apache.avro.Schema;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.DataConversionException;
import org.apache.gobblin.data.management.conversion.hive.converter.AbstractAvroToOrcConverter;
@@ -118,11 +119,19 @@ public abstract class HiveMaterializerFromEntityQueryGenerator extends HiveMater
Map<String, String> publishDirectories = publishEntity.getPublishDirectories();
List<String> cleanupQueries = publishEntity.getCleanupQueries();
List<String> cleanupDirectories = publishEntity.getCleanupDirectories();
+ Optional<Schema> avroSchema = Optional.absent();
+ if(workUnitState.contains(AbstractAvroToOrcConverter.OUTPUT_AVRO_SCHEMA_KEY)) {
+ avroSchema = Optional.fromNullable(new Schema.Parser().parse(workUnitState.getProp(AbstractAvroToOrcConverter.OUTPUT_AVRO_SCHEMA_KEY)));
+ }
String createFinalTableDDL =
HiveConverterUtils.generateCreateDuplicateTableDDL(outputDatabaseName, stagingTableName, outputTableName,
outputDataLocation, Optional.of(outputDatabaseName));
publishQueries.add(createFinalTableDDL);
+ if(avroSchema.isPresent()) {
+ String alterSchemaDml = HiveConverterUtils.generateAlterTblPropsDML(outputTableName, Optional.of(outputDatabaseName), avroSchema.get());
+ publishQueries.add(alterSchemaDml);
+ }
log.debug("Create final table DDL:\n" + createFinalTableDDL);
if (!this.supportTargetPartitioning || partitionsDDLInfo.size() == 0) {
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
index 4fa2dff..7f8dacc 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.stream.Collectors;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
@@ -33,6 +34,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
@@ -203,6 +205,19 @@ public class HiveAvroORCQueryGenerator {
if (isEvolutionEnabled || !destinationTableMeta.isPresent()) {
log.info("Generating DDL using source schema");
ddl.append(generateAvroToHiveColumnMapping(schema, Optional.of(hiveColumns), true, dbName + "." + tblName));
+ try {
+ AvroObjectInspectorGenerator objectInspectorGenerator = new AvroObjectInspectorGenerator(schema);
+ String columns = Joiner.on(",").join(objectInspectorGenerator.getColumnNames());
+ String columnTypes = Joiner.on(",").join(
+ objectInspectorGenerator.getColumnTypes().stream().map(x -> x.getTypeName())
+ .collect(Collectors.toList()));
+ tableProperties.setProperty("columns", columns);
+ tableProperties.setProperty("columns.types", columnTypes);
+
+ } catch (Exception e) {
+ log.error("Cannot generate add partition DDL due to ", e);
+ throw new RuntimeException(e);
+ }
} else {
log.info("Generating DDL using destination schema");
ddl.append(generateDestinationToHiveColumnMapping(Optional.of(hiveColumns), destinationTableMeta.get()));
@@ -309,7 +324,7 @@ public class HiveAvroORCQueryGenerator {
private static Properties getTableProperties(Properties tableProperties) {
if (null == tableProperties || tableProperties.size() == 0) {
- return DEFAULT_TBL_PROPERTIES;
+ return (Properties) DEFAULT_TBL_PROPERTIES.clone();
}
for (String property : DEFAULT_TBL_PROPERTIES.stringPropertyNames()) {
@@ -366,6 +381,7 @@ public class HiveAvroORCQueryGenerator {
partitionLocation));
}
+
return ddls;
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
index bbad3a4..ad01b35 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
@@ -25,6 +25,8 @@ import java.util.Properties;
import java.util.Random;
import static java.util.stream.Collectors.joining;
+import java.util.stream.Collectors;
+import org.apache.avro.Schema;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -38,6 +40,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
import org.apache.thrift.TException;
import com.google.common.base.Joiner;
@@ -151,6 +154,30 @@ public class HiveConverterUtils {
dbName, tblName, inputDbName, inputTblName, tblLocation);
}
+  /**
+   * Generates an {@code ALTER TABLE ... SET TBLPROPERTIES} statement that stores the column
+   * names and column types derived from the given Avro schema in the {@code columns} and
+   * {@code columns.types} table properties.
+   *
+   * @param tableName name of the table to alter; must not be blank.
+   * @param optionalDbName database containing the table; {@code default} is used when absent.
+   * @param schema Avro schema from which the column names and types are derived; must not be null.
+   * @return the generated ALTER TABLE statement.
+   * @throws IllegalArgumentException if {@code tableName} is blank or {@code schema} is null.
+   * @throws RuntimeException if the column names or types cannot be derived from {@code schema}.
+   */
+  public static String generateAlterTblPropsDML(
+      String tableName,
+      Optional<String> optionalDbName,
+      Schema schema
+  ) {
+    Preconditions.checkArgument(StringUtils.isNotBlank(tableName), "tableName must not be blank");
+    Preconditions.checkArgument(schema != null, "schema must not be null");
+
+    String dbName = optionalDbName.isPresent() ? optionalDbName.get() : "default";
+    try {
+      // The column names and types come from the Avro object inspector built over the schema.
+      AvroObjectInspectorGenerator objectInspectorGenerator = new AvroObjectInspectorGenerator(schema);
+      String columns = Joiner.on(",").join(objectInspectorGenerator.getColumnNames());
+      String columnTypes = objectInspectorGenerator.getColumnTypes().stream()
+          .map(x -> x.getTypeName())
+          .collect(Collectors.joining(","));
+      return String.format("ALTER TABLE `%s`.`%s` SET TBLPROPERTIES ('columns'='%s', 'columns.types'='%s')", dbName, tableName,
+          columns, columnTypes);
+    } catch (Exception e) {
+      // Name the statement and table that actually failed (previously mislabelled as "add partition DDL").
+      log.error("Cannot generate alter table properties DML for " + dbName + "." + tableName + " due to ", e);
+      throw new RuntimeException(e);
+    }
+  }
+
/**
* Generates a CTAS statement to dump the contents of a table / partition into a new table.
* @param outputDbAndTable output db and table where contents should be written.
diff --git a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/arrayWithinRecordWithinArrayWithinRecord_nested.ddl b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/arrayWithinRecordWithinArrayWithinRecord_nested.ddl
index b66e4f3..3328e0d 100644
--- a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/arrayWithinRecordWithinArrayWithinRecord_nested.ddl
+++ b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/arrayWithinRecordWithinArrayWithinRecord_nested.ddl
@@ -9,5 +9,7 @@ OUTPUTFORMAT
LOCATION
'file:/user/hive/warehouse/testArrayWithinRecordWithinArrayWithinRecordDDL'
TBLPROPERTIES (
+ 'columns'='parentRecordFieldName',
'orc.compress'='ZLIB',
+ 'columns.types'='array<struct<nestedRecordFieldName:array<string>>>',
'orc.row.index.stride'='268435456')
diff --git a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/optionWithinOptionWithinRecord_nested.ddl b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/optionWithinOptionWithinRecord_nested.ddl
index c0141d9..6ee68f0 100644
--- a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/optionWithinOptionWithinRecord_nested.ddl
+++ b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/optionWithinOptionWithinRecord_nested.ddl
@@ -10,5 +10,7 @@ OUTPUTFORMAT
LOCATION
'file:/user/hive/warehouse/testOptionWithinOptionWithinRecordDDL'
TBLPROPERTIES (
+ 'columns'='parentFieldUnion,parentFieldInt',
'orc.compress'='ZLIB',
+ 'columns.types'='struct<unionRecordMemberFieldUnion:struct<superNestedFieldString1:string,superNestedFieldString2:string>,unionRecordMemberFieldString:string>,int',
'orc.row.index.stride'='268435456')
diff --git a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinOptionWithinRecord_nested.ddl b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinOptionWithinRecord_nested.ddl
index da1f112..c4c8578 100644
--- a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinOptionWithinRecord_nested.ddl
+++ b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinOptionWithinRecord_nested.ddl
@@ -10,5 +10,7 @@ OUTPUTFORMAT
LOCATION
'file:/user/hive/warehouse/testRecordWithinOptionWithinRecordDDL'
TBLPROPERTIES (
+ 'columns'='parentFieldUnion,parentFieldInt',
'orc.compress'='ZLIB',
+ 'columns.types'='struct<unionRecordMemberFieldLong:bigint,unionRecordMemberFieldString:string>,int',
'orc.row.index.stride'='268435456')
\ No newline at end of file
diff --git a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinRecordWithinRecord_flattened.ddl b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinRecordWithinRecord_flattened.ddl
index 4429f7b..1baea1b 100644
--- a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinRecordWithinRecord_flattened.ddl
+++ b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinRecordWithinRecord_flattened.ddl
@@ -13,5 +13,7 @@ OUTPUTFORMAT
LOCATION
'file:/user/hive/warehouse/testRecordWithinRecordWithinRecordDDL'
TBLPROPERTIES (
+ 'columns'='parentFieldRecord__nestedFieldRecord__superNestedFieldString,parentFieldRecord__nestedFieldRecord__superNestedFieldInt,parentFieldRecord__nestedFieldString,parentFieldRecord__nestedFieldInt,parentFieldInt',
'orc.compress'='ZLIB',
+ 'columns.types'='string,int,string,int,int',
'orc.row.index.stride'='268435456')
diff --git a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinRecordWithinRecord_nested.ddl b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinRecordWithinRecord_nested.ddl
index b963e5c..8409207 100644
--- a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinRecordWithinRecord_nested.ddl
+++ b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/recordWithinRecordWithinRecord_nested.ddl
@@ -10,5 +10,7 @@ OUTPUTFORMAT
LOCATION
'file:/user/hive/warehouse/testRecordWithinRecordWithinRecordDDL'
TBLPROPERTIES (
+ 'columns'='parentFieldRecord,parentFieldInt',
'orc.compress'='ZLIB',
+ 'columns.types'='struct<nestedFieldRecord:struct<superNestedFieldString:string,superNestedFieldInt:int>,nestedFieldString:string,nestedFieldInt:int>,int',
'orc.row.index.stride'='268435456')
\ No newline at end of file
diff --git a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/testMultiPartition.ddl b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/testMultiPartition.ddl
index 731e02a..0213c0f 100644
--- a/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/testMultiPartition.ddl
+++ b/gobblin-data-management/src/test/resources/avroToOrcQueryUtilsTest/testMultiPartition.ddl
@@ -14,5 +14,7 @@ OUTPUTFORMAT
LOCATION
'file:/user/hive/warehouse/testMultiPartitionDDL'
TBLPROPERTIES (
+ 'columns'='parentFieldRecord__nestedFieldRecord__superNestedFieldString,parentFieldRecord__nestedFieldRecord__superNestedFieldInt,parentFieldRecord__nestedFieldString,parentFieldRecord__nestedFieldInt,parentFieldInt',
'orc.compress'='ZLIB',
+ 'columns.types'='string,int,string,int,int',
'orc.row.index.stride'='268435456')
diff --git a/gobblin-data-management/src/test/resources/avroToOrcSchemaEvolutionTest/source_schema_evolution_enabled.ddl b/gobblin-data-management/src/test/resources/avroToOrcSchemaEvolutionTest/source_schema_evolution_enabled.ddl
index 918315e..72f1a4c 100644
--- a/gobblin-data-management/src/test/resources/avroToOrcSchemaEvolutionTest/source_schema_evolution_enabled.ddl
+++ b/gobblin-data-management/src/test/resources/avroToOrcSchemaEvolutionTest/source_schema_evolution_enabled.ddl
@@ -13,5 +13,7 @@ OUTPUTFORMAT
LOCATION
'file:/user/hive/warehouse/sourceSchema'
TBLPROPERTIES (
+ 'columns'='parentFieldRecord__nestedFieldRecord__superNestedFieldString,parentFieldRecord__nestedFieldRecord__superNestedFieldInt,parentFieldRecord__nestedFieldString,parentFieldRecord__nestedFieldInt,parentFieldInt',
'orc.compress'='ZLIB',
+ 'columns.types'='string,int,string,int,int',
'orc.row.index.stride'='268435456')