You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/11/13 22:15:57 UTC

[1/2] incubator-gobblin git commit: [GOBBLIN-307] Implement lineage event as LineageEventBuilder in gobblin

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master a34a81a42 -> 3e229db98


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
new file mode 100644
index 0000000..4e711b9
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event.lineage;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+
+public class LineageEventTest {
+  @Test
+  public void testEvent() {
+    final String topic = "testTopic";
+    State state0 = new State();
+    DatasetDescriptor source = new DatasetDescriptor("kafka", topic);
+    LineageInfo.setSource(source, state0);
+    DatasetDescriptor destination0 = new DatasetDescriptor("hdfs", "/data/dbchanges");
+    LineageInfo.putDestination(destination0, 0, state0);
+    DatasetDescriptor destination1 = new DatasetDescriptor("mysql", "kafka.testTopic");
+    LineageInfo.putDestination(destination1, 1, state0);
+
+    Map<String, LineageEventBuilder> events = LineageInfo.load(state0);
+    verify(events.get("0"), topic, source, destination0, 0);
+    verify(events.get("1"), topic, source, destination1, 1);
+
+    State state1 = new State();
+    LineageInfo.setSource(source, state1);
+    List<State> states = Lists.newArrayList();
+    states.add(state0);
+    states.add(state1);
+
+    // Test only full fledged lineage events are loaded
+    try {
+      Collection<LineageEventBuilder> eventsList = LineageInfo.load(states);
+      Assert.assertTrue(eventsList.size() == 2);
+      Assert.assertEquals(getLineageEvent(eventsList, 0), events.get("0"));
+      Assert.assertEquals(getLineageEvent(eventsList, 1), events.get("1"));
+    } catch (LineageException e) {
+      Assert.fail("Unexpected exception");
+    }
+
+    // There are 3 full fledged lineage events
+    DatasetDescriptor destination2 = new DatasetDescriptor("mysql", "kafka.testTopic2");
+    LineageInfo.putDestination(destination2, 2, state1);
+    try {
+      Collection<LineageEventBuilder> eventsList = LineageInfo.load(states);
+      Assert.assertTrue(eventsList.size() == 3);
+      Assert.assertEquals(getLineageEvent(eventsList, 0), events.get("0"));
+      Assert.assertEquals(getLineageEvent(eventsList, 1), events.get("1"));
+      verify(getLineageEvent(eventsList, 2), topic, source, destination2, 2);
+    } catch (LineageException e) {
+      Assert.fail("Unexpected exception");
+    }
+
+    // Throw conflict exception when there is a conflict on a branch between 2 states
+    LineageInfo.putDestination(destination2, 0, state1);
+    boolean hasLineageException = false;
+    try {
+      Collection<LineageEventBuilder> eventsList = LineageInfo.load(states);
+    } catch (LineageException e) {
+      Assert.assertTrue(e instanceof LineageException.ConflictException);
+      hasLineageException = true;
+    }
+    Assert.assertTrue(hasLineageException);
+  }
+
+  private LineageEventBuilder getLineageEvent(Collection<LineageEventBuilder> events, int branchId) {
+    for (LineageEventBuilder event : events) {
+      if (event.getDestination().getMetadata().get(LineageInfo.BRANCH).equals(String.valueOf(branchId))) {
+        return event;
+      }
+    }
+    return null;
+  }
+
+  private void verify(LineageEventBuilder event, String name, DatasetDescriptor source, DatasetDescriptor destination, int branchId) {
+    Assert.assertEquals(event.getName(), name);
+    Assert.assertEquals(event.getNamespace(), LineageEventBuilder.LIENAGE_EVENT_NAMESPACE);
+    Assert.assertEquals(event.getMetadata().get(GobblinEventBuilder.EVENT_TYPE), LineageEventBuilder.LINEAGE_EVENT_TYPE);
+    Assert.assertTrue(event.getSource().equals(source));
+
+    DatasetDescriptor updatedDestination = new DatasetDescriptor(destination);
+    updatedDestination.addMetadata(LineageInfo.BRANCH, String.valueOf(branchId));
+    Assert.assertTrue(event.getDestination().equals(updatedDestination));
+
+    // It only has eventType info
+    Assert.assertTrue(event.getMetadata().size() == 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 45223fb..8cd25c2 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -30,7 +30,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
-import org.apache.gobblin.lineage.LineageInfo;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +55,7 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
 import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.source.extractor.extract.EventBasedSource;
 import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker;
 import org.apache.gobblin.source.extractor.limiter.LimiterConfigurationKeys;
@@ -549,8 +552,9 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset());
 
     // Add lineage info
-    workUnit.setProp(LineageInfo.LINEAGE_DATASET_URN, partition.getTopicName());
-    LineageInfo.setDatasetLineageAttribute(workUnit, ConfigurationKeys.KAFKA_BROKERS, kafkaBrokers);
+    DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_KAFKA, partition.getTopicName());
+    source.addMetadata(DatasetConstants.BROKERS, kafkaBrokers);
+    LineageInfo.setSource(source, workUnit);
 
     LOG.info(String.format("Created workunit for partition %s: lowWatermark=%d, highWatermark=%d, range=%d", partition,
         offsets.getStartOffset(), offsets.getLatestOffset(), offsets.getLatestOffset() - offsets.getStartOffset()));

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
index 57fdedd..e2292c7 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
@@ -19,12 +19,13 @@ package org.apache.gobblin.source.extractor.extract.jdbc;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
-import org.apache.gobblin.lineage.LineageInfo;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.source.extractor.exception.ExtractPrepareException;
 import java.io.IOException;
 
-import org.apache.gobblin.source.workunit.WorkUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +34,7 @@ import com.google.gson.JsonElement;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.source.extractor.extract.QueryBasedSource;
 import org.apache.gobblin.source.jdbc.MysqlExtractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
 
 
 /**
@@ -55,12 +57,14 @@ public class MysqlSource extends QueryBasedSource<JsonArray, JsonElement> {
     return extractor;
   }
 
-  protected void addLineageSourceInfo (SourceState sourceState, SourceEntity entity, WorkUnit workUnit) {
-    super.addLineageSourceInfo(sourceState, entity, workUnit);
+  protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity, WorkUnit workUnit) {
     String host = sourceState.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME);
     String port = sourceState.getProp(ConfigurationKeys.SOURCE_CONN_PORT);
     String database = sourceState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_SCHEMA);
     String connectionUrl = "jdbc:mysql://" + host.trim() + ":" + port + "/" + database.trim();
-    LineageInfo.setDatasetLineageAttribute(workUnit, "connectionUrl", connectionUrl);
+    DatasetDescriptor source =
+        new DatasetDescriptor(DatasetConstants.PLATFORM_MYSQL, database + "." + entity.getSourceEntityName());
+    source.addMetadata(DatasetConstants.CONNECTION_URL, connectionUrl);
+    LineageInfo.setSource(source, workUnit);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index d6a1b58..7ff9bb1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -22,24 +22,27 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
+import org.apache.commons.lang.StringUtils;
+
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.io.Closer;
 
 import org.apache.gobblin.commit.CommitSequence;
 import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.commit.DeliverySemantics;
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.lineage.LineageException;
-import org.apache.gobblin.lineage.LineageInfo;
+import org.apache.gobblin.metrics.event.lineage.LineageException;
+import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
 import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.FailureEventBuilder;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.publisher.CommitSequencePublisher;
 import org.apache.gobblin.publisher.DataPublisher;
 import org.apache.gobblin.publisher.UnpublishedHandling;
@@ -170,8 +173,7 @@ final class SafeDatasetCommit implements Callable<Void> {
       try {
         finalizeDatasetState(datasetState, datasetUrn);
         maySubmitFailureEvent(datasetState);
-        submitLineageEvent(datasetState.getTaskStates());
-
+        maySubmitLineageEvent(datasetState);
         if (commitSequenceBuilder.isPresent()) {
           buildAndExecuteCommitSequence(commitSequenceBuilder.get(), datasetState, datasetUrn);
           datasetState.setState(JobState.RunningState.COMMITTED);
@@ -197,38 +199,45 @@ final class SafeDatasetCommit implements Callable<Void> {
     }
   }
 
-  private void submitLineageEvent(Collection<TaskState> states) {
-    if (states.size() == 0) {
-      return;
+  private void maySubmitLineageEvent(JobState.DatasetState datasetState) {
+    Collection<TaskState> allStates = datasetState.getTaskStates();
+    Collection<TaskState> states = Lists.newArrayList();
+    // Filter out failed states or states that don't have lineage info
+    for (TaskState state : allStates) {
+      if (state.getWorkingState() == WorkUnitState.WorkingState.COMMITTED &&
+          LineageInfo.hasLineageInfo(state)) {
+        states.add(state);
+      }
     }
-
-    TaskState oneWorkUnitState = states.iterator().next();
-    if (!oneWorkUnitState.contains(LineageInfo.LINEAGE_DATASET_URN)) {
-      // Do nothing if the dataset is not configured with lineage info
+    if (states.size() == 0) {
       return;
     }
 
     try {
-      // Aggregate states by lineage.dataset.urn, in case datasetUrn may be set to empty so that all task states falls into one empty dataset.
-      // FixMe: once all dataset.urn attribues are set properly, we don't need this aggregation.
-      Collection<Collection<State>> datasetStates = LineageInfo.aggregateByDatasetUrn(states).values();
-      for (Collection<State> dataState: datasetStates) {
-        Collection<LineageInfo> branchLineages = LineageInfo.load(dataState, LineageInfo.Level.All);
-        EventSubmitter submitter = new EventSubmitter.Builder(metricContext, LineageInfo.LINEAGE_NAME_SPACE).build();
-        for (LineageInfo info: branchLineages) {
-          submitter.submit(info.getId(), info.getLineageMetaData());
+      if (StringUtils.isEmpty(datasetUrn)) {
+        // This dataset may contain different kinds of LineageEvent
+        for (Collection<TaskState> collection : aggregateByLineageEvent(states)) {
+          submitLineageEvent(collection);
         }
+      } else {
+        submitLineageEvent(states);
       }
     } catch (LineageException e) {
-      log.error ("Lineage event submission failed due to :" + e.toString());
+      log.error("Lineage event submission failed due to :" + e.toString());
     } finally {
-      for (TaskState taskState: states) {
-        // Remove lineage info from the state to avoid sending duplicate lineage events in the next run
-        taskState.removePropsWithPrefix(LineageInfo.LINEAGE_NAME_SPACE);
+      // Purge lineage info from all states
+      for (TaskState taskState : allStates) {
+        LineageInfo.purgeLineageInfo(taskState);
       }
     }
   }
 
+  private void submitLineageEvent(Collection<TaskState> states) throws LineageException {
+    Collection<LineageEventBuilder> events = LineageInfo.load(states);
+    // Send events
+    events.forEach(event -> event.submit(metricContext));
+  }
+
   /**
    * Synchronized version of {@link #commitDataset(Collection, DataPublisher)} used when publisher is not
    * thread safe.
@@ -415,4 +424,15 @@ final class SafeDatasetCommit implements Callable<Void> {
     return Optional.of(new DatasetStateCommitStep.Builder<>().withProps(datasetState).withDatasetUrn(datasetUrn)
         .withDatasetState(datasetState).build());
   }
+
+  private static Collection<Collection<TaskState>> aggregateByLineageEvent(Collection<TaskState> states) {
+    Map<String, Collection<TaskState>> statesByEvents = Maps.newHashMap();
+    for (TaskState state : states) {
+      String eventName = LineageInfo.getFullEventName(state);
+      Collection<TaskState> statesForEvent = statesByEvents.computeIfAbsent(eventName, k -> Lists.newArrayList());
+      statesForEvent.add(state);
+    }
+
+    return statesByEvents.values();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-runtime/src/test/resources/templates/textFileBasedSourceTest.template
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/resources/templates/textFileBasedSourceTest.template b/gobblin-runtime/src/test/resources/templates/textFileBasedSourceTest.template
index 4cdf991..7380257 100644
--- a/gobblin-runtime/src/test/resources/templates/textFileBasedSourceTest.template
+++ b/gobblin-runtime/src/test/resources/templates/textFileBasedSourceTest.template
@@ -2,3 +2,5 @@ source.class=org.apache.gobblin.source.extractor.filebased.TextFileBasedSource
 writer.builder.class="org.apache.gobblin.writer.test.GobblinTestEventBusWriter$Builder"
 
 extract.table.type=APPEND_ONLY
+
+data.publisher.final.dir=/tmp

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-test-harness/src/test/resources/runtime_test/skip_workunits_test.properties
----------------------------------------------------------------------
diff --git a/gobblin-test-harness/src/test/resources/runtime_test/skip_workunits_test.properties b/gobblin-test-harness/src/test/resources/runtime_test/skip_workunits_test.properties
index 83a97de..b5f8fef 100644
--- a/gobblin-test-harness/src/test/resources/runtime_test/skip_workunits_test.properties
+++ b/gobblin-test-harness/src/test/resources/runtime_test/skip_workunits_test.properties
@@ -24,6 +24,6 @@ job.lock.enabled=false
 state.store.dir=./gobblin-test-harness/src/test/resources/runtime_test/state_store
 writer.staging.dir=./gobblin-test-harness/src/test/resources/runtime_test/writer_staging
 writer.output.dir=./gobblin-test-harness/src/test/resources/runtime_test/writer_output
-
+data.publisher.final.dir=/tmp
 
 source.class=org.apache.gobblin.TestSkipWorkUnitsSource


[2/2] incubator-gobblin git commit: [GOBBLIN-307] Implement lineage event as LineageEventBuilder in gobblin

Posted by hu...@apache.org.
[GOBBLIN-307] Implement lineage event as LineageEventBuilder in gobblin

Closes #2161 from zxcware/lineage


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/3e229db9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/3e229db9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/3e229db9

Branch: refs/heads/master
Commit: 3e229db9810de8410e0b8fcaf680fcb9f80b5db2
Parents: a34a81a
Author: zhchen <zh...@linkedin.com>
Authored: Mon Nov 13 14:15:51 2017 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Nov 13 14:15:51 2017 -0800

----------------------------------------------------------------------
 .../gobblin/dataset/DatasetConstants.java       |  34 +++
 .../gobblin/dataset/DatasetDescriptor.java      | 114 +++++++++
 .../gobblin/lineage/LineageException.java       |  39 ---
 .../org/apache/gobblin/lineage/LineageInfo.java | 246 -------------------
 .../gobblin/publisher/BaseDataPublisher.java    |  23 +-
 .../publisher/TimePartitionedDataPublisher.java |  28 ++-
 .../extractor/extract/QueryBasedSource.java     |   9 +-
 .../apache/gobblin/lineage/LineageInfoTest.java | 160 ------------
 .../data/management/copy/CopySource.java        |  10 +-
 .../management/copy/CopyableDatasetBase.java    |   7 +
 .../data/management/copy/CopyableFile.java      |   4 +
 .../copy/RecursiveCopyableDataset.java          |  15 +-
 .../copy/hive/HiveCopyEntityHelper.java         |   8 +
 .../data/management/copy/hive/HiveDataset.java  |   8 +-
 .../copy/hive/HivePartitionFileSet.java         |   7 +-
 .../copy/hive/UnpartitionedTableFileSet.java    |   5 +-
 .../copy/publisher/CopyDataPublisher.java       |   3 +-
 .../dataset/ConvertibleHiveDatasetTest.java     |  18 +-
 .../gobblin/metrics/event/EventSubmitter.java   |   3 +
 .../metrics/event/FailureEventBuilder.java      |  58 +----
 .../metrics/event/GobblinEventBuilder.java      |  86 +++++++
 .../event/lineage/LineageEventBuilder.java      | 147 +++++++++++
 .../metrics/event/lineage/LineageException.java |  32 +++
 .../metrics/event/lineage/LineageInfo.java      | 207 ++++++++++++++++
 .../metrics/event/lineage/LineageEventTest.java | 113 +++++++++
 .../extractor/extract/kafka/KafkaSource.java    |  10 +-
 .../extractor/extract/jdbc/MysqlSource.java     |  14 +-
 .../gobblin/runtime/SafeDatasetCommit.java      |  70 ++++--
 .../templates/textFileBasedSourceTest.template  |   2 +
 .../runtime_test/skip_workunits_test.properties |   2 +-
 30 files changed, 918 insertions(+), 564 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
new file mode 100644
index 0000000..73999dc
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset;
+
+public class DatasetConstants {
+  /** Platforms */
+  public static final String PLATFORM_KAFKA = "kafka";
+  public static final String PLATFORM_HIVE = "hive";
+  public static final String PLATFORM_MYSQL = "mysql";
+
+  /** File system metadata */
+  public static final String FS_URI = "fsUri";
+
+  /** Kafka metadata */
+  public static final String BROKERS = "brokers";
+
+  /** JDBC metadata */
+  public static final String CONNECTION_URL = "connectionUrl";
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java
new file mode 100644
index 0000000..5b41862
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset;
+
+
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * A {@link DatasetDescriptor} identifies and provides metadata to describe a dataset
+ */
+@RequiredArgsConstructor
+public final class DatasetDescriptor {
+  private static final String PLATFORM_KEY = "platform";
+  private static final String NAME_KEY = "name";
+
+  /**
+   * which platform the dataset is stored, for example: local, hdfs, oracle, mysql, kafka
+   */
+  @Getter
+  private final String platform;
+  /**
+   * name of the dataset
+   */
+  @Getter
+  private final String name;
+
+  /**
+   * metadata about the dataset
+   */
+  private final Map<String, String> metadata = Maps.newHashMap();
+
+  public DatasetDescriptor(DatasetDescriptor copy) {
+    platform = copy.getPlatform();
+    name = copy.getName();
+    metadata.putAll(copy.getMetadata());
+  }
+
+  public ImmutableMap<String, String> getMetadata() {
+    return ImmutableMap.<String, String>builder()
+        .putAll(metadata)
+        .build();
+  }
+
+  public void addMetadata(String key, String value) {
+    metadata.put(key, value);
+  }
+
+  /**
+   * Serialize to a string map
+   */
+  public Map<String, String> toDataMap() {
+    Map<String, String> map = Maps.newHashMap();
+    map.put(PLATFORM_KEY, platform);
+    map.put(NAME_KEY, name);
+    map.putAll(metadata);
+    return map;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    DatasetDescriptor that = (DatasetDescriptor) o;
+    return platform.equals(that.platform) && name.equals(that.name) && metadata.equals(that.metadata);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = platform.hashCode();
+    result = 31 * result + name.hashCode();
+    result = 31 * result + metadata.hashCode();
+    return result;
+  }
+
+  /**
+   * Deserialize a {@link DatasetDescriptor} from a string map
+   */
+  public static DatasetDescriptor fromDataMap(Map<String, String> dataMap) {
+    DatasetDescriptor descriptor = new DatasetDescriptor(dataMap.get(PLATFORM_KEY), dataMap.get(NAME_KEY));
+    dataMap.forEach((key, value) -> {
+      if (!key.equals(PLATFORM_KEY) && !key.equals(NAME_KEY)) {
+        descriptor.addMetadata(key, value);
+      }
+    });
+    return descriptor;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageException.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageException.java b/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageException.java
deleted file mode 100644
index 8dcf592..0000000
--- a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.gobblin.lineage;
-
-/**
- * A set of exceptions used by {@link LineageInfo} when lineage information is serialized or deserialized.
- */
-public class LineageException extends Exception {
-  public LineageException(String message) {
-    super(message);
-  }
-  public static class LineageConflictAttributeException extends LineageException {
-    public LineageConflictAttributeException (String key, String oldValue, String newValue) {
-      super ("Lineage has conflict value: key=" + key + " value=[1]" + oldValue + " [2]" + newValue);
-    }
-  }
-
-  public static class LineageUnsupportedLevelException extends LineageException {
-    public LineageUnsupportedLevelException (LineageInfo.Level level) {
-      super (level.toString() + " is not supported");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java b/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
deleted file mode 100644
index 7af71df..0000000
--- a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.lineage;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-
-/**
- * A class to restore all lineage information from a {@link State}
- * All lineage attributes are under LINEAGE_NAME_SPACE namespace.
- *
- * For example, a typical lineage attributes looks like:
- *    gobblin.lineage.K1          ---> V1
- *    gobblin.lineage.branch.3.K2 ---> V2
- *
- * K1 is dataset level attribute, K2 is branch level attribute, and branch id is 3.
- */
-
-@Slf4j
-public class LineageInfo {
-  public static final String LINEAGE_NAME_SPACE = "gobblin.lineage";
-  public static final String BRANCH_ID_METADATA_KEY = "branchId";
-  private static final String DATASET_PREFIX =  LINEAGE_NAME_SPACE + ".";
-  public static final String LINEAGE_DATASET_URN = DATASET_PREFIX + "dataset.urn";
-  private static final String BRANCH_PREFIX = DATASET_PREFIX + "branch.";
-
-  @Getter
-  private String datasetUrn;
-  @Getter
-  private String jobId;
-
-  private Map<String, String> lineageMetaData;
-
-  public enum Level {
-    DATASET,
-    BRANCH,
-    All
-  }
-
-  private LineageInfo() {
-  }
-
-  private LineageInfo(String datasetUrn, String jobId, ImmutableMap<String, String> lineageMetaData) {
-    Preconditions.checkArgument(datasetUrn != null);
-    Preconditions.checkArgument(jobId != null);
-    this.datasetUrn = datasetUrn;
-    this.jobId = jobId;
-    this.lineageMetaData = lineageMetaData;
-  }
-
-  /**
-   * Retrieve lineage information from a {@link State} by {@link Level}
-   * @param state A single state
-   * @param level {@link Level#DATASET}  only load dataset level lineage attributes
-   *              {@link Level#BRANCH}   only load branch level lineage attributes
-   *              {@link Level#All}      load all lineage attributes
-   * @return A collection of {@link LineageInfo}s per branch. When level is {@link Level#DATASET}, this list has only single element.
-   */
-  public static Collection<LineageInfo> load (State state, Level level) throws LineageException {
-    return load(Collections.singleton(state), level);
-  }
-
-  /**
-   * Get all lineage meta data.
-   */
-  public ImmutableMap<String, String> getLineageMetaData() {
-    return ImmutableMap.copyOf(lineageMetaData);
-  }
-
-  /**
-   * Retrieve all lineage information from different {@link State}s.
-   * This requires the job id and dataset urn to be present in the state, under job.id and dataset.urn.
-   * A global union operation is applied to combine all <K, V> pairs from the input {@link State}s. If multiple {@link State}s
-   * share the same K, but have conflicting V, a {@link LineageException} is thrown.
-   *
-   * {@link Level} can control if a dataset level or branch level information should be used. When {@link Level#All} is
-   * specified, all levels of information will be returned; otherwise only specified level of information will be returned.
-   *
-   * For instance, assume we have below input states:
-   *    State[0]: gobblin.lineage.K1          ---> V1
-   *              gobblin.lineage.K2          ---> V2
-   *              gobblin.lineage.branch.1.K4 ---> V4
-   *    State[1]: gobblin.lineage.K2          ---> V2
-   *              gobblin.lineage.K3          ---> V3
-   *              gobblin.lineage.branch.1.K4 ---> V4
-   *              gobblin.lineage.branch.1.K5 ---> V5
-   *              gobblin.lineage.branch.2.K6 ---> V6
-   *
-   *  (1) With {@link Level#DATASET} level, the output would be:
-   *      LinieageInfo[0]:  K1 ---> V1
-   *                        K2 ---> V2
-   *                        K3 ---> V3
-   *  (2) With {@link Level#All} level, the output would be: (because there are two branches, so there are two LineageInfo)
-   *      LineageInfo[0]:   K1 ---> V1
-   *                        K2 ---> V2
-   *                        K3 ---> V3
-   *                        K4 ---> V4
-   *                        K5 ---> V5
-   *
-   *      LineageInfo[1]:   K1 ---> V1
-   *                        K2 ---> V2
-   *                        K3 ---> V3
-   *                        K6 ---> V6
-   *
-   *   (3) With {@link Level#BRANCH} level, the output would be: (only branch level information was returned)
-   *      LineageInfo[0]:   K4 ---> V4
-   *                        K5 ---> V5
-   *      LineageInfo[1]:   K6 ---> V6
-   *
-   * @param states All states which belong to the same dataset and share the same jobId.
-   * @param level {@link Level#DATASET}  only load dataset level lineage attributes
-   *              {@link Level#BRANCH}   only load branch level lineage attributes
-   *              {@link Level#All}      load all lineage attributes
-   * @return A collection of {@link LineageInfo}s per branch. When level is {@link Level#DATASET}, this list has only single element.
-   *
-   * @throws LineageException.LineageConflictAttributeException if two states have same key but not the same value.
-   */
-  public static Collection<LineageInfo> load (Collection<? extends State> states, Level level) throws LineageException {
-    Preconditions.checkArgument(states != null && !states.isEmpty());
-    Map<String, String> datasetMetaData = new HashMap<>();
-    Map<String, Map<String, String>> branchAggregate = new HashMap<>();
-
-    State anyOne = states.iterator().next();
-    String jobId = anyOne.getProp(ConfigurationKeys.JOB_ID_KEY, "");
-    String urn = anyOne.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN);
-
-    for (State state: states) {
-      for (Map.Entry<Object, Object> entry : state.getProperties().entrySet()) {
-        if (entry.getKey() instanceof String && ((String) entry.getKey()).startsWith(LINEAGE_NAME_SPACE)) {
-
-          String lineageKey = ((String) entry.getKey());
-          String lineageValue = (String) entry.getValue();
-
-          if (lineageKey.startsWith(BRANCH_PREFIX)) {
-            String branchPrefixStrip = lineageKey.substring(BRANCH_PREFIX.length());
-            String branchId = branchPrefixStrip.substring(0, branchPrefixStrip.indexOf("."));
-            String key = branchPrefixStrip.substring(branchPrefixStrip.indexOf(".") + 1);
-
-            if (level == Level.BRANCH || level == Level.All) {
-              if (!branchAggregate.containsKey(branchId)) {
-                branchAggregate.put(branchId, new HashMap<>());
-              }
-              Map<String, String> branchMetaData = branchAggregate.get(branchId);
-              String prev = branchMetaData.put(key, lineageValue);
-              if (prev != null && !prev.equals(lineageValue)) {
-                throw new LineageException.LineageConflictAttributeException(lineageKey, prev, lineageValue);
-              }
-            }
-          } else if (lineageKey.startsWith(DATASET_PREFIX)) {
-            if (level == Level.DATASET || level == Level.All) {
-              String prev = datasetMetaData.put(lineageKey.substring(DATASET_PREFIX.length()), lineageValue);
-              if (prev != null && !prev.equals(lineageValue)) {
-                throw new LineageException.LineageConflictAttributeException(lineageKey, prev, lineageValue);
-              }
-            }
-          }
-        }
-      }
-    }
-
-    Collection<LineageInfo> collection = Sets.newHashSet();
-
-    if (level == Level.DATASET) {
-      ImmutableMap<String, String> metaData = ImmutableMap.<String, String>builder()
-          .putAll(datasetMetaData)
-          .build();
-      collection.add(new LineageInfo(urn, jobId, metaData));
-      return collection;
-    } else if (level == Level.BRANCH || level == Level.All){
-      if (branchAggregate.isEmpty()) {
-        if (level == Level.All) {
-          collection.add(new LineageInfo(urn, jobId, ImmutableMap.<String, String>builder().putAll(datasetMetaData).build()));
-        }
-        return collection;
-      }
-      for (Map.Entry<String, Map<String, String>> branchMetaDataEntry: branchAggregate.entrySet()) {
-        String branchId = branchMetaDataEntry.getKey();
-        Map<String, String> branchMetaData = branchMetaDataEntry.getValue();
-        ImmutableMap.Builder<String, String> metaDataBuilder = ImmutableMap.builder();
-        if (level == Level.All) {
-          metaDataBuilder.putAll(datasetMetaData);
-        }
-        metaDataBuilder.putAll(branchMetaData).put(BRANCH_ID_METADATA_KEY, branchId);
-        collection.add(new LineageInfo(urn, jobId, metaDataBuilder.build()));
-      }
-
-      return collection;
-    } else {
-      throw new LineageException.LineageUnsupportedLevelException(level);
-    }
-  }
-
-  public static void setDatasetLineageAttribute (State state, String key, String value) {
-    state.setProp(DATASET_PREFIX + key, value);
-  }
-
-  public static void setBranchLineageAttribute (State state, int branchId, String key, String value) {
-    state.setProp(BRANCH_PREFIX + Joiner.on(".").join(branchId, key), value);
-  }
-
-  public static Map<String, Collection<State>> aggregateByDatasetUrn (Collection<? extends State> states) {
-    Map<String, Collection<State>> datasetStates = new HashMap<>();
-    for (State state: states) {
-      String urn = state.getProp(LINEAGE_DATASET_URN, state.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN));
-      datasetStates.putIfAbsent(urn, new ArrayList<>());
-      Collection<State> datasetState = datasetStates.get(urn);
-      datasetState.add(state);
-    }
-    return datasetStates;
-  }
-
-  public final String getId() {
-    return Joiner.on(":::").join(this.datasetUrn, this.jobId);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index 9bdcbdd..0097c15 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -56,9 +56,11 @@ import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.lineage.LineageInfo;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.metadata.MetadataMerger;
 import org.apache.gobblin.metadata.types.StaticStringMetadataMerger;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.util.ForkOperatorUtils;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.ParallelRunner;
@@ -106,8 +108,6 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
   protected final Map<String, ParallelRunner> parallelRunners = Maps.newHashMap();
   protected final Set<Path> publisherOutputDirs = Sets.newHashSet();
 
-  public static final String PUBLISH_OUTOUT = "publish.output";
-
   /* Each partition in each branch may have separate metadata. The metadata mergers are responsible
    * for aggregating this information from all workunits so it can be published.
    */
@@ -131,7 +131,6 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
     PUBLISH_RETRY_DEFAULTS = ConfigFactory.parseMap(configMap);
   };
 
-
   public BaseDataPublisher(State state)
       throws IOException {
     super(state);
@@ -330,6 +329,16 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
   private void publishMultiTaskData(WorkUnitState state, int branchId, Set<Path> writerOutputPathsMoved)
       throws IOException {
     publishData(state, branchId, false, writerOutputPathsMoved);
+    DatasetDescriptor destination = createDestinationDescriptor(state, branchId);
+    LineageInfo.putDestination(destination, branchId, state);
+  }
+
+  protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) {
+    Path publisherOutputDir = getPublisherOutputDir(state, branchId);
+    FileSystem fs = this.publisherFileSystemByBranches.get(branchId);
+    DatasetDescriptor destination = new DatasetDescriptor(fs.getScheme(), publisherOutputDir.toString());
+    destination.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
+    return destination;
   }
 
   protected void publishData(WorkUnitState state, int branchId, boolean publishSingleTaskData,
@@ -372,7 +381,6 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
         if (!replaceFinalOutputDir) {
           addWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner);
           writerOutputPathsMoved.add(writerOutputDir);
-          addPublisherLineageInfo(state, branchId, publisherOutputDir.toString());
           return;
         }
 
@@ -387,14 +395,9 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
 
       movePath(parallelRunner, state, writerOutputDir, publisherOutputDir, branchId);
       writerOutputPathsMoved.add(writerOutputDir);
-      addPublisherLineageInfo(state, branchId, publisherOutputDir.toString());
     }
   }
 
-  protected void addPublisherLineageInfo(WorkUnitState state, int branchId, String output) {
-    LineageInfo.setBranchLineageAttribute(state, branchId, PUBLISH_OUTOUT, output);
-  }
-
   /**
    * Get the output directory path this {@link BaseDataPublisher} will write to.
    *

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
index 90e241a..157552e 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
@@ -18,18 +18,18 @@
 package org.apache.gobblin.publisher;
 
 import java.io.IOException;
-import java.util.Set;
 
-import org.apache.gobblin.lineage.LineageInfo;
-import org.apache.gobblin.writer.partitioner.TimeBasedWriterPartitioner;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.ForkOperatorUtils;
 import org.apache.gobblin.util.ParallelRunner;
 import org.apache.gobblin.util.WriterUtils;
+import org.apache.gobblin.writer.partitioner.TimeBasedWriterPartitioner;
 
 
 /**
@@ -70,13 +70,19 @@ public class TimePartitionedDataPublisher extends BaseDataPublisher {
   }
 
   @Override
-  protected void publishData(WorkUnitState state, int branchId, boolean publishSingleTaskData, Set<Path> writerOutputPathsMoved) throws IOException {
-    super.publishData(state, branchId, publishSingleTaskData, writerOutputPathsMoved);
-    if (publishSingleTaskData) {
-      // Add lineage event for destination. Make sure all workunits belongs to the same dataset has exactly the same value
-      Path publisherOutputDir = getPublisherOutputDir(state, branchId);
-      String timePrefix = state.getProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX, "");
-      LineageInfo.setBranchLineageAttribute(state, branchId, PUBLISH_OUTOUT, new Path(publisherOutputDir, timePrefix).toString());
-    }
+  protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) {
+    // Get base descriptor
+    DatasetDescriptor descriptor = super.createDestinationDescriptor(state, branchId);
+
+    // Decorate with partition prefix
+    String propName = ForkOperatorUtils
+        .getPropertyNameForBranch(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX, numBranches, branchId);
+    String timePrefix = state.getProp(propName, "");
+    Path pathWithTimePrefix = new Path(descriptor.getName(), timePrefix);
+    DatasetDescriptor destination = new DatasetDescriptor(descriptor.getPlatform(), pathWithTimePrefix.toString());
+    // Add back the metadata
+    descriptor.getMetadata().forEach(destination::addMetadata);
+
+    return destination;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
index c77051d..d074f3a 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
@@ -27,7 +27,6 @@ import java.util.Set;
 import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.gobblin.lineage.LineageInfo;
 import org.slf4j.MDC;
 
 import com.google.common.base.Optional;
@@ -48,6 +47,8 @@ import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.configuration.WorkUnitState.WorkingState;
+import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.source.extractor.JobCommitPolicy;
 import org.apache.gobblin.source.extractor.partition.Partition;
 import org.apache.gobblin.source.extractor.partition.Partitioner;
@@ -235,7 +236,7 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> {
       workunit.setProp(ConfigurationKeys.SOURCE_ENTITY, sourceEntity.getSourceEntityName());
       workunit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, sourceEntity.getDestTableName());
       workunit.setProp(WORK_UNIT_STATE_VERSION_KEY, CURRENT_WORK_UNIT_STATE_VERSION);
-      addLineageSourceInfo (state, sourceEntity, workunit);
+      addLineageSourceInfo(state, sourceEntity, workunit);
       partition.serialize(workunit);
       workUnits.add(workunit);
     }
@@ -243,8 +244,8 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> {
     return workUnits;
   }
 
-  protected void addLineageSourceInfo (SourceState sourceState, SourceEntity entity, WorkUnit workUnit) {
-    workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, entity.destTableName);
+  protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity, WorkUnit workUnit) {
+    // Does nothing by default
   }
 
   protected Set<SourceEntity> getFilteredSourceEntities(SourceState state) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java b/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java
deleted file mode 100644
index 2a7ea15..0000000
--- a/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.lineage;
-
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.junit.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-import gobblin.configuration.State;
-
-
-public class LineageInfoTest {
-
-  @Test
-  public void testDatasetLevel () {
-    Collection<LineageInfo> collection = null;
-    try {
-      collection = LineageInfo.load(createTestStates(), LineageInfo.Level.DATASET);
-    } catch (LineageException e) {
-      Assert.fail(e.toString());
-    }
-
-    Assert.assertEquals(1, collection.size());
-    LineageInfo info = collection.iterator().next();
-    ImmutableMap<String, String> map = info.getLineageMetaData();
-    Assert.assertEquals(3, map.size());
-    Assert.assertEquals("V1", map.get("K1"));
-    Assert.assertEquals("V2", map.get("K2"));
-    Assert.assertEquals("V3", map.get("K3"));
-  }
-
-  @Test
-  public void testBranchLevel () {
-    Collection<LineageInfo> collection = null;
-    try {
-      collection = LineageInfo.load(createTestStates(), LineageInfo.Level.BRANCH);
-    } catch (LineageException e) {
-      Assert.fail(e.toString());
-    }
-
-    Assert.assertEquals(2, collection.size());
-
-    for (LineageInfo info: collection) {
-      Map<String, String> map = info.getLineageMetaData();
-      String branchId = map.get(LineageInfo.BRANCH_ID_METADATA_KEY);
-      if (branchId.equals("1")) {
-        Assert.assertEquals(3, map.size()); // include BRANCH_ID_METADATA_KEY
-        Assert.assertEquals("V4", map.get("K4"));
-        Assert.assertEquals("V5", map.get("K5"));
-      }
-
-      if (branchId.equals("2")) {
-        Assert.assertEquals(2, map.size()); // include BRANCH_ID_METADATA_KEY
-        Assert.assertEquals("V6", map.get("K6"));
-      }
-    }
-  }
-
-  @Test
-  public void testAllLevel () {
-    Collection<LineageInfo> collection = null;
-    try {
-      collection = LineageInfo.load(createTestStates(), LineageInfo.Level.All);
-    } catch (LineageException e) {
-      Assert.fail(e.toString());
-    }
-
-    Assert.assertEquals(2, collection.size());
-    for (LineageInfo info: collection) {
-      Map<String, String> map = info.getLineageMetaData();
-      String branchId = map.get(LineageInfo.BRANCH_ID_METADATA_KEY);
-      if (branchId.equals("1")) {
-        Assert.assertEquals(6, map.size()); // include BRANCH_ID_METADATA_KEY
-        Assert.assertEquals("V1", map.get("K1"));
-        Assert.assertEquals("V2", map.get("K2"));
-        Assert.assertEquals("V3", map.get("K3"));
-        Assert.assertEquals("V4", map.get("K4"));
-        Assert.assertEquals("V5", map.get("K5"));
-      }
-
-      if (branchId.equals("2")) {
-        Assert.assertEquals(5, map.size()); // include BRANCH_ID_METADATA_KEY
-        Assert.assertEquals("V1", map.get("K1"));
-        Assert.assertEquals("V2", map.get("K2"));
-        Assert.assertEquals("V3", map.get("K3"));
-        Assert.assertEquals("V6", map.get("K6"));
-      }
-    }
-  }
-
-  @Test
-  public void testNoBranchInfo () {
-    State state = new State();
-    state.setProp(ConfigurationKeys.JOB_ID_KEY, "test_job_id_123456");
-    state.setProp(ConfigurationKeys.DATASET_URN_KEY, "PageViewEvent");
-    LineageInfo.setDatasetLineageAttribute(state,"K1", "V1");
-    LineageInfo.setDatasetLineageAttribute(state,"K2", "V2");
-    Collection<LineageInfo> collection = null;
-    try {
-      collection = LineageInfo.load(Lists.newArrayList(state), LineageInfo.Level.BRANCH);
-    } catch (LineageException e) {
-      Assert.fail(e.toString());
-    }
-
-    Assert.assertEquals(true, collection.isEmpty());
-  }
-
-  private Collection<State> createTestStates() {
-    /*
-     *    State[0]: gobblin.lineage.K1          ---> V1
-     *              gobblin.lineage.K2          ---> V2
-     *              gobblin.lineage.branch.1.K4 ---> V4
-     *    State[1]: gobblin.lineage.K2          ---> V2
-     *              gobblin.lineage.K3          ---> V3
-     *              gobblin.lineage.branch.1.K4 ---> V4
-     *              gobblin.lineage.branch.1.K5 ---> V5
-     *              gobblin.lineage.branch.2.K6 ---> V6
-     */
-    State state_1 = new State();
-    state_1.setProp(ConfigurationKeys.JOB_ID_KEY, "test_job_id_123456");
-    state_1.setProp(ConfigurationKeys.DATASET_URN_KEY, "PageViewEvent");
-    LineageInfo.setDatasetLineageAttribute(state_1,"K1", "V1");
-    LineageInfo.setDatasetLineageAttribute(state_1,"K2", "V2");
-    LineageInfo.setBranchLineageAttribute(state_1, 1, "K4", "V4");
-
-
-    State state_2 = new State();
-    state_2.setProp(ConfigurationKeys.JOB_ID_KEY, "test_job_id_123456");
-    state_2.setProp(ConfigurationKeys.DATASET_URN_KEY, "PageViewEvent");
-
-    LineageInfo.setDatasetLineageAttribute(state_2,"K2", "V2");
-    LineageInfo.setDatasetLineageAttribute(state_2,"K3", "V3");
-    LineageInfo.setBranchLineageAttribute(state_2, 1, "K4", "V4");
-    LineageInfo.setBranchLineageAttribute(state_2, 1, "K5", "V5");
-    LineageInfo.setBranchLineageAttribute(state_2, 2, "K6", "V6");
-
-    return Lists.newArrayList(state_1, state_2);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index f60e5f0..8ca05e3 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.data.management.copy;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -66,6 +65,8 @@ import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.source.extractor.WatermarkInterval;
@@ -299,6 +300,7 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
           setWorkUnitWatermark(workUnit, watermarkGenerator, copyEntity);
           computeAndSetWorkUnitGuid(workUnit);
           workUnitsForPartition.add(workUnit);
+          addLineageInfo(copyEntity, copyableDataset, workUnit);
         }
 
         this.workUnitList.putAll(this.fileSet, workUnitsForPartition);
@@ -311,6 +313,12 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
     }
   }
 
+  private void addLineageInfo(CopyEntity copyEntity, CopyableDatasetBase copyableDataset, WorkUnit workUnit) {
+    if (copyEntity instanceof CopyableFile && copyableDataset.getDatasetDescriptor() != null) {
+      LineageInfo.setSource(copyableDataset.getDatasetDescriptor(), workUnit);
+    }
+  }
+
   /**
    * @param state a {@link org.apache.gobblin.configuration.WorkUnitState} carrying properties needed by the returned
    *          {@link Extractor}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
index c27b839..6c71edc 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.data.management.copy;
 
 import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.DatasetDescriptor;
 
 
 /**
@@ -25,4 +26,10 @@ import org.apache.gobblin.dataset.Dataset;
  * Concrete classes must implement a subinterface of this interface ({@link CopyableDataset} or {@link IterableCopyableDataset}).
  */
 public interface CopyableDatasetBase extends Dataset {
+  /**
+   * Get the descriptor which identifies and provides metadata of the dataset
+   */
+  default DatasetDescriptor getDatasetDescriptor() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index cec06f2..04e5e34 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.data.management.copy;
 
 import org.apache.gobblin.data.management.partition.File;
 import org.apache.gobblin.data.management.copy.PreserveAttributes.Option;
+import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.guid.Guid;
 
@@ -55,6 +56,9 @@ public class CopyableFile extends CopyEntity implements File {
   /** {@link FileStatus} of the existing origin file. */
   private FileStatus origin;
 
+  /** The destination dataset the file will be copied to */
+  private DatasetDescriptor destDataset;
+
   /** Complete destination {@link Path} of the file. */
   private Path destination;
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
index 0f18f68..35108df 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
@@ -21,6 +21,7 @@ import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
 import org.apache.gobblin.data.management.dataset.DatasetUtils;
 import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.FileListUtils;
 import org.apache.gobblin.util.commit.DeleteFileCommitStep;
@@ -42,6 +43,8 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import lombok.Getter;
+
 
 /**
  * Implementation of {@link CopyableDataset} that creates a {@link CopyableFile} for every file that is a descendant if
@@ -66,6 +69,9 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
   private final boolean update;
   private final boolean delete;
 
+  @Getter
+  private transient final DatasetDescriptor datasetDescriptor;
+
   // Include empty directories in the source for copy
   private final boolean includeEmptyDirectories;
   // Delete empty directories in the destination
@@ -77,6 +83,7 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
 
     this.rootPath = PathUtils.getPathWithoutSchemeAndAuthority(rootPath);
     this.fs = fs;
+    this.datasetDescriptor = new DatasetDescriptor(fs.getScheme(), rootPath.toString());
 
     this.pathFilter = DatasetUtils.instantiatePathFilter(properties);
     this.copyableFileFilter = DatasetUtils.instantiateCopyableFileFilter(properties);
@@ -129,17 +136,19 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
 
     List<CopyEntity> copyEntities = Lists.newArrayList();
     List<CopyableFile> copyableFiles = Lists.newArrayList();
+    DatasetDescriptor targetDataset = new DatasetDescriptor(targetFs.getScheme(), targetPath.toString());
 
     for (Path path : toCopy) {
       FileStatus file = filesInSource.get(path);
       Path filePathRelativeToSearchPath = PathUtils.relativizePath(file.getPath(), nonGlobSearchPath);
       Path thisTargetPath = new Path(configuration.getPublishDir(), filePathRelativeToSearchPath);
-
-      copyableFiles.add(CopyableFile.fromOriginAndDestination(this.fs, file, thisTargetPath, configuration)
+      CopyableFile copyableFile = CopyableFile.fromOriginAndDestination(this.fs, file, thisTargetPath, configuration)
           .fileSet(datasetURN()).datasetOutputPath(thisTargetPath.toString())
           .ancestorsOwnerAndPermission(CopyableFile.resolveReplicatedOwnerAndPermissionsRecursively(this.fs,
               file.getPath().getParent(), nonGlobSearchPath, configuration))
-          .build());
+          .build();
+      copyableFile.setDestDataset(targetDataset);
+      copyableFiles.add(copyableFile);
     }
     copyEntities.addAll(this.copyableFileFilter.filter(this.fs, targetFs, copyableFiles));
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
index 3c7643b..cc7be1e 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
@@ -56,6 +56,8 @@ import com.typesafe.config.Config;
 
 import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.data.management.copy.CopyConfiguration;
 import org.apache.gobblin.data.management.copy.CopyEntity;
@@ -760,4 +762,10 @@ public class HiveCopyEntityHelper {
   public FileSystem getTargetFileSystem() {
     return this.targetFs;
   }
+
+  void setCopyableFileDestinationDataset(CopyableFile copyableFile) {
+    DatasetDescriptor destination = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, this.getTargetDatabase() + "." + this.getTargetTable());
+    destination.addMetadata(DatasetConstants.FS_URI, this.getTargetFs().getUri().toString());
+    copyableFile.setDestDataset(destination);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
index 2af2f80..03dba25 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
@@ -57,10 +57,12 @@ import org.apache.gobblin.data.management.copy.CopyableDataset;
 import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder.DbAndTable;
 import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
 import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.hive.HiveMetastoreClientPool;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.util.AutoReturnableObject;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
@@ -106,6 +108,8 @@ public class HiveDataset implements PrioritizedCopyableDataset {
   protected final DbAndTable dbAndTable;
   protected final DbAndTable logicalDbAndTable;
 
+  private transient final DatasetDescriptor datasetDescriptor;
+
   public HiveDataset(FileSystem fs, HiveMetastoreClientPool clientPool, Table table, Properties properties) {
     this(fs, clientPool, table, properties, ConfigFactory.empty());
   }
@@ -124,6 +128,9 @@ public class HiveDataset implements PrioritizedCopyableDataset {
         Optional.fromNullable(this.table.getDataLocation());
 
     this.tableIdentifier = this.table.getDbName() + "." + this.table.getTableName();
+    this.datasetDescriptor = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, tableIdentifier);
+    this.datasetDescriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
+
     this.datasetNamePattern = Optional.fromNullable(ConfigUtils.getString(datasetConfig, DATASET_NAME_PATTERN_KEY, null));
     this.dbAndTable = new DbAndTable(table.getDbName(), table.getTableName());
     if (this.datasetNamePattern.isPresent()) {
@@ -132,7 +139,6 @@ public class HiveDataset implements PrioritizedCopyableDataset {
       this.logicalDbAndTable = this.dbAndTable;
     }
     this.datasetConfig = resolveConfig(datasetConfig, dbAndTable, logicalDbAndTable);
-
     this.metricContext = Instrumented.getMetricContext(new State(properties), HiveDataset.class,
         Lists.<Tag<?>> newArrayList(new Tag<>(DATABASE, table.getDbName()), new Tag<>(TABLE, table.getTableName())));
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
index a9982bf..790c0b4 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
@@ -149,8 +149,11 @@ public class HivePartitionFileSet extends HiveFileSet {
       multiTimer.nextStage(HiveCopyEntityHelper.Stages.CREATE_COPY_UNITS);
       for (CopyableFile.Builder builder : hiveCopyEntityHelper.getCopyableFilesFromPaths(diffPathSet.filesToCopy,
           hiveCopyEntityHelper.getConfiguration(), Optional.of(this.partition))) {
-        copyEntities.add(builder.fileSet(fileSet).checksum(new byte[0])
-            .datasetOutputPath(desiredTargetLocation.location.toString()).build());
+        CopyableFile fileEntity =
+            builder.fileSet(fileSet).checksum(new byte[0]).datasetOutputPath(desiredTargetLocation.location.toString())
+                .build();
+        this.hiveCopyEntityHelper.setCopyableFileDestinationDataset(fileEntity);
+        copyEntities.add(fileEntity);
       }
 
       log.info("Created {} copy entities for partition {}", copyEntities.size(), this.partition.getCompleteName());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
index a796a2b..4d82a62 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
@@ -120,7 +120,10 @@ public class UnpartitionedTableFileSet extends HiveFileSet {
 
     for (CopyableFile.Builder builder : this.helper.getCopyableFilesFromPaths(diffPathSet.filesToCopy, this.helper.getConfiguration(),
         Optional.<Partition> absent())) {
-      copyEntities.add(builder.fileSet(fileSet).datasetOutputPath(desiredTargetLocation.location.toString()).build());
+      CopyableFile fileEntity =
+          builder.fileSet(fileSet).datasetOutputPath(desiredTargetLocation.location.toString()).build();
+      this.helper.setCopyableFileDestinationDataset(fileEntity);
+      copyEntities.add(fileEntity);
     }
 
     multiTimer.close();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index e443271..71ebd59 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.data.management.copy.publisher;
 
 
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
 import java.io.IOException;
 import java.net.URI;
@@ -27,7 +28,6 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -214,6 +214,7 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
           if (!fileSetRoot.isPresent() && copyableFile.getDatasetOutputPath() != null) {
             fileSetRoot = Optional.of(copyableFile.getDatasetOutputPath());
           }
+          LineageInfo.putDestination(copyableFile.getDestDataset(), 0, wus);
         }
         if (datasetOriginTimestamp > copyableFile.getOriginTimestamp()) {
           datasetOriginTimestamp = copyableFile.getOriginTimestamp();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
index 51a390d..5021d4d 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
@@ -17,6 +17,8 @@
 package org.apache.gobblin.data.management.conversion.hive.dataset;
 
 import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Properties;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -36,6 +38,9 @@ import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiv
 import org.apache.gobblin.hive.HiveMetastoreClientPool;
 import org.apache.gobblin.util.ConfigUtils;
 
+import static org.mockito.Mockito.when;
+
+
 @Test(groups = { "gobblin.data.management.conversion" })
 public class ConvertibleHiveDatasetTest {
 
@@ -94,7 +99,8 @@ public class ConvertibleHiveDatasetTest {
   }
 
   @Test
-  public void testInvalidFormat() {
+  public void testInvalidFormat()
+      throws Exception {
 
     Config config = ConfigFactory.parseMap(ImmutableMap.<String, String>of("destinationFormats", "flattenedOrc,nestedOrc"));
     ConvertibleHiveDataset cd = createTestConvertibleDataset(config);
@@ -103,7 +109,8 @@ public class ConvertibleHiveDatasetTest {
   }
 
   @Test
-  public void testDisableFormat() {
+  public void testDisableFormat()
+      throws Exception {
 
     Config config = ConfigFactory.parseMap(ImmutableMap.<String, String> builder()
         .put("destinationFormats", "flattenedOrc")
@@ -154,10 +161,13 @@ public class ConvertibleHiveDatasetTest {
     Assert.assertEquals(conversionConfig.getHiveRuntimeProperties(), hiveProps);
   }
 
-  public static ConvertibleHiveDataset createTestConvertibleDataset(Config config) {
+  public static ConvertibleHiveDataset createTestConvertibleDataset(Config config)
+      throws URISyntaxException {
     Table table = getTestTable("db1", "tb1");
+    FileSystem mockFs = Mockito.mock(FileSystem.class);
+    when(mockFs.getUri()).thenReturn(new URI("test"));
     ConvertibleHiveDataset cd =
-        new ConvertibleHiveDataset(Mockito.mock(FileSystem.class), Mockito.mock(HiveMetastoreClientPool.class), new org.apache.hadoop.hive.ql.metadata.Table(
+        new ConvertibleHiveDataset(mockFs, Mockito.mock(HiveMetastoreClientPool.class), new org.apache.hadoop.hive.ql.metadata.Table(
             table), new Properties(), config);
     return cd;
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
index 34258ba..16ee2de 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
@@ -35,7 +35,10 @@ import lombok.Getter;
  * <p>
  *   Instances of this class are immutable. Calling set* methods returns a copy of the calling instance.
  * </p>
+ *
+ * @deprecated Use {@link GobblinEventBuilder}
  */
+@Deprecated
 public class EventSubmitter {
 
   public static final String EVENT_TYPE = "eventType";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
index 89f83f5..d1ce681 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
@@ -17,47 +17,32 @@
 
 package org.apache.gobblin.metrics.event;
 
-import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
-import org.apache.gobblin.metrics.MetricContext;
-
-import com.google.common.collect.Maps;
-
-import lombok.Getter;
 
 
 /**
- * A failure event builds a specific {@link GobblinTrackingEvent} whose metadata has
- * {@value EventSubmitter#EVENT_TYPE} to be {@value #EVENT_TYPE}
+ * The builder builds builds a specific {@link GobblinTrackingEvent} whose metadata has
+ * {@value GobblinEventBuilder#EVENT_TYPE} to be {@value #FAILURE_EVENT_TYPE}
  *
  * <p>
  * Note: A {@link FailureEventBuilder} instance is not reusable
  */
-public class FailureEventBuilder {
-  private static final String EVENT_TYPE = "FailureEvent";
-  private static final String EVENT_NAMESPACE = "gobblin.event";
+public class FailureEventBuilder extends GobblinEventBuilder {
+  private static final String FAILURE_EVENT_TYPE = "FailureEvent";
   private static final String ROOT_CAUSE = "rootException";
 
-  @Getter
-  private final String name;
-  @Getter
-  private final String namespace;
-  private final Map<String, String> metadata;
-
   private Throwable rootCause;
 
   public FailureEventBuilder(String name) {
-    this(name, EVENT_NAMESPACE);
+    this(name, NAMESPACE);
   }
 
   public FailureEventBuilder(String name, String namespace) {
-    this.name = name;
-    this.namespace = namespace;
-    metadata = Maps.newHashMap();
-    metadata.put(EventSubmitter.EVENT_TYPE, EVENT_TYPE);
+    super(name, namespace);
+    metadata.put(EVENT_TYPE, FAILURE_EVENT_TYPE);
   }
 
   /**
@@ -68,42 +53,21 @@ public class FailureEventBuilder {
   }
 
   /**
-   * Add a metadata pair
-   */
-  public void addMetadata(String key, String value) {
-    metadata.put(key, value);
-  }
-
-  /**
-   * Add additional metadata
-   */
-  public void addAdditionalMetadata(Map<String, String> additionalMetadata) {
-    metadata.putAll(additionalMetadata);
-  }
-
-  /**
    * Build as {@link GobblinTrackingEvent}
    */
   public GobblinTrackingEvent build() {
     if (rootCause != null) {
       metadata.put(ROOT_CAUSE, ExceptionUtils.getStackTrace(rootCause));
     }
-    return new GobblinTrackingEvent(0L, EVENT_NAMESPACE, name, metadata);
-  }
-
-  /**
-   * Submit the event
-   */
-  public void submit(MetricContext context) {
-    context.submitEvent(build());
+    return new GobblinTrackingEvent(0L, namespace, name, metadata);
   }
 
   /**
-   * Check if the given {@link GobblinTrackingEvent} is a failiure event
+   * Check if the given {@link GobblinTrackingEvent} is a failure event
    */
   public static boolean isFailureEvent(GobblinTrackingEvent event) {
-    String eventType = event.getMetadata().get(EventSubmitter.EVENT_TYPE);
-    return StringUtils.isNotEmpty(eventType) && eventType.equals(EVENT_TYPE);
+    String eventType = event.getMetadata().get(EVENT_TYPE);
+    return StringUtils.isNotEmpty(eventType) && eventType.equals(FAILURE_EVENT_TYPE);
   }
 
   private static Throwable getRootCause(Throwable t) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
new file mode 100644
index 0000000..6b82342
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event;
+
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import lombok.Getter;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+
+
+/**
+ * A general gobblin event builder which builds a {@link GobblinTrackingEvent}
+ *
+ * Note: a {@link GobblinEventBuilder} instance is not reusable
+ */
+public class GobblinEventBuilder {
+  public static final String NAMESPACE = "gobblin.event";
+  public static final String EVENT_TYPE = "eventType";
+
+  @Getter
+  protected final String name;
+  @Getter
+  protected final String namespace;
+  protected final Map<String, String> metadata;
+
+  public GobblinEventBuilder(String name) {
+    this(name, NAMESPACE);
+  }
+
+  public GobblinEventBuilder(String name, String namespace) {
+    this.name = name;
+    this.namespace = namespace;
+    metadata = Maps.newHashMap();
+  }
+
+  public ImmutableMap<String, String> getMetadata() {
+    return new ImmutableMap.Builder<String, String>().putAll(metadata).build();
+  }
+
+  /**
+   * Add a metadata pair
+   */
+  public void addMetadata(String key, String value) {
+    metadata.put(key, value);
+  }
+
+  /**
+   * Add additional metadata
+   */
+  public void addAdditionalMetadata(Map<String, String> additionalMetadata) {
+    metadata.putAll(additionalMetadata);
+  }
+
+  /**
+   * Build as {@link GobblinTrackingEvent}
+   */
+  public GobblinTrackingEvent build() {
+    return new GobblinTrackingEvent(0L, namespace, name, metadata);
+  }
+  /**
+   * Submit the event
+   */
+  public void submit(MetricContext context) {
+    context.submitEvent(build());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
new file mode 100644
index 0000000..f9030eb
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event.lineage;
+
+
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+
+import com.google.common.base.Joiner;
+import com.google.gson.Gson;
+
+import avro.shaded.com.google.common.collect.Maps;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * The builder builds a specific {@link GobblinTrackingEvent} whose metadata has {@value GobblinEventBuilder#EVENT_TYPE}
+ * to be {@value LineageEventBuilder#LINEAGE_EVENT_TYPE}
+ *
+ * Note: A {@link LineageEventBuilder} instance is not reusable
+ */
+
+@Slf4j
+public final class LineageEventBuilder extends GobblinEventBuilder {
+  static final String LIENAGE_EVENT_NAMESPACE = getKey(NAMESPACE, "lineage");
+  static final String SOURCE = "source";
+  static final String DESTINATION = "destination";
+  static final String LINEAGE_EVENT_TYPE = "LineageEvent";
+
+  private static final Gson GSON = new Gson();
+
+  @Getter @Setter
+  private DatasetDescriptor source;
+  @Getter @Setter
+  private DatasetDescriptor destination;
+
+  public LineageEventBuilder(String name) {
+    super(name, LIENAGE_EVENT_NAMESPACE);
+    addMetadata(EVENT_TYPE, LINEAGE_EVENT_TYPE);
+  }
+
+  @Override
+  public GobblinTrackingEvent build() {
+    source.toDataMap().forEach((key, value) -> metadata.put(getKey(SOURCE, key), value));
+    destination.toDataMap().forEach((key, value) -> metadata.put(getKey(DESTINATION, key), value));
+    return new GobblinTrackingEvent(0L, namespace, name, metadata);
+  }
+
+  @Override
+  public String toString() {
+    return GSON.toJson(this);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    LineageEventBuilder event = (LineageEventBuilder) o;
+
+    if (!namespace.equals(event.namespace) || !name.equals(event.name) || !metadata.equals(event.metadata)) {
+      return false;
+    }
+
+    if (source != null ? !source.equals(event.source) : event.source != null) {
+      return false;
+    }
+
+    return destination != null ? destination.equals(event.destination) : event.destination == null;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = name.hashCode();
+    result = 31 * result + namespace.hashCode();
+    result = 31 * result + metadata.hashCode();
+    result = 31 * result + (source != null ? source.hashCode() : 0);
+    result = 31 * result + (destination != null ? destination.hashCode() : 0);
+    return result;
+  }
+
+  /**
+   * Check if the given {@link GobblinTrackingEvent} is a lineage event
+   */
+  public static boolean isLineageEvent(GobblinTrackingEvent event) {
+    String eventType = event.getMetadata().get(EVENT_TYPE);
+    return StringUtils.isNotEmpty(eventType) && eventType.equals(LINEAGE_EVENT_TYPE);
+  }
+
+  /**
+   * Create a {@link LineageEventBuilder} from a {@link GobblinEventBuilder}. An inverse function
+   * to {@link LineageEventBuilder#build()}
+   */
+  public static LineageEventBuilder fromEvent(GobblinTrackingEvent event) {
+    Map<String, String> metadata = event.getMetadata();
+    LineageEventBuilder lineageEvent = new LineageEventBuilder(event.getName());
+
+    String sourcePrefix = getKey(SOURCE, "");
+    Map<String, String> sourceDataMap = Maps.newHashMap();
+    String destinationPrefix = getKey(DESTINATION, "");
+    Map<String, String> destinationDataMap = Maps.newHashMap();
+
+    metadata.forEach((key, value) -> {
+      if (key.startsWith(sourcePrefix)) {
+        sourceDataMap.put(key.substring(sourcePrefix.length()), value);
+      } else if (key.startsWith(destinationPrefix)) {
+        destinationDataMap.put(key.substring(destinationPrefix.length()), value);
+      } else {
+        lineageEvent.addMetadata(key, value);
+      }
+    });
+
+    lineageEvent.setSource(DatasetDescriptor.fromDataMap(sourceDataMap));
+    lineageEvent.setDestination(DatasetDescriptor.fromDataMap(destinationDataMap));
+    return lineageEvent;
+  }
+
+  static String getKey(Object ... parts) {
+    return Joiner.on(".").join(parts);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageException.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageException.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageException.java
new file mode 100644
index 0000000..e7528b3
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event.lineage;
+
+/**
+ * A set of exceptions used by {@link LineageEventBuilder} when lineage information is serialized or deserialized.
+ */
+public class LineageException extends Exception {
+  public LineageException(String message) {
+    super(message);
+  }
+  public static class ConflictException extends LineageException {
+    public ConflictException(String branchId, LineageEventBuilder actual, LineageEventBuilder expect) {
+      super("Conflict LineageEvent: branchId=" + branchId + ", expected=" + expect.toString() + " actual=" + actual.toString());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
new file mode 100644
index 0000000..dd6c8f2
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event.lineage;
+
+import java.util.Collection;
+import java.util.Map;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+
+
+/**
+ * The lineage coordinator in a Gobblin job with single source and multiple destinations
+ *
+ * <p>
+ *   In Gobblin, a work unit processes records from only one dataset. It writes output to one or more destinations,
+ *   depending on the number of branches configured in the job. One destination means an output as another dataset.
+ * </p>
+ *
+ * <p>
+ *   Lineage info is jointly collected from the source, represented by {@link org.apache.gobblin.source.Source} or
+ *   {@link org.apache.gobblin.source.extractor.Extractor}, and destination,
+ *   represented by {@link org.apache.gobblin.writer.DataWriter} or {@link org.apache.gobblin.publisher.DataPublisher}
+ * </p>
+ *
+ * <p>
+ *   The general flow is:
+ *   <ol>
+ *     <li> source sets its {@link DatasetDescriptor} to each work unit </li>
+ *     <li> destination puts its {@link DatasetDescriptor} to the work unit </li>
+ *     <li> load and send all lineage events from all states </li>
+ *     <li> purge lineage info from all states </li>
+ *   </ol>
+ * </p>
+ */
+@Slf4j
+public final class LineageInfo {
+  public static final String BRANCH = "branch";
+
+  private static final Gson GSON = new Gson();
+  private static final String NAME_KEY = "name";
+
+  private LineageInfo() {
+  }
+
+  /**
+   * Set source {@link DatasetDescriptor} of a lineage event
+   *
+   * <p>
+   *   Only the {@link org.apache.gobblin.source.Source} or its {@link org.apache.gobblin.source.extractor.Extractor}
+   *   is supposed to set the source for a work unit of a dataset
+   * </p>
+   *
+   * @param state state about a {@link org.apache.gobblin.source.workunit.WorkUnit}
+   *
+   */
+  public static void setSource(DatasetDescriptor source, State state) {
+    state.setProp(getKey(NAME_KEY), source.getName());
+    state.setProp(getKey(LineageEventBuilder.SOURCE), GSON.toJson(source));
+  }
+
+  /**
+   * Put a {@link DatasetDescriptor} of a destination dataset to a state
+   *
+   * <p>
+   *   Only the {@link org.apache.gobblin.writer.DataWriter} or {@link org.apache.gobblin.publisher.DataPublisher}
+   *   is supposed to put the destination dataset information. Since different branches may concurrently put,
+   *   the method is implemented to be threadsafe
+   * </p>
+   */
+  public static void putDestination(DatasetDescriptor destination, int branchId, State state) {
+    if (!hasLineageInfo(state)) {
+      log.warn("State has no lineage info but branch " + branchId + " puts a destination: " + GSON.toJson(destination));
+      return;
+    }
+
+    synchronized (state.getProp(getKey(NAME_KEY))) {
+      state.setProp(getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION), GSON.toJson(destination));
+    }
+  }
+
+  /**
+   * Load all lineage information from {@link State}s of a dataset
+   *
+   * <p>
+   *   For a dataset, the same branch across different {@link State}s must be the same, as
+   *   the same branch means the same destination
+   * </p>
+   *
+   * @param states All states which belong to the same dataset
+   * @return A collection of {@link LineageEventBuilder}s put in the state
+   * @throws LineageException.ConflictException if two states have conflict lineage info
+   */
+  public static Collection<LineageEventBuilder> load(Collection<? extends State> states)
+      throws LineageException {
+    Preconditions.checkArgument(states != null && !states.isEmpty());
+    final Map<String, LineageEventBuilder> resultEvents = Maps.newHashMap();
+    for (State state : states) {
+      Map<String, LineageEventBuilder> branchedEvents = load(state);
+      for (Map.Entry<String, LineageEventBuilder> entry : branchedEvents.entrySet()) {
+        String branch = entry.getKey();
+        LineageEventBuilder event = entry.getValue();
+        LineageEventBuilder resultEvent = resultEvents.get(branch);
+        if (resultEvent == null) {
+          resultEvents.put(branch, event);
+        } else if (!resultEvent.equals(event)) {
+          throw new LineageException.ConflictException(branch, event, resultEvent);
+        }
+      }
+    }
+    return resultEvents.values();
+  }
+
+  /**
+   * Load all lineage info from a {@link State}
+   *
+   * @return A map from branch to its lineage info. If there is no destination info, return an empty map
+   */
+  static Map<String, LineageEventBuilder> load(State state) {
+    String name = state.getProp(getKey(NAME_KEY));
+    DatasetDescriptor source = GSON.fromJson(state.getProp(getKey(LineageEventBuilder.SOURCE)), DatasetDescriptor.class);
+
+    String branchedPrefix = getKey(BRANCH, "");
+    Map<String, LineageEventBuilder> events = Maps.newHashMap();
+    for (Map.Entry<Object, Object> entry : state.getProperties().entrySet()) {
+      String key = entry.getKey().toString();
+      if (!key.startsWith(branchedPrefix)) {
+        continue;
+      }
+
+      String[] parts = key.substring(branchedPrefix.length()).split("\\.");
+      assert parts.length == 2;
+      String branchId = parts[0];
+      LineageEventBuilder event = events.get(branchId);
+      if (event == null) {
+        event = new LineageEventBuilder(name);
+        event.setSource(new DatasetDescriptor(source));
+        events.put(parts[0], event);
+      }
+      switch (parts[1]) {
+        case LineageEventBuilder.DESTINATION:
+          DatasetDescriptor destination = GSON.fromJson(entry.getValue().toString(), DatasetDescriptor.class);
+          destination.addMetadata(BRANCH, branchId);
+          event.setDestination(destination);
+          break;
+        default:
+          throw new RuntimeException("Unsupported lineage key: " + key);
+      }
+    }
+
+    return events;
+  }
+
+  /**
+   * Remove all lineage related properties from a state
+   */
+  public static void purgeLineageInfo(State state) {
+    state.removePropsWithPrefix(LineageEventBuilder.LIENAGE_EVENT_NAMESPACE);
+  }
+
+  /**
+   * Check if the given state has lineage info
+   */
+  public static boolean hasLineageInfo(State state) {
+    return state.contains(getKey(NAME_KEY));
+  }
+
+  /**
+   * Get the full lineage event name from a state
+   */
+  public static String getFullEventName(State state) {
+    return Joiner.on('.').join(LineageEventBuilder.LIENAGE_EVENT_NAMESPACE, state.getProp(getKey(NAME_KEY)));
+  }
+
+  /**
+   * Prefix all keys with {@link LineageEventBuilder#LIENAGE_EVENT_NAMESPACE}
+   */
+  private static String getKey(Object... objects) {
+    Object[] args = new Object[objects.length + 1];
+    args[0] = LineageEventBuilder.LIENAGE_EVENT_NAMESPACE;
+    System.arraycopy(objects, 0, args, 1, objects.length);
+    return LineageEventBuilder.getKey(args);
+  }
+}