Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/10/10 16:19:26 UTC

incubator-gobblin git commit: [GOBBLIN-278] Fix sending lineage event for KafkaSource

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master bd17f1384 -> d2e43542a


[GOBBLIN-278] Fix sending lineage event for KafkaSource

Closes #2131 from zxcware/lineage


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d2e43542
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d2e43542
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d2e43542

Branch: refs/heads/master
Commit: d2e43542ab7ee3e5814755a8b37e86f07f9115ea
Parents: bd17f13
Author: zhchen <zh...@linkedin.com>
Authored: Tue Oct 10 09:19:15 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Oct 10 09:19:15 2017 -0700

----------------------------------------------------------------------
 .../org/apache/gobblin/configuration/State.java | 25 +++++++++++++++
 .../gobblin/configuration/WorkUnitState.java    | 14 +++++++++
 .../org/apache/gobblin/lineage/LineageInfo.java |  2 +-
 .../apache/gobblin/configuration/StateTest.java | 12 +++++++
 .../extractor/extract/kafka/KafkaSource.java    | 13 +++++---
 .../packer/KafkaBiLevelWorkUnitPacker.java      | 19 +++--------
 .../workunit/packer/KafkaWorkUnitPacker.java    | 33 +++++++-------------
 .../gobblin/runtime/SafeDatasetCommit.java      | 25 +++++++++------
 8 files changed, 92 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
index 20b6e17..63ced0a 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.configuration;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -500,6 +501,30 @@ public class State implements WritableShim {
   }
 
   /**
+   * Remove all properties whose key starts with a given prefix
+   *
+   * @param prefix key prefix
+   */
+  public void removePropsWithPrefix(String prefix) {
+    this.specProperties.entrySet().removeIf(entry -> ((String) entry.getKey()).startsWith(prefix));
+
+    Properties newCommonProperties = null;
+    for (Object key: this.commonProperties.keySet()) {
+      if (((String)key).startsWith(prefix)) {
+        if (newCommonProperties == null) {
+          newCommonProperties = new Properties();
+          newCommonProperties.putAll(this.commonProperties);
+        }
+        newCommonProperties.remove(key);
+      }
+    }
+
+    if (newCommonProperties != null) {
+      this.commonProperties = newCommonProperties;
+    }
+  }
+
+  /**
    * @deprecated Use {@link #getProp(String)}
    */
   @Deprecated
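
For illustration, a minimal sketch of the new helper in isolation (property names and values below are hypothetical; setProp and getPropertyNames are existing State API):

    State state = new State();
    state.setProp("gobblin.lineage.dataset.urn", "my_topic");
    state.setProp("gobblin.lineage.branch.0.name", "hdfs-branch");
    state.setProp("writer.output.dir", "/data/out");

    // Drops both gobblin.lineage.* keys; unrelated keys survive.
    state.removePropsWithPrefix("gobblin.lineage");
    // state.getPropertyNames() now contains only "writer.output.dir".

Note the copy-on-write handling of commonProperties in the implementation above: the shared Properties object is cloned only when a matching key is actually found, presumably because it may be shared across State instances.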

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java
index af02d01..0b40399 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java
@@ -367,6 +367,20 @@ public class WorkUnitState extends State {
     return super.contains(key) || this.workUnit.contains(key) || this.jobState.contains(key);
   }
 
+  @Override
+  public void removeProp(String key) {
+    super.removeProp(key);
+    this.workUnit.removeProp(key);
+    this.jobState.removeProp(key);
+  }
+
+  @Override
+  public void removePropsWithPrefix(String prefix) {
+    super.removePropsWithPrefix(prefix);
+    this.workUnit.removePropsWithPrefix(prefix);
+    this.jobState.removePropsWithPrefix(prefix);
+  }
+
   /**
    * Get the {@link org.apache.gobblin.source.workunit.Extract} associated with the {@link WorkUnit}.
    *
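
The overrides matter because property lookups on a WorkUnitState fall back to the underlying WorkUnit and JobState (see contains() above); removing a key only at the task level would leave it resolvable through the fallback chain. A hedged sketch, assuming a workUnit in scope that already carries lineage properties:

    WorkUnitState workUnitState = new WorkUnitState(workUnit);
    workUnitState.removePropsWithPrefix(LineageInfo.LINEAGE_NAME_SPACE);
    // Without the overrides, the key would survive in this.workUnit or
    // this.jobState and contains(...) would still return true here.
    assert !workUnitState.contains(LineageInfo.LINEAGE_DATASET_URN);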

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java b/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
index 90473d4..7af71df 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
@@ -49,10 +49,10 @@ import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class LineageInfo {
-  public static final String LINEAGE_DATASET_URN = "lineage.dataset.urn";
   public static final String LINEAGE_NAME_SPACE = "gobblin.lineage";
   public static final String BRANCH_ID_METADATA_KEY = "branchId";
   private static final String DATASET_PREFIX =  LINEAGE_NAME_SPACE + ".";
+  public static final String LINEAGE_DATASET_URN = DATASET_PREFIX + "dataset.urn";
   private static final String BRANCH_PREFIX = DATASET_PREFIX + "branch.";
 
   @Getter
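
Note that moving the constant is a behavior change, not a cosmetic reshuffle: the urn key now lives under the lineage namespace, so prefix-based operations pick it up. Spelled out:

    // Before: LINEAGE_DATASET_URN = "lineage.dataset.urn"
    // After:  LINEAGE_DATASET_URN = DATASET_PREFIX + "dataset.urn"
    //                             = "gobblin.lineage.dataset.urn"
    // Consequently removePropsWithPrefix(LINEAGE_NAME_SPACE), used in
    // SafeDatasetCommit below, clears the urn along with every other
    // lineage attribute.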

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-api/src/test/java/org/apache/gobblin/configuration/StateTest.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/test/java/org/apache/gobblin/configuration/StateTest.java b/gobblin-api/src/test/java/org/apache/gobblin/configuration/StateTest.java
index bcfa9a7..35cd2eb 100644
--- a/gobblin-api/src/test/java/org/apache/gobblin/configuration/StateTest.java
+++ b/gobblin-api/src/test/java/org/apache/gobblin/configuration/StateTest.java
@@ -81,4 +81,16 @@ public class StateTest {
       Assert.fail("Concurrency test failed with first exception: " + ExceptionUtils.getFullStackTrace(this.exceptions.poll()));
     }
   }
+
+  @Test
+  public void testRemovePropsWithPrefix() {
+    final State state = new State();
+    final String prefix = "prefix";
+    for (int i = 0; i < 10; i++) {
+      state.setProp("prefix." + i, i);
+    }
+    Assert.assertTrue(state.getPropertyNames().size() == 10);
+    state.removePropsWithPrefix(prefix);
+    Assert.assertTrue(state.getPropertyNames().size() == 0);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 56f81e1..e1494f7 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
+import org.apache.gobblin.lineage.LineageInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -132,6 +133,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   private Extract.TableType tableType;
   private String extractNamespace;
   private boolean isFullExtract;
+  private String kafkaBrokers;
   private boolean shouldEnableDatasetStateStore;
   private AtomicBoolean isDatasetStateEnabled = new AtomicBoolean(false);
   private Set<String> topicsToProcess;
@@ -172,7 +174,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
       extractNamespace = KafkaSource.DEFAULT_NAMESPACE_NAME;
     }
     isFullExtract = state.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY);
-
+    kafkaBrokers = state.getProp(ConfigurationKeys.KAFKA_BROKERS, "");
     this.shouldEnableDatasetStateStore = state.getPropAsBoolean(GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE,
         DEFAULT_GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE);
 
@@ -538,10 +540,6 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     }
 
     WorkUnit workUnit = WorkUnit.create(extract);
-    if (topicSpecificState.isPresent()) {
-      workUnit.addAll(topicSpecificState.get());
-    }
-
     workUnit.setProp(TOPIC_NAME, partition.getTopicName());
     addDatasetUrnOptionally(workUnit);
     workUnit.setProp(PARTITION_ID, partition.getId());
@@ -549,6 +547,11 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     workUnit.setProp(LEADER_HOSTANDPORT, partition.getLeader().getHostAndPort().toString());
     workUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, offsets.getStartOffset());
     workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset());
+
+    // Add lineage info
+    workUnit.setProp(LineageInfo.LINEAGE_DATASET_URN, partition.getTopicName());
+    LineageInfo.setDatasetLineageAttribute(workUnit, ConfigurationKeys.KAFKA_BROKERS, kafkaBrokers);
+
     LOG.info(String.format("Created workunit for partition %s: lowWatermark=%d, highWatermark=%d, range=%d", partition,
         offsets.getStartOffset(), offsets.getLatestOffset(), offsets.getLatestOffset() - offsets.getStartOffset()));
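
With this change each Kafka work unit carries its own lineage attributes instead of inheriting them from the work unit packer. Assuming setDatasetLineageAttribute prefixes its key with the lineage namespace (its pairing with LINEAGE_DATASET_URN here suggests so), a work unit for topic "MyTopic" would roughly end up with:

    gobblin.lineage.dataset.urn   = MyTopic
    gobblin.lineage.kafka.brokers = broker1:9092,broker2:9092   (key name assumed from ConfigurationKeys.KAFKA_BROKERS)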
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
index 7971fa5..9c134b8 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
@@ -24,10 +24,8 @@ import java.util.PriorityQueue;
 
 import com.google.common.collect.Lists;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.lineage.LineageInfo;
 import org.apache.gobblin.source.extractor.extract.AbstractSource;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
@@ -70,27 +68,21 @@ public class KafkaBiLevelWorkUnitPacker extends KafkaWorkUnitPacker {
     double avgGroupSize = totalEstDataSize / numContainers / getPreGroupingSizeFactor(this.state);
 
     List<MultiWorkUnit> mwuGroups = Lists.newArrayList();
-    for (Map.Entry<String, List<WorkUnit>> entry : workUnitsByTopic.entrySet()) {
-      double estimatedDataSizeForTopic = calcTotalEstSizeForTopic(entry.getValue());
+    for (List<WorkUnit> workUnitsForTopic : workUnitsByTopic.values()) {
+      double estimatedDataSizeForTopic = calcTotalEstSizeForTopic(workUnitsForTopic);
       if (estimatedDataSizeForTopic < avgGroupSize) {
 
         // If the total estimated size of a topic is smaller than group size, put all partitions of this
         // topic in a single group.
         MultiWorkUnit mwuGroup = MultiWorkUnit.createEmpty();
-        mwuGroup.setProp(LineageInfo.LINEAGE_DATASET_URN, entry.getKey());
-        addWorkUnitsToMultiWorkUnit(entry.getValue(), mwuGroup);
+        addWorkUnitsToMultiWorkUnit(workUnitsForTopic, mwuGroup);
         mwuGroups.add(mwuGroup);
       } else {
         // Use best-fit-decreasing to group workunits for a topic into multiple groups.
-        mwuGroups.addAll(bestFitDecreasingBinPacking(entry.getKey(), entry.getValue(), avgGroupSize));
+        mwuGroups.addAll(bestFitDecreasingBinPacking(workUnitsForTopic, avgGroupSize));
       }
     }
 
-    // Add common lineage information
-    for (MultiWorkUnit multiWorkUnit: mwuGroups) {
-      LineageInfo.setDatasetLineageAttribute(multiWorkUnit, ConfigurationKeys.KAFKA_BROKERS, this.state.getProp(ConfigurationKeys.KAFKA_BROKERS, ""));
-    }
-
     List<WorkUnit> groups = squeezeMultiWorkUnits(mwuGroups);
     return worstFitDecreasingBinPacking(groups, numContainers);
   }
@@ -111,7 +103,7 @@ public class KafkaBiLevelWorkUnitPacker extends KafkaWorkUnitPacker {
    * Group {@link WorkUnit}s into groups. Each group is a {@link MultiWorkUnit}. Each group has a capacity of
    * avgGroupSize. If there's a single {@link WorkUnit} whose size is larger than avgGroupSize, it forms a group itself.
    */
-  private static List<MultiWorkUnit> bestFitDecreasingBinPacking(String topic, List<WorkUnit> workUnits, double avgGroupSize) {
+  private static List<MultiWorkUnit> bestFitDecreasingBinPacking(List<WorkUnit> workUnits, double avgGroupSize) {
 
     // Sort workunits by data size desc
     Collections.sort(workUnits, LOAD_DESC_COMPARATOR);
@@ -123,7 +115,6 @@ public class KafkaBiLevelWorkUnitPacker extends KafkaWorkUnitPacker {
         addWorkUnitToMultiWorkUnit(workUnit, bestGroup);
       } else {
         bestGroup = MultiWorkUnit.createEmpty();
-        bestGroup.setProp(LineageInfo.LINEAGE_DATASET_URN, topic);
         addWorkUnitToMultiWorkUnit(workUnit, bestGroup);
       }
       pQueue.add(bestGroup);
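
For reference, the grouping used here is classic best-fit decreasing. A self-contained sketch under simplified assumptions (plain double sizes instead of WorkUnits, group loads instead of MultiWorkUnits; not the packer's actual code):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.PriorityQueue;

    public class BestFitDecreasingSketch {
      /** Packs sizes into groups of the given capacity; returns the group loads. */
      static List<Double> bestFitDecreasing(List<Double> sizes, double capacity) {
        // Sort items by size descending.
        Collections.sort(sizes, Collections.reverseOrder());
        // Max-heap on load: the fullest group that still fits is the "best fit".
        PriorityQueue<Double> loads = new PriorityQueue<>(Collections.reverseOrder());
        for (double size : sizes) {
          Double best = null;
          List<Double> tooFull = new ArrayList<>();
          while (!loads.isEmpty()) {
            double load = loads.poll();
            if (load + size <= capacity) { best = load; break; }
            tooFull.add(load); // too full for this item; set aside
          }
          loads.addAll(tooFull);
          // An item larger than capacity forms a group by itself, as in the packer.
          loads.add(best == null ? size : best + size);
        }
        return new ArrayList<>(loads);
      }
    }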

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index 38d050d..8d03f4f 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -81,25 +81,11 @@ public abstract class KafkaWorkUnitPacker {
 
   protected final AbstractSource<?, ?> source;
   protected final SourceState state;
-  protected final Extract.TableType tableType;
-  protected final String extractNameSpace;
-  protected final boolean isFullExtract;
   protected final KafkaWorkUnitSizeEstimator sizeEstimator;
 
   protected KafkaWorkUnitPacker(AbstractSource<?, ?> source, SourceState state) {
     this.source = source;
     this.state = state;
-    if (state.getPropAsBoolean(KafkaSource.GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION)) {
-      String tableTypeStr = state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY,
-          KafkaSource.DEFAULT_TABLE_TYPE.toString());
-      tableType = Extract.TableType.valueOf(tableTypeStr);
-      extractNameSpace = state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, KafkaSource.DEFAULT_NAMESPACE_NAME);
-    } else {
-      // To be compatible, reject table type and namespace configuration keys as previous implementation
-      tableType = KafkaSource.DEFAULT_TABLE_TYPE;
-      extractNameSpace = KafkaSource.DEFAULT_NAMESPACE_NAME;
-    }
-    isFullExtract = state.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY);
     this.sizeEstimator = getWorkUnitSizeEstimator();
   }
 
@@ -227,13 +213,18 @@ public abstract class KafkaWorkUnitPacker {
     List<KafkaPartition> partitions = getPartitionsFromMultiWorkUnit(multiWorkUnit);
     Preconditions.checkArgument(!partitions.isEmpty(), "There must be at least one partition in the multiWorkUnit");
 
-    Extract extract = this.source.createExtract(tableType, extractNameSpace, partitions.get(0).getTopicName());
-    if (isFullExtract) {
-      extract.setProp(ConfigurationKeys.EXTRACT_IS_FULL_KEY, true);
-    }
-    WorkUnit workUnit = WorkUnit.create(extract, interval);
+    // Squeeze all partitions from the multiWorkUnit into one of its work units; any one will do
+    WorkUnit workUnit = multiWorkUnit.getWorkUnits().get(0);
+    // Update interval
+    workUnit.removeProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY);
+    workUnit.removeProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY);
+    workUnit.setWatermarkInterval(interval);
+    // Remove the original partition information
+    workUnit.removeProp(KafkaSource.PARTITION_ID);
+    workUnit.removeProp(KafkaSource.LEADER_ID);
+    workUnit.removeProp(KafkaSource.LEADER_HOSTANDPORT);
+    // Add combined partitions information
     populateMultiPartitionWorkUnit(partitions, workUnit);
-    workUnit.setProp(ESTIMATED_WORKUNIT_SIZE, multiWorkUnit.getProp(ESTIMATED_WORKUNIT_SIZE));
     LOG.info(String.format("Created MultiWorkUnit for partitions %s", partitions));
     return workUnit;
   }
@@ -243,9 +234,7 @@ public abstract class KafkaWorkUnitPacker {
    */
   private static void populateMultiPartitionWorkUnit(List<KafkaPartition> partitions, WorkUnit workUnit) {
     Preconditions.checkArgument(!partitions.isEmpty(), "There should be at least one partition");
-    workUnit.setProp(KafkaSource.TOPIC_NAME, partitions.get(0).getTopicName());
     GobblinMetrics.addCustomTagToState(workUnit, new Tag<>("kafkaTopic", partitions.get(0).getTopicName()));
-    workUnit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, partitions.get(0).getTopicName());
     for (int i = 0; i < partitions.size(); i++) {
       workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, i), partitions.get(i).getId());
       workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.LEADER_ID, i),
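
The squeeze now reuses an existing work unit instead of building a fresh one via createExtract, which is the heart of the fix: a work unit created from scratch would lose the gobblin.lineage.* properties KafkaSource attached. Schematically (multiWorkUnit and interval assumed in scope, as in the method above):

    // Reuse pattern: keep the first work unit and its accumulated properties
    // (including lineage), then rewrite only the partition-specific ones.
    WorkUnit squeezed = multiWorkUnit.getWorkUnits().get(0);
    squeezed.removeProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY);
    squeezed.removeProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY);
    squeezed.setWatermarkInterval(interval);
    // By contrast, WorkUnit.create(extract, interval) would start from a blank slate.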

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index 0a8984b..07d167b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -37,18 +37,18 @@ import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.lineage.LineageException;
 import org.apache.gobblin.lineage.LineageInfo;
+import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.publisher.CommitSequencePublisher;
 import org.apache.gobblin.publisher.DataPublisher;
-import org.apache.gobblin.publisher.NoopPublisher;
 import org.apache.gobblin.publisher.UnpublishedHandling;
 import org.apache.gobblin.runtime.commit.DatasetStateCommitStep;
 import org.apache.gobblin.runtime.task.TaskFactory;
 import org.apache.gobblin.runtime.task.TaskUtils;
 import org.apache.gobblin.source.extractor.JobCommitPolicy;
 
-import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
 
@@ -57,7 +57,7 @@ import lombok.extern.slf4j.Slf4j;
  * {@link DataPublisher#publish(Collection)}. This class is thread-safe if and only if the implementation of
  * {@link DataPublisher} used is also thread-safe.
  */
-@AllArgsConstructor
+@RequiredArgsConstructor
 @Slf4j
 final class SafeDatasetCommit implements Callable<Void> {
 
@@ -71,6 +71,8 @@ final class SafeDatasetCommit implements Callable<Void> {
   private final boolean isMultithreaded;
   private final JobContext jobContext;
 
+  private MetricContext metricContext;
+
   @Override
   public Void call()
       throws Exception {
@@ -78,6 +80,8 @@ final class SafeDatasetCommit implements Callable<Void> {
      log.info(this.datasetUrn + " has been committed.");
       return null;
     }
+    metricContext = Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class);
+
     finalizeDatasetStateBeforeCommit(this.datasetState);
     Class<? extends DataPublisher> dataPublisherClass;
     try (Closer closer = Closer.create()) {
@@ -159,6 +163,7 @@ final class SafeDatasetCommit implements Callable<Void> {
     } finally {
       try {
         finalizeDatasetState(datasetState, datasetUrn);
+        submitLineageEvent(datasetState.getTaskStates());
         if (commitSequenceBuilder.isPresent()) {
           buildAndExecuteCommitSequence(commitSequenceBuilder.get(), datasetState, datasetUrn);
           datasetState.setState(JobState.RunningState.COMMITTED);
@@ -182,9 +187,8 @@ final class SafeDatasetCommit implements Callable<Void> {
     }
 
     TaskState oneWorkUnitState = states.iterator().next();
-    if (oneWorkUnitState.contains(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE) && oneWorkUnitState.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE).equals(
-        NoopPublisher.class.getName())) {
-      // if no publisher configured, each task should be responsible for sending lineage event.
+    if (!oneWorkUnitState.contains(LineageInfo.LINEAGE_DATASET_URN)) {
+      // Do nothing if the dataset is not configured with lineage info
       return;
     }
 
@@ -194,14 +198,18 @@ final class SafeDatasetCommit implements Callable<Void> {
       Collection<Collection<State>> datasetStates = LineageInfo.aggregateByDatasetUrn(states).values();
       for (Collection<State> dataState: datasetStates) {
         Collection<LineageInfo> branchLineages = LineageInfo.load(dataState, LineageInfo.Level.All);
-        EventSubmitter submitter = new EventSubmitter.Builder(Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class),
-            LineageInfo.LINEAGE_NAME_SPACE).build();
+        EventSubmitter submitter = new EventSubmitter.Builder(metricContext, LineageInfo.LINEAGE_NAME_SPACE).build();
         for (LineageInfo info: branchLineages) {
           submitter.submit(info.getId(), info.getLineageMetaData());
         }
       }
     } catch (LineageException e) {
      log.error("Lineage event submission failed due to: " + e.toString());
+    } finally {
+      for (TaskState taskState: states) {
+        // Remove lineage info from the state to avoid sending duplicate lineage events in the next run
+        taskState.removePropsWithPrefix(LineageInfo.LINEAGE_NAME_SPACE);
+      }
     }
   }
 
@@ -222,7 +230,6 @@ final class SafeDatasetCommit implements Callable<Void> {
 
     try {
       publisher.publish(taskStates);
-      submitLineageEvent(taskStates);
     } catch (Throwable t) {
       log.error("Failed to commit dataset", t);
       setTaskFailureException(taskStates, t);
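
Taken together, the commit path now emits lineage once per dataset urn after the dataset state is finalized, then scrubs the lineage namespace so a rerun cannot double-report. A condensed sketch of the flow using only calls visible in this diff (metricContext and taskStates assumed in scope):

    Collection<Collection<State>> datasetStates = LineageInfo.aggregateByDatasetUrn(taskStates).values();
    for (Collection<State> dataState : datasetStates) {
      EventSubmitter submitter = new EventSubmitter.Builder(metricContext, LineageInfo.LINEAGE_NAME_SPACE).build();
      for (LineageInfo info : LineageInfo.load(dataState, LineageInfo.Level.All)) {
        submitter.submit(info.getId(), info.getLineageMetaData()); // one event per branch lineage
      }
    }
    // Runs even when load() throws, preventing duplicate events in the next run.
    for (TaskState taskState : taskStates) {
      taskState.removePropsWithPrefix(LineageInfo.LINEAGE_NAME_SPACE);
    }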