You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/10/10 16:19:26 UTC
incubator-gobblin git commit: [GOBBLIN-278] Fix sending lineage event
for KafkaSource
Repository: incubator-gobblin
Updated Branches:
refs/heads/master bd17f1384 -> d2e43542a
[GOBBLIN-278] Fix sending lineage event for KafkaSource
Closes #2131 from zxcware/lineage
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d2e43542
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d2e43542
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d2e43542
Branch: refs/heads/master
Commit: d2e43542ab7ee3e5814755a8b37e86f07f9115ea
Parents: bd17f13
Author: zhchen <zh...@linkedin.com>
Authored: Tue Oct 10 09:19:15 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Oct 10 09:19:15 2017 -0700
----------------------------------------------------------------------
.../org/apache/gobblin/configuration/State.java | 25 +++++++++++++++
.../gobblin/configuration/WorkUnitState.java | 14 +++++++++
.../org/apache/gobblin/lineage/LineageInfo.java | 2 +-
.../apache/gobblin/configuration/StateTest.java | 12 +++++++
.../extractor/extract/kafka/KafkaSource.java | 13 +++++---
.../packer/KafkaBiLevelWorkUnitPacker.java | 19 +++--------
.../workunit/packer/KafkaWorkUnitPacker.java | 33 +++++++-------------
.../gobblin/runtime/SafeDatasetCommit.java | 25 +++++++++------
8 files changed, 92 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
index 20b6e17..63ced0a 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.configuration;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -500,6 +501,30 @@ public class State implements WritableShim {
}
/**
+ * Remove all properties with a certain key prefix
+ *
+ * @param prefix key prefix
+ */
+ public void removePropsWithPrefix(String prefix) {
+ this.specProperties.entrySet().removeIf(entry -> ((String) entry.getKey()).startsWith(prefix));
+
+ Properties newCommonProperties = null;
+ for (Object key: this.commonProperties.keySet()) {
+ if (((String)key).startsWith(prefix)) {
+ if (newCommonProperties == null) {
+ newCommonProperties = new Properties();
+ newCommonProperties.putAll(this.commonProperties);
+ }
+ newCommonProperties.remove(key);
+ }
+ }
+
+ if (newCommonProperties != null) {
+ this.commonProperties = newCommonProperties;
+ }
+ }
+
+ /**
* @deprecated Use {@link #getProp(String)}
*/
@Deprecated
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java
index af02d01..0b40399 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java
@@ -367,6 +367,20 @@ public class WorkUnitState extends State {
return super.contains(key) || this.workUnit.contains(key) || this.jobState.contains(key);
}
+ @Override
+ public void removeProp(String key) {
+ super.removeProp(key);
+ this.workUnit.removeProp(key);
+ this.jobState.removeProp(key);
+ }
+
+ @Override
+ public void removePropsWithPrefix(String prefix) {
+ super.removePropsWithPrefix(prefix);
+ this.workUnit.removePropsWithPrefix(prefix);
+ this.jobState.removePropsWithPrefix(prefix);
+ }
+
/**
* Get the {@link org.apache.gobblin.source.workunit.Extract} associated with the {@link WorkUnit}.
*
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java b/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
index 90473d4..7af71df 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
@@ -49,10 +49,10 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class LineageInfo {
- public static final String LINEAGE_DATASET_URN = "lineage.dataset.urn";
public static final String LINEAGE_NAME_SPACE = "gobblin.lineage";
public static final String BRANCH_ID_METADATA_KEY = "branchId";
private static final String DATASET_PREFIX = LINEAGE_NAME_SPACE + ".";
+ public static final String LINEAGE_DATASET_URN = DATASET_PREFIX + "dataset.urn";
private static final String BRANCH_PREFIX = DATASET_PREFIX + "branch.";
@Getter
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-api/src/test/java/org/apache/gobblin/configuration/StateTest.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/test/java/org/apache/gobblin/configuration/StateTest.java b/gobblin-api/src/test/java/org/apache/gobblin/configuration/StateTest.java
index bcfa9a7..35cd2eb 100644
--- a/gobblin-api/src/test/java/org/apache/gobblin/configuration/StateTest.java
+++ b/gobblin-api/src/test/java/org/apache/gobblin/configuration/StateTest.java
@@ -81,4 +81,16 @@ public class StateTest {
Assert.fail("Concurrency test failed with first exception: " + ExceptionUtils.getFullStackTrace(this.exceptions.poll()));
}
}
+
+ @Test
+ public void testRemovePropsWithPrefix() {
+ final State state = new State();
+ final String prefix = "prefix";
+ for (int i = 0; i < 10; i++) {
+ state.setProp("prefix." + i, i);
+ }
+ Assert.assertTrue(state.getPropertyNames().size() == 10);
+ state.removePropsWithPrefix(prefix);
+ Assert.assertTrue(state.getPropertyNames().size() == 0);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 56f81e1..e1494f7 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
+import org.apache.gobblin.lineage.LineageInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -132,6 +133,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
private Extract.TableType tableType;
private String extractNamespace;
private boolean isFullExtract;
+ private String kafkaBrokers;
private boolean shouldEnableDatasetStateStore;
private AtomicBoolean isDatasetStateEnabled = new AtomicBoolean(false);
private Set<String> topicsToProcess;
@@ -172,7 +174,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
extractNamespace = KafkaSource.DEFAULT_NAMESPACE_NAME;
}
isFullExtract = state.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY);
-
+ kafkaBrokers = state.getProp(ConfigurationKeys.KAFKA_BROKERS, "");
this.shouldEnableDatasetStateStore = state.getPropAsBoolean(GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE,
DEFAULT_GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE);
@@ -538,10 +540,6 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
}
WorkUnit workUnit = WorkUnit.create(extract);
- if (topicSpecificState.isPresent()) {
- workUnit.addAll(topicSpecificState.get());
- }
-
workUnit.setProp(TOPIC_NAME, partition.getTopicName());
addDatasetUrnOptionally(workUnit);
workUnit.setProp(PARTITION_ID, partition.getId());
@@ -549,6 +547,11 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
workUnit.setProp(LEADER_HOSTANDPORT, partition.getLeader().getHostAndPort().toString());
workUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, offsets.getStartOffset());
workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset());
+
+ // Add lineage info
+ workUnit.setProp(LineageInfo.LINEAGE_DATASET_URN, partition.getTopicName());
+ LineageInfo.setDatasetLineageAttribute(workUnit, ConfigurationKeys.KAFKA_BROKERS, kafkaBrokers);
+
LOG.info(String.format("Created workunit for partition %s: lowWatermark=%d, highWatermark=%d, range=%d", partition,
offsets.getStartOffset(), offsets.getLatestOffset(), offsets.getLatestOffset() - offsets.getStartOffset()));
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
index 7971fa5..9c134b8 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
@@ -24,10 +24,8 @@ import java.util.PriorityQueue;
import com.google.common.collect.Lists;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.extract.AbstractSource;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
@@ -70,27 +68,21 @@ public class KafkaBiLevelWorkUnitPacker extends KafkaWorkUnitPacker {
double avgGroupSize = totalEstDataSize / numContainers / getPreGroupingSizeFactor(this.state);
List<MultiWorkUnit> mwuGroups = Lists.newArrayList();
- for (Map.Entry<String, List<WorkUnit>> entry : workUnitsByTopic.entrySet()) {
- double estimatedDataSizeForTopic = calcTotalEstSizeForTopic(entry.getValue());
+ for (List<WorkUnit> workUnitsForTopic : workUnitsByTopic.values()) {
+ double estimatedDataSizeForTopic = calcTotalEstSizeForTopic(workUnitsForTopic);
if (estimatedDataSizeForTopic < avgGroupSize) {
// If the total estimated size of a topic is smaller than group size, put all partitions of this
// topic in a single group.
MultiWorkUnit mwuGroup = MultiWorkUnit.createEmpty();
- mwuGroup.setProp(LineageInfo.LINEAGE_DATASET_URN, entry.getKey());
- addWorkUnitsToMultiWorkUnit(entry.getValue(), mwuGroup);
+ addWorkUnitsToMultiWorkUnit(workUnitsForTopic, mwuGroup);
mwuGroups.add(mwuGroup);
} else {
// Use best-fit-decreasing to group workunits for a topic into multiple groups.
- mwuGroups.addAll(bestFitDecreasingBinPacking(entry.getKey(), entry.getValue(), avgGroupSize));
+ mwuGroups.addAll(bestFitDecreasingBinPacking(workUnitsForTopic, avgGroupSize));
}
}
- // Add common lineage information
- for (MultiWorkUnit multiWorkUnit: mwuGroups) {
- LineageInfo.setDatasetLineageAttribute(multiWorkUnit, ConfigurationKeys.KAFKA_BROKERS, this.state.getProp(ConfigurationKeys.KAFKA_BROKERS, ""));
- }
-
List<WorkUnit> groups = squeezeMultiWorkUnits(mwuGroups);
return worstFitDecreasingBinPacking(groups, numContainers);
}
@@ -111,7 +103,7 @@ public class KafkaBiLevelWorkUnitPacker extends KafkaWorkUnitPacker {
* Group {@link WorkUnit}s into groups. Each group is a {@link MultiWorkUnit}. Each group has a capacity of
* avgGroupSize. If there's a single {@link WorkUnit} whose size is larger than avgGroupSize, it forms a group itself.
*/
- private static List<MultiWorkUnit> bestFitDecreasingBinPacking(String topic, List<WorkUnit> workUnits, double avgGroupSize) {
+ private static List<MultiWorkUnit> bestFitDecreasingBinPacking(List<WorkUnit> workUnits, double avgGroupSize) {
// Sort workunits by data size desc
Collections.sort(workUnits, LOAD_DESC_COMPARATOR);
@@ -123,7 +115,6 @@ public class KafkaBiLevelWorkUnitPacker extends KafkaWorkUnitPacker {
addWorkUnitToMultiWorkUnit(workUnit, bestGroup);
} else {
bestGroup = MultiWorkUnit.createEmpty();
- bestGroup.setProp(LineageInfo.LINEAGE_DATASET_URN, topic);
addWorkUnitToMultiWorkUnit(workUnit, bestGroup);
}
pQueue.add(bestGroup);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index 38d050d..8d03f4f 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -81,25 +81,11 @@ public abstract class KafkaWorkUnitPacker {
protected final AbstractSource<?, ?> source;
protected final SourceState state;
- protected final Extract.TableType tableType;
- protected final String extractNameSpace;
- protected final boolean isFullExtract;
protected final KafkaWorkUnitSizeEstimator sizeEstimator;
protected KafkaWorkUnitPacker(AbstractSource<?, ?> source, SourceState state) {
this.source = source;
this.state = state;
- if (state.getPropAsBoolean(KafkaSource.GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION)) {
- String tableTypeStr = state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY,
- KafkaSource.DEFAULT_TABLE_TYPE.toString());
- tableType = Extract.TableType.valueOf(tableTypeStr);
- extractNameSpace = state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, KafkaSource.DEFAULT_NAMESPACE_NAME);
- } else {
- // To be compatible, reject table type and namespace configuration keys as previous implementation
- tableType = KafkaSource.DEFAULT_TABLE_TYPE;
- extractNameSpace = KafkaSource.DEFAULT_NAMESPACE_NAME;
- }
- isFullExtract = state.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY);
this.sizeEstimator = getWorkUnitSizeEstimator();
}
@@ -227,13 +213,18 @@ public abstract class KafkaWorkUnitPacker {
List<KafkaPartition> partitions = getPartitionsFromMultiWorkUnit(multiWorkUnit);
Preconditions.checkArgument(!partitions.isEmpty(), "There must be at least one partition in the multiWorkUnit");
- Extract extract = this.source.createExtract(tableType, extractNameSpace, partitions.get(0).getTopicName());
- if (isFullExtract) {
- extract.setProp(ConfigurationKeys.EXTRACT_IS_FULL_KEY, true);
- }
- WorkUnit workUnit = WorkUnit.create(extract, interval);
+ // Squeeze all partitions from the multiWorkUnit into one of the work units, which can be any one
+ WorkUnit workUnit = multiWorkUnit.getWorkUnits().get(0);
+ // Update interval
+ workUnit.removeProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY);
+ workUnit.removeProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY);
+ workUnit.setWatermarkInterval(interval);
+ // Remove the original partition information
+ workUnit.removeProp(KafkaSource.PARTITION_ID);
+ workUnit.removeProp(KafkaSource.LEADER_ID);
+ workUnit.removeProp(KafkaSource.LEADER_HOSTANDPORT);
+ // Add combined partitions information
populateMultiPartitionWorkUnit(partitions, workUnit);
- workUnit.setProp(ESTIMATED_WORKUNIT_SIZE, multiWorkUnit.getProp(ESTIMATED_WORKUNIT_SIZE));
LOG.info(String.format("Created MultiWorkUnit for partitions %s", partitions));
return workUnit;
}
@@ -243,9 +234,7 @@ public abstract class KafkaWorkUnitPacker {
*/
private static void populateMultiPartitionWorkUnit(List<KafkaPartition> partitions, WorkUnit workUnit) {
Preconditions.checkArgument(!partitions.isEmpty(), "There should be at least one partition");
- workUnit.setProp(KafkaSource.TOPIC_NAME, partitions.get(0).getTopicName());
GobblinMetrics.addCustomTagToState(workUnit, new Tag<>("kafkaTopic", partitions.get(0).getTopicName()));
- workUnit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, partitions.get(0).getTopicName());
for (int i = 0; i < partitions.size(); i++) {
workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, i), partitions.get(i).getId());
workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.LEADER_ID, i),
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index 0a8984b..07d167b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -37,18 +37,18 @@ import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.lineage.LineageException;
import org.apache.gobblin.lineage.LineageInfo;
+import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.publisher.CommitSequencePublisher;
import org.apache.gobblin.publisher.DataPublisher;
-import org.apache.gobblin.publisher.NoopPublisher;
import org.apache.gobblin.publisher.UnpublishedHandling;
import org.apache.gobblin.runtime.commit.DatasetStateCommitStep;
import org.apache.gobblin.runtime.task.TaskFactory;
import org.apache.gobblin.runtime.task.TaskUtils;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
-import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -57,7 +57,7 @@ import lombok.extern.slf4j.Slf4j;
* {@link DataPublisher#publish(Collection)}. This class is thread-safe if and only if the implementation of
* {@link DataPublisher} used is also thread-safe.
*/
-@AllArgsConstructor
+@RequiredArgsConstructor
@Slf4j
final class SafeDatasetCommit implements Callable<Void> {
@@ -71,6 +71,8 @@ final class SafeDatasetCommit implements Callable<Void> {
private final boolean isMultithreaded;
private final JobContext jobContext;
+ private MetricContext metricContext;
+
@Override
public Void call()
throws Exception {
@@ -78,6 +80,8 @@ final class SafeDatasetCommit implements Callable<Void> {
log.info(this.datasetUrn + " have been committed.");
return null;
}
+ metricContext = Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class);
+
finalizeDatasetStateBeforeCommit(this.datasetState);
Class<? extends DataPublisher> dataPublisherClass;
try (Closer closer = Closer.create()) {
@@ -159,6 +163,7 @@ final class SafeDatasetCommit implements Callable<Void> {
} finally {
try {
finalizeDatasetState(datasetState, datasetUrn);
+ submitLineageEvent(datasetState.getTaskStates());
if (commitSequenceBuilder.isPresent()) {
buildAndExecuteCommitSequence(commitSequenceBuilder.get(), datasetState, datasetUrn);
datasetState.setState(JobState.RunningState.COMMITTED);
@@ -182,9 +187,8 @@ final class SafeDatasetCommit implements Callable<Void> {
}
TaskState oneWorkUnitState = states.iterator().next();
- if (oneWorkUnitState.contains(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE) && oneWorkUnitState.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE).equals(
- NoopPublisher.class.getName())) {
- // if no publisher configured, each task should be responsible for sending lineage event.
+ if (!oneWorkUnitState.contains(LineageInfo.LINEAGE_DATASET_URN)) {
+ // Do nothing if the dataset is not configured with lineage info
return;
}
@@ -194,14 +198,18 @@ final class SafeDatasetCommit implements Callable<Void> {
Collection<Collection<State>> datasetStates = LineageInfo.aggregateByDatasetUrn(states).values();
for (Collection<State> dataState: datasetStates) {
Collection<LineageInfo> branchLineages = LineageInfo.load(dataState, LineageInfo.Level.All);
- EventSubmitter submitter = new EventSubmitter.Builder(Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class),
- LineageInfo.LINEAGE_NAME_SPACE).build();
+ EventSubmitter submitter = new EventSubmitter.Builder(metricContext, LineageInfo.LINEAGE_NAME_SPACE).build();
for (LineageInfo info: branchLineages) {
submitter.submit(info.getId(), info.getLineageMetaData());
}
}
} catch (LineageException e) {
log.error ("Lineage event submission failed due to :" + e.toString());
+ } finally {
+ for (TaskState taskState: states) {
+ // Remove lineage info from the state to avoid sending duplicate lineage events in the next run
+ taskState.removePropsWithPrefix(LineageInfo.LINEAGE_NAME_SPACE);
+ }
}
}
@@ -222,7 +230,6 @@ final class SafeDatasetCommit implements Callable<Void> {
try {
publisher.publish(taskStates);
- submitLineageEvent(taskStates);
} catch (Throwable t) {
log.error("Failed to commit dataset", t);
setTaskFailureException(taskStates, t);