You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this text version.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/22 21:52:02 UTC

[incubator-gobblin] branch master updated: [Gobblin-975][GOBBLIN-975] Add flag to enable/disable avro type check in AvroToOrc

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 48d2eb5  [Gobblin-975][GOBBLIN-975] Add flag to enable/disable avro type check in AvroToOrc
48d2eb5 is described below

commit 48d2eb56e0a6b1b570c7cf67489b566976574a66
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Fri Nov 22 13:51:55 2019 -0800

    [Gobblin-975][GOBBLIN-975] Add flag to enable/disable avro type check in AvroToOrc
    
    Closes #2822 from ZihanLi58/GOBBLIN-975
---
 .../conversion/hive/source/HiveSource.java         | 26 ++++++++++++----------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
index fe4f94f..3ff8096 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
@@ -106,6 +106,8 @@ import org.apache.gobblin.data.management.conversion.hive.extractor.HiveConvertE
 @Alpha
 public class HiveSource implements Source {
 
+  public static final String DISABLE_AVRO_CHAECK = "hive.source.disable.avro.check";
+  public static final boolean DEFAULT_DISABLE_AVRO_CHAECK = false;
   public static final String HIVE_SOURCE_MAXIMUM_LOOKBACK_DAYS_KEY = "hive.source.maximum.lookbackDays";
   public static final int DEFAULT_HIVE_SOURCE_MAXIMUM_LOOKBACK_DAYS = 3;
 
@@ -163,7 +165,7 @@ public class HiveSource implements Source {
 
       EventSubmitter.submit(Optional.of(this.eventSubmitter), EventConstants.CONVERSION_FIND_HIVE_TABLES_EVENT);
       Iterator<HiveDataset> iterator = this.datasetFinder.getDatasetsIterator();
-
+      boolean disableAvroCheck = state.getPropAsBoolean(DISABLE_AVRO_CHAECK, DEFAULT_DISABLE_AVRO_CHAECK);
       while (iterator.hasNext()) {
         HiveDataset hiveDataset = iterator.next();
         try (AutoReturnableObject<IMetaStoreClient> client = hiveDataset.getClientPool().getClient()) {
@@ -174,9 +176,9 @@ public class HiveSource implements Source {
           if (hiveDataset.getTable().isPartitioned()
               && state.getPropAsBoolean(HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS,
               DEFAULT_HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS)) {
-            createWorkunitsForPartitionedTable(hiveDataset, client);
+            createWorkunitsForPartitionedTable(hiveDataset, client, disableAvroCheck);
           } else {
-            createWorkunitForNonPartitionedTable(hiveDataset);
+            createWorkunitForNonPartitionedTable(hiveDataset, disableAvroCheck);
           }
         }
       }
@@ -221,7 +223,7 @@ public class HiveSource implements Source {
   }
 
 
-  protected void createWorkunitForNonPartitionedTable(HiveDataset hiveDataset) throws IOException {
+  protected void createWorkunitForNonPartitionedTable(HiveDataset hiveDataset, boolean disableAvroCheck) throws IOException {
     // Create workunits for tables
     try {
 
@@ -246,7 +248,7 @@ public class HiveSource implements Source {
             "Creating workunit for table %s as updateTime %s or createTime %s is greater than low watermark %s",
             hiveDataset.getTable().getCompleteName(), updateTime, hiveDataset.getTable().getTTable().getCreateTime(),
             lowWatermark.getValue()));
-        HiveWorkUnit hiveWorkUnit = workUnitForTable(hiveDataset);
+        HiveWorkUnit hiveWorkUnit = workUnitForTable(hiveDataset, disableAvroCheck);
 
         LongWatermark expectedDatasetHighWatermark =
             this.watermarker.getExpectedHighWatermark(hiveDataset.getTable(), tableProcessTime);
@@ -274,15 +276,15 @@ public class HiveSource implements Source {
     }
   }
 
-  protected HiveWorkUnit workUnitForTable(HiveDataset hiveDataset) throws IOException {
+  protected HiveWorkUnit workUnitForTable(HiveDataset hiveDataset, boolean disableAvroCheck) throws IOException {
     HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(hiveDataset);
-    if (isAvro(hiveDataset.getTable())) {
+    if (disableAvroCheck || isAvro(hiveDataset.getTable())) {
       hiveWorkUnit.setTableSchemaUrl(this.avroSchemaManager.getSchemaUrl(hiveDataset.getTable()));
     }
     return hiveWorkUnit;
   }
 
-  protected void createWorkunitsForPartitionedTable(HiveDataset hiveDataset, AutoReturnableObject<IMetaStoreClient> client) throws IOException {
+  protected void createWorkunitsForPartitionedTable(HiveDataset hiveDataset, AutoReturnableObject<IMetaStoreClient> client, boolean disableAvroCheck) throws IOException {
 
     long tableProcessTime = new DateTime().getMillis();
     this.watermarker.onTableProcessBegin(hiveDataset.getTable(), tableProcessTime);
@@ -326,7 +328,7 @@ public class HiveSource implements Source {
           LongWatermark expectedPartitionHighWatermark = this.watermarker.getExpectedHighWatermark(sourcePartition,
               tableProcessTime, partitionProcessTime);
 
-          HiveWorkUnit hiveWorkUnit = workUnitForPartition(hiveDataset, sourcePartition);
+          HiveWorkUnit hiveWorkUnit = workUnitForPartition(hiveDataset, sourcePartition, disableAvroCheck);
           hiveWorkUnit.setWatermarkInterval(new WatermarkInterval(lowWatermark, expectedPartitionHighWatermark));
 
           EventWorkunitUtils.setPartitionSlaEventMetadata(hiveWorkUnit, hiveDataset.getTable(), sourcePartition, updateTime,
@@ -354,9 +356,9 @@ public class HiveSource implements Source {
     }
   }
 
-  protected HiveWorkUnit workUnitForPartition(HiveDataset hiveDataset, Partition partition) throws IOException {
+  protected HiveWorkUnit workUnitForPartition(HiveDataset hiveDataset, Partition partition, boolean disableAvroCheck) throws IOException {
     HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(hiveDataset, partition);
-    if (isAvro(hiveDataset.getTable())) {
+    if (disableAvroCheck || isAvro(hiveDataset.getTable())) {
       hiveWorkUnit.setTableSchemaUrl(this.avroSchemaManager.getSchemaUrl(hiveDataset.getTable()));
       hiveWorkUnit.setPartitionSchemaUrl(this.avroSchemaManager.getSchemaUrl(partition));
     }
@@ -410,7 +412,7 @@ public class HiveSource implements Source {
    */
   @VisibleForTesting
   public boolean isOlderThanLookback(Partition partition) {
-     return new DateTime(getCreateTime(partition)).isBefore(this.maxLookBackTime);
+    return new DateTime(getCreateTime(partition)).isBefore(this.maxLookBackTime);
   }
 
   @VisibleForTesting