You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/04/26 17:19:07 UTC
incubator-gobblin git commit: [GOBBLIN-478] Fixed bug to emit lineage events during Avro2ORC conversion
Repository: incubator-gobblin
Updated Branches:
refs/heads/master c0c77ba54 -> be92ad18b
[GOBBLIN-478] Fixed bug to emit lineage events during Avro2ORC conversion
Closes #2348 from aditya1105/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/be92ad18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/be92ad18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/be92ad18
Branch: refs/heads/master
Commit: be92ad18b68a2ee5c60bc60a621f99eee3d17c54
Parents: c0c77ba
Author: Aditya Sharma <ad...@linkedin.com>
Authored: Thu Apr 26 10:19:00 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Apr 26 10:19:00 2018 -0700
----------------------------------------------------------------------
.../hive/publisher/HiveConvertPublisher.java | 12 ++++-----
.../hive/source/HiveAvroToOrcSource.java | 17 ++++++-------
.../conversion/hive/utils/LineageUtils.java | 11 ++++++---
.../dataset/ConvertibleHiveDatasetTest.java | 26 +++++++++++++-------
4 files changed, 37 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/be92ad18/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java
index f87a6a0..80da551 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java
@@ -243,9 +243,8 @@ public class HiveConvertPublisher extends DataPublisher {
} catch (Exception e) {
log.error("Failed while emitting SLA event, but ignoring and moving forward to curate " + "all clean up commands", e);
}
- if (LineageUtils.shouldSetLineageInfo(wus.getWorkunit())) {
- setDestLineageInfo(wus.getWorkunit(),
- (ConvertibleHiveDataset) ((HiveWorkUnit) wus.getWorkunit()).getHiveDataset(), this.lineageInfo);
+ if (LineageUtils.shouldSetLineageInfo(wus)) {
+ setDestLineageInfo(wus, this.lineageInfo);
}
}
}
@@ -275,12 +274,13 @@ public class HiveConvertPublisher extends DataPublisher {
}
@VisibleForTesting
- public void setDestLineageInfo(WorkUnit workUnit, ConvertibleHiveDataset convertibleHiveDataset,
- Optional<LineageInfo> lineageInfo) {
+ public static void setDestLineageInfo(WorkUnitState wus, Optional<LineageInfo> lineageInfo) {
+ HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(wus.getWorkunit());
+ ConvertibleHiveDataset convertibleHiveDataset = (ConvertibleHiveDataset) hiveWorkUnit.getHiveDataset();
List<DatasetDescriptor> destDatasets = convertibleHiveDataset.getDestDatasets();
for (int i = 0; i < destDatasets.size(); i++) {
if (lineageInfo.isPresent()) {
- lineageInfo.get().putDestination(destDatasets.get(i), i + 1, workUnit);
+ lineageInfo.get().putDestination(destDatasets.get(i), i + 1, wus);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/be92ad18/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java
index 4659aad..5098002 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java
@@ -19,23 +19,22 @@ package org.apache.gobblin.data.management.conversion.hive.source;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import java.util.List;
-
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDatasetFinder;
import org.apache.gobblin.data.management.conversion.hive.utils.LineageUtils;
-import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker;
-import org.apache.gobblin.data.management.copy.hive.HiveDataset;
import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.workunit.WorkUnit;
+
/**
* An extension to {@link HiveSource} that is used for Avro to ORC conversion jobs.
*/
public class HiveAvroToOrcSource extends HiveSource {
private Optional<LineageInfo> lineageInfo;
+
@Override
public List<WorkUnit> getWorkunits(SourceState state) {
if (!state.contains(HIVE_SOURCE_DATASET_FINDER_CLASS_KEY)) {
@@ -49,21 +48,19 @@ public class HiveAvroToOrcSource extends HiveSource {
List<WorkUnit> workunits = super.getWorkunits(state);
for (WorkUnit workUnit : workunits) {
if (LineageUtils.shouldSetLineageInfo(workUnit)) {
- setSourceLineageInfo(workUnit, (ConvertibleHiveDataset) ((HiveWorkUnit) workUnit).getHiveDataset(),
- this.lineageInfo);
+ setSourceLineageInfo(workUnit, this.lineageInfo);
}
}
return workunits;
}
@VisibleForTesting
- public void setSourceLineageInfo(WorkUnit workUnit, ConvertibleHiveDataset hiveDataset,
- Optional<LineageInfo> lineageInfo) {
- DatasetDescriptor sourceDataset = hiveDataset.getSourceDataset();
+ public void setSourceLineageInfo(WorkUnit workUnit, Optional<LineageInfo> lineageInfo) {
+ HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(workUnit);
+ ConvertibleHiveDataset convertibleHiveDataset = (ConvertibleHiveDataset) hiveWorkUnit.getHiveDataset();
+ DatasetDescriptor sourceDataset = convertibleHiveDataset.getSourceDataset();
if (lineageInfo.isPresent()) {
lineageInfo.get().setSource(sourceDataset, workUnit);
}
}
-
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/be92ad18/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java
index 249359b..9f46dd2 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java
@@ -16,6 +16,7 @@
*/
package org.apache.gobblin.data.management.conversion.hive.utils;
+import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit;
import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker;
@@ -28,10 +29,8 @@ import org.apache.gobblin.source.workunit.WorkUnit;
*/
public class LineageUtils {
public static boolean shouldSetLineageInfo(WorkUnit workUnit) {
- if (!(workUnit instanceof HiveWorkUnit)) {
- return false;
- }
- HiveWorkUnit hiveWorkUnit = (HiveWorkUnit) workUnit;
+ // Create a HiveWorkUnit from the workunit
+ HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(workUnit);
if (hiveWorkUnit.getPropAsBoolean(PartitionLevelWatermarker.IS_WATERMARK_WORKUNIT_KEY, false)) {
return false;
}
@@ -39,6 +38,10 @@ public class LineageUtils {
return hiveDataset instanceof ConvertibleHiveDataset;
}
+ public static boolean shouldSetLineageInfo(WorkUnitState workUnitState) {
+ return shouldSetLineageInfo(workUnitState.getWorkunit());
+ }
+
private LineageUtils() {
// cant instantiate
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/be92ad18/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
index b1a38f0..0048416 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
@@ -29,6 +29,7 @@ import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.conversion.hive.publisher.HiveConvertPublisher;
import org.apache.gobblin.data.management.conversion.hive.source.HiveAvroToOrcSource;
import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit;
@@ -38,6 +39,7 @@ import org.apache.gobblin.dataset.HiveToHdfsDatasetResolver;
import org.apache.gobblin.dataset.HiveToHdfsDatasetResolverFactory;
import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.gobblin.runtime.TaskState;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -76,15 +78,25 @@ public class ConvertibleHiveDatasetTest {
workUnit.setProp("gobblin.broker.lineageInfo.datasetResolverFactory",
HiveToHdfsDatasetResolverFactory.class.getName());
workUnit.setProp(ConfigurationKeys.JOB_ID_KEY, "123456");
+
Optional<LineageInfo> lineageInfo = LineageInfo.getLineageInfo(getSharedJobBroker(workUnit.getProperties()));
HiveAvroToOrcSource src = new HiveAvroToOrcSource();
Assert.assertTrue(LineageUtils.shouldSetLineageInfo(workUnit));
- src.setSourceLineageInfo(workUnit,
- (ConvertibleHiveDataset) ((HiveWorkUnit) workUnit).getHiveDataset(), lineageInfo);
- new HiveConvertPublisher(workUnit).setDestLineageInfo(workUnit,
- (ConvertibleHiveDataset) ((HiveWorkUnit) workUnit).getHiveDataset(), lineageInfo);
+ if (LineageUtils.shouldSetLineageInfo(workUnit)) {
+ src.setSourceLineageInfo(workUnit,
+ lineageInfo);
+ }
+ // TaskState is passed to the publisher, hence test should mimic the same behavior
+ TaskState taskState = new TaskState(new WorkUnitState(workUnit));
+ if (LineageUtils.shouldSetLineageInfo(taskState)) {
+ HiveConvertPublisher.setDestLineageInfo(taskState, lineageInfo);
+ }
+ Properties props = taskState.getProperties();
+
+ // Assert that there are two eventBuilders for nestedOrc and flattenedOrc
+ Collection<LineageEventBuilder> lineageEventBuilders = LineageInfo.load(Collections.singleton(taskState));
+ Assert.assertEquals(lineageEventBuilders.size(), 2);
- Properties props = workUnit.getSpecProperties();
// Asset that lineage name is correct
Assert.assertEquals(props.getProperty("gobblin.event.lineage.name"), "/tmp/test");
@@ -113,10 +125,6 @@ public class ConvertibleHiveDatasetTest {
Assert.assertEquals(destDD2.getName(), "/tmp/data_flattenedOrc/db1/tb1/final");
Assert.assertEquals(destDD2.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE),
"db1_flattenedOrcDb.tb1_flattenedOrc");
-
- // Assert that there are two eventBuilders for nestedOrc and flattenedOrc
- Collection<LineageEventBuilder> lineageEventBuilders = LineageInfo.load(Collections.singleton(workUnit));
- Assert.assertEquals(lineageEventBuilders.size(), 2);
}
@Test