You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/03/29 18:25:16 UTC

incubator-gobblin git commit: [GOBBLIN-399] Improved logs and error messages for avro2orc conversion

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 21cc7c048 -> eda77bcb6


[GOBBLIN-399] Improved logs and error messages for avro2orc conversion

Closes #2323 from aditya1105/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/eda77bcb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/eda77bcb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/eda77bcb

Branch: refs/heads/master
Commit: eda77bcb6ae57fc1b38f7c70c2ac45995fde38b6
Parents: 21cc7c0
Author: Aditya Sharma <ad...@linkedin.com>
Authored: Thu Mar 29 11:25:10 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Mar 29 11:25:10 2018 -0700

----------------------------------------------------------------------
 .../converter/AbstractAvroToOrcConverter.java   |  6 ++-
 .../hive/dataset/ConvertibleHiveDataset.java    |  3 +-
 .../hive/query/HiveAvroORCQueryGenerator.java   | 48 ++++++++++++--------
 .../hive/writer/HiveQueryExecutionWriter.java   | 12 +++--
 4 files changed, 45 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eda77bcb/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
index ed42946..732e149 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
@@ -676,7 +676,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
         // Values for a partition are separated by ","
         List<String> partitionValues = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(partitionsInfoString);
 
-        // Do not drop partition the being processed. Sometimes a partition may have replaced another partition of the same values.
+        // Do not drop the partition being processed. Sometimes a partition may have replaced another partition of the same values.
         if (!partitionValues.equals(hivePartition.getValues())) {
           ImmutableMap.Builder<String, String> partitionDDLInfoMap = ImmutableMap.builder();
           for (int i = 0; i < partitionKeys.size(); i++) {
@@ -713,7 +713,9 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
         return Optional.of(qlPartition.getDataLocation());
       }
     } catch (IOException | TException | HiveException e) {
-      throw new DataConversionException("Could not fetch destination table metadata", e);
+      throw new DataConversionException(
+          String.format("Could not fetch destination table %s.%s metadata", table.get().getDbName(),
+              table.get().getTableName()), e);
     }
     return Optional.<Path>absent();
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eda77bcb/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java
index 933e86a..f4c8744 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java
@@ -92,9 +92,10 @@ public class ConvertibleHiveDataset extends HiveDataset {
     super(fs, clientPool, table, jobProps, config);
 
     Preconditions.checkArgument(config.hasPath(DESTINATION_CONVERSION_FORMATS_KEY), String.format(
-        "Atleast one destination format should be specified at %s.%s. If you do not intend to convert this dataset set %s.%s to true",
+        "At least one destination format should be specified at %s.%s. If you do not intend to convert dataset %s set %s.%s to true",
         super.properties.getProperty(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY, ""),
         DESTINATION_CONVERSION_FORMATS_KEY,
+        table.getCompleteName(),
         super.properties.getProperty(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY, ""),
         HiveDatasetFinder.HIVE_DATASET_IS_BLACKLISTED_KEY));
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eda77bcb/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
index d16df29..3f2206d 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
@@ -202,7 +202,7 @@ public class HiveAvroORCQueryGenerator {
     //    .. use columns from destination schema
     if (isEvolutionEnabled || !destinationTableMeta.isPresent()) {
       log.info("Generating DDL using source schema");
-      ddl.append(generateAvroToHiveColumnMapping(schema, Optional.of(hiveColumns), true));
+      ddl.append(generateAvroToHiveColumnMapping(schema, Optional.of(hiveColumns), true, dbName + "." + tblName));
     } else {
       log.info("Generating DDL using destination schema");
       ddl.append(generateDestinationToHiveColumnMapping(Optional.of(hiveColumns), destinationTableMeta.get()));
@@ -229,14 +229,15 @@ public class HiveAvroORCQueryGenerator {
 
     if (optionalClusterInfo.isPresent()) {
       if (!optionalNumOfBuckets.isPresent()) {
-        throw new IllegalArgumentException(("CLUSTERED BY requested, but no NUM_BUCKETS specified"));
+        throw new IllegalArgumentException(
+            (String.format("CLUSTERED BY requested, but no NUM_BUCKETS specified for table %s.%s", dbName, tblName)));
       }
       ddl.append("CLUSTERED BY ( ");
       boolean isFirst = true;
       for (String clusterByCol : optionalClusterInfo.get()) {
         if (!hiveColumns.containsKey(clusterByCol)) {
           throw new IllegalArgumentException(String.format("Requested CLUSTERED BY column: %s "
-              + "is not present in schema", clusterByCol));
+              + "is not present in schema for table %s.%s", clusterByCol, dbName, tblName));
         }
         if (isFirst) {
           isFirst = false;
@@ -254,7 +255,8 @@ public class HiveAvroORCQueryGenerator {
         for (Map.Entry<String, COLUMN_SORT_ORDER> sortOrderInfo : sortOrderInfoMap.entrySet()){
           if (!hiveColumns.containsKey(sortOrderInfo.getKey())) {
             throw new IllegalArgumentException(String.format(
-                "Requested SORTED BY column: %s " + "is not present in schema", sortOrderInfo.getKey()));
+                "Requested SORTED BY column: %s " + "is not present in schema for table %s.%s", sortOrderInfo.getKey(),
+                dbName, tblName));
           }
           if (isFirst) {
             isFirst = false;
@@ -268,7 +270,8 @@ public class HiveAvroORCQueryGenerator {
       ddl.append(String.format(" INTO %s BUCKETS %n", optionalNumOfBuckets.get()));
     } else {
       if (optionalSortOrderInfo.isPresent()) {
-        throw new IllegalArgumentException("SORTED BY requested, but no CLUSTERED BY specified");
+        throw new IllegalArgumentException(
+            String.format("SORTED BY requested, but no CLUSTERED BY specified for table %s.%s", dbName, tblName));
       }
     }
 
@@ -389,12 +392,12 @@ public class HiveAvroORCQueryGenerator {
    * @param topLevel If this is first level
    * @return Generate Hive columns with types for given Avro schema
    */
-  private static String generateAvroToHiveColumnMapping(Schema schema,
-      Optional<Map<String, String>> hiveColumns,
-      boolean topLevel) {
+  private static String generateAvroToHiveColumnMapping(Schema schema, Optional<Map<String, String>> hiveColumns,
+      boolean topLevel, String datasetName) {
     if (topLevel && !schema.getType().equals(Schema.Type.RECORD)) {
-      throw new IllegalArgumentException(String.format("Schema for table must be of type RECORD. Received type: %s",
-          schema.getType()));
+      throw new IllegalArgumentException(
+          String.format("Schema for table must be of type RECORD. Received type: %s for dataset %s", schema.getType(),
+              datasetName));
     }
 
     StringBuilder columns = new StringBuilder();
@@ -409,7 +412,7 @@ public class HiveAvroORCQueryGenerator {
             } else {
               columns.append(", \n");
             }
-            String type = generateAvroToHiveColumnMapping(field.schema(), hiveColumns, false);
+            String type = generateAvroToHiveColumnMapping(field.schema(), hiveColumns, false, datasetName);
             if (hiveColumns.isPresent()) {
               hiveColumns.get().put(field.name(), type);
             }
@@ -427,7 +430,7 @@ public class HiveAvroORCQueryGenerator {
             } else {
               columns.append(",");
             }
-            String type = generateAvroToHiveColumnMapping(field.schema(), hiveColumns, false);
+            String type = generateAvroToHiveColumnMapping(field.schema(), hiveColumns, false, datasetName);
             columns.append("`").append(field.name()).append("`").append(":").append(type);
           }
           columns.append(">");
@@ -437,7 +440,7 @@ public class HiveAvroORCQueryGenerator {
         Optional<Schema> optionalType = isOfOptionType(schema);
         if (optionalType.isPresent()) {
           Schema optionalTypeSchema = optionalType.get();
-          columns.append(generateAvroToHiveColumnMapping(optionalTypeSchema, hiveColumns, false));
+          columns.append(generateAvroToHiveColumnMapping(optionalTypeSchema, hiveColumns, false, datasetName));
         } else {
           columns.append(AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
           isFirst = true;
@@ -450,19 +453,20 @@ public class HiveAvroORCQueryGenerator {
             } else {
               columns.append(",");
             }
-            columns.append(generateAvroToHiveColumnMapping(unionMember, hiveColumns, false));
+            columns.append(generateAvroToHiveColumnMapping(unionMember, hiveColumns, false, datasetName));
           }
           columns.append(">");
         }
         break;
       case MAP:
         columns.append(AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
-        columns.append("string,").append(generateAvroToHiveColumnMapping(schema.getValueType(), hiveColumns, false));
+        columns.append("string,")
+            .append(generateAvroToHiveColumnMapping(schema.getValueType(), hiveColumns, false, datasetName));
         columns.append(">");
         break;
       case ARRAY:
         columns.append(AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
-        columns.append(generateAvroToHiveColumnMapping(schema.getElementType(), hiveColumns, false));
+        columns.append(generateAvroToHiveColumnMapping(schema.getElementType(), hiveColumns, false, datasetName));
         columns.append(">");
         break;
       case NULL:
@@ -479,7 +483,8 @@ public class HiveAvroORCQueryGenerator {
         columns.append(AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType()));
         break;
       default:
-        String exceptionMessage = String.format("DDL query generation failed for \"%s\" ", schema);
+        String exceptionMessage =
+            String.format("DDL query generation failed for \"%s\" of dataset %s", schema, datasetName);
         log.error(exceptionMessage);
         throw new AvroRuntimeException(exceptionMessage);
     }
@@ -844,7 +849,14 @@ public class HiveAvroORCQueryGenerator {
         if (destinationField.getName().equalsIgnoreCase(evolvedColumn.getKey())) {
           // If evolved column is found, but type is evolved - evolve it
           // .. if incompatible, isTypeEvolved will throw an exception
-          if (isTypeEvolved(evolvedColumn.getValue(), destinationField.getType())) {
+          boolean typeEvolved;
+          try {
+            typeEvolved = isTypeEvolved(evolvedColumn.getValue(), destinationField.getType());
+          } catch (Exception e) {
+            throw new RuntimeException(
+                String.format("Unable to evolve schema for table %s.%s", finalDbName, finalTableName), e);
+          }
+          if (typeEvolved) {
             ddl.add(String.format("USE %s%n", finalDbName));
             ddl.add(String.format("ALTER TABLE `%s` CHANGE COLUMN %s %s %s COMMENT '%s'",
                 finalTableName, evolvedColumn.getKey(), evolvedColumn.getKey(), evolvedColumn.getValue(),

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eda77bcb/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java
index fa15459..9c9599c 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java
@@ -61,11 +61,17 @@ public class HiveQueryExecutionWriter implements DataWriter<QueryBasedHiveConver
       addPropsForPublisher(hiveConversionEntity);
       EventWorkunitUtils.setEndConversionDDLExecuteTimeMetadata(this.workUnit, System.currentTimeMillis());
     } catch (SQLException e) {
-      log.warn("Failed to execute queries: ");
+      StringBuilder sb = new StringBuilder();
+      sb.append(String.format("Failed to execute queries for %s: ",
+          hiveConversionEntity.getPartition().isPresent() ? hiveConversionEntity.getPartition().get().getCompleteName()
+              : hiveConversionEntity.getTable().getCompleteName()));
       for (String conversionQuery : conversionQueries) {
-        log.warn("Conversion query attempted by Hive Query writer: " + conversionQuery);
+        sb.append("\nConversion query attempted by Hive Query writer: ");
+        sb.append(conversionQuery);
       }
-      throw new IOException(e);
+      String message = sb.toString();
+      log.warn(message);
+      throw new IOException(message, e);
     }
   }