Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/04/26 17:19:07 UTC

incubator-gobblin git commit: [GOBBLIN-478] Fixed bug to emit lineage events during Avro2ORC conversion

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master c0c77ba54 -> be92ad18b


[GOBBLIN-478] Fixed bug to emit lineage events during Avro2ORC conversion

Closes #2348 from aditya1105/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/be92ad18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/be92ad18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/be92ad18

Branch: refs/heads/master
Commit: be92ad18b68a2ee5c60bc60a621f99eee3d17c54
Parents: c0c77ba
Author: Aditya Sharma <ad...@linkedin.com>
Authored: Thu Apr 26 10:19:00 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Apr 26 10:19:00 2018 -0700

----------------------------------------------------------------------
 .../hive/publisher/HiveConvertPublisher.java    | 12 ++++-----
 .../hive/source/HiveAvroToOrcSource.java        | 17 ++++++-------
 .../conversion/hive/utils/LineageUtils.java     | 11 ++++++---
 .../dataset/ConvertibleHiveDatasetTest.java     | 26 +++++++++++++-------
 4 files changed, 37 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/be92ad18/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java
index f87a6a0..80da551 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java
@@ -243,9 +243,8 @@ public class HiveConvertPublisher extends DataPublisher {
             } catch (Exception e) {
               log.error("Failed while emitting SLA event, but ignoring and moving forward to curate " + "all clean up commands", e);
             }
-            if (LineageUtils.shouldSetLineageInfo(wus.getWorkunit())) {
-              setDestLineageInfo(wus.getWorkunit(),
-                  (ConvertibleHiveDataset) ((HiveWorkUnit) wus.getWorkunit()).getHiveDataset(), this.lineageInfo);
+            if (LineageUtils.shouldSetLineageInfo(wus)) {
+              setDestLineageInfo(wus, this.lineageInfo);
             }
           }
         }
@@ -275,12 +274,13 @@ public class HiveConvertPublisher extends DataPublisher {
   }
 
   @VisibleForTesting
-  public void setDestLineageInfo(WorkUnit workUnit, ConvertibleHiveDataset convertibleHiveDataset,
-      Optional<LineageInfo> lineageInfo) {
+  public static void setDestLineageInfo(WorkUnitState wus, Optional<LineageInfo> lineageInfo) {
+    HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(wus.getWorkunit());
+    ConvertibleHiveDataset convertibleHiveDataset = (ConvertibleHiveDataset) hiveWorkUnit.getHiveDataset();
     List<DatasetDescriptor> destDatasets = convertibleHiveDataset.getDestDatasets();
     for (int i = 0; i < destDatasets.size(); i++) {
       if (lineageInfo.isPresent()) {
-        lineageInfo.get().putDestination(destDatasets.get(i), i + 1, workUnit);
+        lineageInfo.get().putDestination(destDatasets.get(i), i + 1, wus);
       }
     }
   }
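
For context, a minimal sketch of the call pattern this hunk introduces on the publish path. The helper is now static and derives the ConvertibleHiveDataset from the WorkUnitState itself; the variable names below are illustrative and not part of the commit.

import com.google.common.base.Optional;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.conversion.hive.publisher.HiveConvertPublisher;
import org.apache.gobblin.data.management.conversion.hive.utils.LineageUtils;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;

// Sketch only: wus is expected to carry the serialized HiveWorkUnit of a
// ConvertibleHiveDataset, and lineageInfo comes from the shared job broker.
void emitDestinationLineage(WorkUnitState wus, Optional<LineageInfo> lineageInfo) {
  if (LineageUtils.shouldSetLineageInfo(wus)) {
    // Registers every destination dataset (e.g. nestedOrc, flattenedOrc) in order.
    HiveConvertPublisher.setDestLineageInfo(wus, lineageInfo);
  }
}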

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/be92ad18/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java
index 4659aad..5098002 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java
@@ -19,23 +19,22 @@ package org.apache.gobblin.data.management.conversion.hive.source;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import java.util.List;
-
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
 import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDatasetFinder;
 import org.apache.gobblin.data.management.conversion.hive.utils.LineageUtils;
-import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker;
-import org.apache.gobblin.data.management.copy.hive.HiveDataset;
 import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
 import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.source.workunit.WorkUnit;
 
+
 /**
  * An extension to {@link HiveSource} that is used for Avro to ORC conversion jobs.
  */
 public class HiveAvroToOrcSource extends HiveSource {
   private Optional<LineageInfo> lineageInfo;
+
   @Override
   public List<WorkUnit> getWorkunits(SourceState state) {
     if (!state.contains(HIVE_SOURCE_DATASET_FINDER_CLASS_KEY)) {
@@ -49,21 +48,19 @@ public class HiveAvroToOrcSource extends HiveSource {
     List<WorkUnit> workunits = super.getWorkunits(state);
     for (WorkUnit workUnit : workunits) {
       if (LineageUtils.shouldSetLineageInfo(workUnit)) {
-        setSourceLineageInfo(workUnit, (ConvertibleHiveDataset) ((HiveWorkUnit) workUnit).getHiveDataset(),
-            this.lineageInfo);
+        setSourceLineageInfo(workUnit, this.lineageInfo);
       }
     }
     return workunits;
   }
 
   @VisibleForTesting
-  public void setSourceLineageInfo(WorkUnit workUnit, ConvertibleHiveDataset hiveDataset,
-      Optional<LineageInfo> lineageInfo) {
-    DatasetDescriptor sourceDataset = hiveDataset.getSourceDataset();
+  public void setSourceLineageInfo(WorkUnit workUnit, Optional<LineageInfo> lineageInfo) {
+    HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(workUnit);
+    ConvertibleHiveDataset convertibleHiveDataset = (ConvertibleHiveDataset) hiveWorkUnit.getHiveDataset();
+    DatasetDescriptor sourceDataset = convertibleHiveDataset.getSourceDataset();
     if (lineageInfo.isPresent()) {
       lineageInfo.get().setSource(sourceDataset, workUnit);
     }
   }
-
-
 }
\ No newline at end of file
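
The source-side helper mirrors the publisher change: the ConvertibleHiveDataset parameter is gone and the HiveWorkUnit is rebuilt from the WorkUnit passed in. A rough usage sketch under the same assumptions (the work unit wraps a ConvertibleHiveDataset, lineageInfo was resolved from the shared broker, and names are illustrative):

import com.google.common.base.Optional;
import org.apache.gobblin.data.management.conversion.hive.source.HiveAvroToOrcSource;
import org.apache.gobblin.data.management.conversion.hive.utils.LineageUtils;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.workunit.WorkUnit;

// Sketch only: records the source dataset descriptor on the work unit before it is
// handed off to the conversion tasks.
void emitSourceLineage(HiveAvroToOrcSource source, WorkUnit workUnit,
    Optional<LineageInfo> lineageInfo) {
  if (LineageUtils.shouldSetLineageInfo(workUnit)) {
    source.setSourceLineageInfo(workUnit, lineageInfo);
  }
}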

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/be92ad18/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java
index 249359b..9f46dd2 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java
@@ -16,6 +16,7 @@
  */
 package org.apache.gobblin.data.management.conversion.hive.utils;
 
+import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
 import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit;
 import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker;
@@ -28,10 +29,8 @@ import org.apache.gobblin.source.workunit.WorkUnit;
  */
 public class LineageUtils {
   public static boolean shouldSetLineageInfo(WorkUnit workUnit) {
-    if (!(workUnit instanceof HiveWorkUnit)) {
-      return false;
-    }
-    HiveWorkUnit hiveWorkUnit = (HiveWorkUnit) workUnit;
+    // Create a HiveWorkUnit from the workunit
+    HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(workUnit);
     if (hiveWorkUnit.getPropAsBoolean(PartitionLevelWatermarker.IS_WATERMARK_WORKUNIT_KEY, false)) {
       return false;
     }
@@ -39,6 +38,10 @@ public class LineageUtils {
     return hiveDataset instanceof ConvertibleHiveDataset;
   }
 
+  public static boolean shouldSetLineageInfo(WorkUnitState workUnitState) {
+    return shouldSetLineageInfo(workUnitState.getWorkunit());
+  }
+
   private LineageUtils() {
     // cant instantiate
   }
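
Two things change in LineageUtils: the instanceof downcast is replaced by wrapping the incoming WorkUnit in a new HiveWorkUnit, so work units that arrive as plain WorkUnit instances (for example after deserialization on the task side, which is presumably why the lineage events went missing) are no longer rejected, and a WorkUnitState overload is added so the publisher can pass task states directly. A small illustration of the two overloads, with hypothetical variables:

import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.conversion.hive.utils.LineageUtils;
import org.apache.gobblin.source.workunit.WorkUnit;

// Sketch only: both overloads perform the same check. The WorkUnit no longer has to
// be a HiveWorkUnit instance; its Hive properties are sufficient.
boolean lineageApplies(WorkUnit workUnit) {
  boolean onSource = LineageUtils.shouldSetLineageInfo(workUnit);
  boolean onPublish = LineageUtils.shouldSetLineageInfo(new WorkUnitState(workUnit));
  return onSource && onPublish;  // identical results, shown only to exercise both overloads
}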

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/be92ad18/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
index b1a38f0..0048416 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
@@ -29,6 +29,7 @@ import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.data.management.conversion.hive.publisher.HiveConvertPublisher;
 import org.apache.gobblin.data.management.conversion.hive.source.HiveAvroToOrcSource;
 import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit;
@@ -38,6 +39,7 @@ import org.apache.gobblin.dataset.HiveToHdfsDatasetResolver;
 import org.apache.gobblin.dataset.HiveToHdfsDatasetResolverFactory;
 import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
 import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.gobblin.runtime.TaskState;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -76,15 +78,25 @@ public class ConvertibleHiveDatasetTest {
     workUnit.setProp("gobblin.broker.lineageInfo.datasetResolverFactory",
         HiveToHdfsDatasetResolverFactory.class.getName());
     workUnit.setProp(ConfigurationKeys.JOB_ID_KEY, "123456");
+
     Optional<LineageInfo> lineageInfo = LineageInfo.getLineageInfo(getSharedJobBroker(workUnit.getProperties()));
     HiveAvroToOrcSource src = new HiveAvroToOrcSource();
     Assert.assertTrue(LineageUtils.shouldSetLineageInfo(workUnit));
-    src.setSourceLineageInfo(workUnit,
-      (ConvertibleHiveDataset) ((HiveWorkUnit) workUnit).getHiveDataset(), lineageInfo);
-    new HiveConvertPublisher(workUnit).setDestLineageInfo(workUnit,
-      (ConvertibleHiveDataset) ((HiveWorkUnit) workUnit).getHiveDataset(), lineageInfo);
+    if (LineageUtils.shouldSetLineageInfo(workUnit)) {
+      src.setSourceLineageInfo(workUnit,
+          lineageInfo);
+    }
+    // TaskState is passed to the publisher, hence test should mimic the same behavior
+    TaskState taskState = new TaskState(new WorkUnitState(workUnit));
+    if (LineageUtils.shouldSetLineageInfo(taskState)) {
+      HiveConvertPublisher.setDestLineageInfo(taskState, lineageInfo);
+    }
+    Properties props = taskState.getProperties();
+
+    // Assert that there are two eventBuilders for nestedOrc and flattenedOrc
+    Collection<LineageEventBuilder> lineageEventBuilders = LineageInfo.load(Collections.singleton(taskState));
+    Assert.assertEquals(lineageEventBuilders.size(), 2);
 
-    Properties props = workUnit.getSpecProperties();
     // Asset that lineage name is correct
     Assert.assertEquals(props.getProperty("gobblin.event.lineage.name"), "/tmp/test");
 
@@ -113,10 +125,6 @@ public class ConvertibleHiveDatasetTest {
     Assert.assertEquals(destDD2.getName(), "/tmp/data_flattenedOrc/db1/tb1/final");
     Assert.assertEquals(destDD2.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE),
         "db1_flattenedOrcDb.tb1_flattenedOrc");
-
-    // Assert that there are two eventBuilders for nestedOrc and flattenedOrc
-    Collection<LineageEventBuilder> lineageEventBuilders = LineageInfo.load(Collections.singleton(workUnit));
-    Assert.assertEquals(lineageEventBuilders.size(), 2);
   }
 
   @Test
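
For completeness, a hedged sketch of what the rewritten test verifies end to end: lineage is now loaded from a TaskState (the state type the publisher actually sees), and LineageInfo.load is expected to yield one event builder per destination, i.e. two for nestedOrc and flattenedOrc. The helper below only reuses calls that appear in the test above and assumes a taskState prepared the same way.

import java.util.Collection;
import java.util.Collections;
import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.runtime.TaskState;

// Sketch only: taskState already has source and destination lineage set, as in the
// test above; printing relies on toString and assumes nothing about builder fields.
void inspectLineage(TaskState taskState) {
  Collection<LineageEventBuilder> builders = LineageInfo.load(Collections.singleton(taskState));
  for (LineageEventBuilder builder : builders) {
    System.out.println(builder);
  }
}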