You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/03/29 18:25:16 UTC
incubator-gobblin git commit: [GOBBLIN-399] Improved logs and error messages for avro2orc conversion
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 21cc7c048 -> eda77bcb6
[GOBBLIN-399] Improved logs and error messages for avro2orc conversion
Closes #2323 from aditya1105/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/eda77bcb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/eda77bcb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/eda77bcb
Branch: refs/heads/master
Commit: eda77bcb6ae57fc1b38f7c70c2ac45995fde38b6
Parents: 21cc7c0
Author: Aditya Sharma <ad...@linkedin.com>
Authored: Thu Mar 29 11:25:10 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Mar 29 11:25:10 2018 -0700
----------------------------------------------------------------------
.../converter/AbstractAvroToOrcConverter.java | 6 ++-
.../hive/dataset/ConvertibleHiveDataset.java | 3 +-
.../hive/query/HiveAvroORCQueryGenerator.java | 48 ++++++++++++--------
.../hive/writer/HiveQueryExecutionWriter.java | 12 +++--
4 files changed, 45 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eda77bcb/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
index ed42946..732e149 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
@@ -676,7 +676,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
// Values for a partition are separated by ","
List<String> partitionValues = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(partitionsInfoString);
- // Do not drop partition the being processed. Sometimes a partition may have replaced another partition of the same values.
+ // Do not drop the partition being processed. Sometimes a partition may have replaced another partition of the same values.
if (!partitionValues.equals(hivePartition.getValues())) {
ImmutableMap.Builder<String, String> partitionDDLInfoMap = ImmutableMap.builder();
for (int i = 0; i < partitionKeys.size(); i++) {
@@ -713,7 +713,9 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
return Optional.of(qlPartition.getDataLocation());
}
} catch (IOException | TException | HiveException e) {
- throw new DataConversionException("Could not fetch destination table metadata", e);
+ throw new DataConversionException(
+ String.format("Could not fetch destination table %s.%s metadata", table.get().getDbName(),
+ table.get().getTableName()), e);
}
return Optional.<Path>absent();
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eda77bcb/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java
index 933e86a..f4c8744 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java
@@ -92,9 +92,10 @@ public class ConvertibleHiveDataset extends HiveDataset {
super(fs, clientPool, table, jobProps, config);
Preconditions.checkArgument(config.hasPath(DESTINATION_CONVERSION_FORMATS_KEY), String.format(
- "Atleast one destination format should be specified at %s.%s. If you do not intend to convert this dataset set %s.%s to true",
+ "At least one destination format should be specified at %s.%s. If you do not intend to convert dataset %s set %s.%s to true",
super.properties.getProperty(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY, ""),
DESTINATION_CONVERSION_FORMATS_KEY,
+ table.getCompleteName(),
super.properties.getProperty(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY, ""),
HiveDatasetFinder.HIVE_DATASET_IS_BLACKLISTED_KEY));
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eda77bcb/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
index d16df29..3f2206d 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
@@ -202,7 +202,7 @@ public class HiveAvroORCQueryGenerator {
// .. use columns from destination schema
if (isEvolutionEnabled || !destinationTableMeta.isPresent()) {
log.info("Generating DDL using source schema");
- ddl.append(generateAvroToHiveColumnMapping(schema, Optional.of(hiveColumns), true));
+ ddl.append(generateAvroToHiveColumnMapping(schema, Optional.of(hiveColumns), true, dbName + "." + tblName));
} else {
log.info("Generating DDL using destination schema");
ddl.append(generateDestinationToHiveColumnMapping(Optional.of(hiveColumns), destinationTableMeta.get()));
@@ -229,14 +229,15 @@ public class HiveAvroORCQueryGenerator {
if (optionalClusterInfo.isPresent()) {
if (!optionalNumOfBuckets.isPresent()) {
- throw new IllegalArgumentException(("CLUSTERED BY requested, but no NUM_BUCKETS specified"));
+ throw new IllegalArgumentException(
+ (String.format("CLUSTERED BY requested, but no NUM_BUCKETS specified for table %s.%s", dbName, tblName)));
}
ddl.append("CLUSTERED BY ( ");
boolean isFirst = true;
for (String clusterByCol : optionalClusterInfo.get()) {
if (!hiveColumns.containsKey(clusterByCol)) {
throw new IllegalArgumentException(String.format("Requested CLUSTERED BY column: %s "
- + "is not present in schema", clusterByCol));
+ + "is not present in schema for table %s.%s", clusterByCol, dbName, tblName));
}
if (isFirst) {
isFirst = false;
@@ -254,7 +255,8 @@ public class HiveAvroORCQueryGenerator {
for (Map.Entry<String, COLUMN_SORT_ORDER> sortOrderInfo : sortOrderInfoMap.entrySet()){
if (!hiveColumns.containsKey(sortOrderInfo.getKey())) {
throw new IllegalArgumentException(String.format(
- "Requested SORTED BY column: %s " + "is not present in schema", sortOrderInfo.getKey()));
+ "Requested SORTED BY column: %s " + "is not present in schema for table %s.%s", sortOrderInfo.getKey(),
+ dbName, tblName));
}
if (isFirst) {
isFirst = false;
@@ -268,7 +270,8 @@ public class HiveAvroORCQueryGenerator {
ddl.append(String.format(" INTO %s BUCKETS %n", optionalNumOfBuckets.get()));
} else {
if (optionalSortOrderInfo.isPresent()) {
- throw new IllegalArgumentException("SORTED BY requested, but no CLUSTERED BY specified");
+ throw new IllegalArgumentException(
+ String.format("SORTED BY requested, but no CLUSTERED BY specified for table %s.%s", dbName, tblName));
}
}
@@ -389,12 +392,12 @@ public class HiveAvroORCQueryGenerator {
* @param topLevel If this is first level
* @return Generate Hive columns with types for given Avro schema
*/
- private static String generateAvroToHiveColumnMapping(Schema schema,
- Optional<Map<String, String>> hiveColumns,
- boolean topLevel) {
+ private static String generateAvroToHiveColumnMapping(Schema schema, Optional<Map<String, String>> hiveColumns,
+ boolean topLevel, String datasetName) {
if (topLevel && !schema.getType().equals(Schema.Type.RECORD)) {
- throw new IllegalArgumentException(String.format("Schema for table must be of type RECORD. Received type: %s",
- schema.getType()));
+ throw new IllegalArgumentException(
+ String.format("Schema for table must be of type RECORD. Received type: %s for dataset %s", schema.getType(),
+ datasetName));
}
StringBuilder columns = new StringBuilder();
@@ -409,7 +412,7 @@ public class HiveAvroORCQueryGenerator {
} else {
columns.append(", \n");
}
- String type = generateAvroToHiveColumnMapping(field.schema(), hiveColumns, false);
+ String type = generateAvroToHiveColumnMapping(field.schema(), hiveColumns, false, datasetName);
if (hiveColumns.isPresent()) {
hiveColumns.get().put(field.name(), type);
}
@@ -427,7 +430,7 @@ public class HiveAvroORCQueryGenerator {
} else {
columns.append(",");
}
- String type = generateAvroToHiveColumnMapping(field.schema(), hiveColumns, false);
+ String type = generateAvroToHiveColumnMapping(field.schema(), hiveColumns, false, datasetName);
columns.append("`").append(field.name()).append("`").append(":").append(type);
}
columns.append(">");
@@ -437,7 +440,7 @@ public class HiveAvroORCQueryGenerator {
Optional<Schema> optionalType = isOfOptionType(schema);
if (optionalType.isPresent()) {
Schema optionalTypeSchema = optionalType.get();
- columns.append(generateAvroToHiveColumnMapping(optionalTypeSchema, hiveColumns, false));
+ columns.append(generateAvroToHiveColumnMapping(optionalTypeSchema, hiveColumns, false, datasetName));
} else {
columns.append(AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
isFirst = true;
@@ -450,19 +453,20 @@ public class HiveAvroORCQueryGenerator {
} else {
columns.append(",");
}
- columns.append(generateAvroToHiveColumnMapping(unionMember, hiveColumns, false));
+ columns.append(generateAvroToHiveColumnMapping(unionMember, hiveColumns, false, datasetName));
}
columns.append(">");
}
break;
case MAP:
columns.append(AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
- columns.append("string,").append(generateAvroToHiveColumnMapping(schema.getValueType(), hiveColumns, false));
+ columns.append("string,")
+ .append(generateAvroToHiveColumnMapping(schema.getValueType(), hiveColumns, false, datasetName));
columns.append(">");
break;
case ARRAY:
columns.append(AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
- columns.append(generateAvroToHiveColumnMapping(schema.getElementType(), hiveColumns, false));
+ columns.append(generateAvroToHiveColumnMapping(schema.getElementType(), hiveColumns, false, datasetName));
columns.append(">");
break;
case NULL:
@@ -479,7 +483,8 @@ public class HiveAvroORCQueryGenerator {
columns.append(AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType()));
break;
default:
- String exceptionMessage = String.format("DDL query generation failed for \"%s\" ", schema);
+ String exceptionMessage =
+ String.format("DDL query generation failed for \"%s\" of dataset %s", schema, datasetName);
log.error(exceptionMessage);
throw new AvroRuntimeException(exceptionMessage);
}
@@ -844,7 +849,14 @@ public class HiveAvroORCQueryGenerator {
if (destinationField.getName().equalsIgnoreCase(evolvedColumn.getKey())) {
// If evolved column is found, but type is evolved - evolve it
// .. if incompatible, isTypeEvolved will throw an exception
- if (isTypeEvolved(evolvedColumn.getValue(), destinationField.getType())) {
+ boolean typeEvolved;
+ try {
+ typeEvolved = isTypeEvolved(evolvedColumn.getValue(), destinationField.getType());
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Unable to evolve schema for table %s.%s", finalDbName, finalTableName), e);
+ }
+ if (typeEvolved) {
ddl.add(String.format("USE %s%n", finalDbName));
ddl.add(String.format("ALTER TABLE `%s` CHANGE COLUMN %s %s %s COMMENT '%s'",
finalTableName, evolvedColumn.getKey(), evolvedColumn.getKey(), evolvedColumn.getValue(),
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eda77bcb/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java
index fa15459..9c9599c 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java
@@ -61,11 +61,17 @@ public class HiveQueryExecutionWriter implements DataWriter<QueryBasedHiveConver
addPropsForPublisher(hiveConversionEntity);
EventWorkunitUtils.setEndConversionDDLExecuteTimeMetadata(this.workUnit, System.currentTimeMillis());
} catch (SQLException e) {
- log.warn("Failed to execute queries: ");
+ StringBuilder sb = new StringBuilder();
+ sb.append(String.format("Failed to execute queries for %s: ",
+ hiveConversionEntity.getPartition().isPresent() ? hiveConversionEntity.getPartition().get().getCompleteName()
+ : hiveConversionEntity.getTable().getCompleteName()));
for (String conversionQuery : conversionQueries) {
- log.warn("Conversion query attempted by Hive Query writer: " + conversionQuery);
+ sb.append("\nConversion query attempted by Hive Query writer: ");
+ sb.append(conversionQuery);
}
- throw new IOException(e);
+ String message = sb.toString();
+ log.warn(message);
+ throw new IOException(message, e);
}
}