You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ib...@apache.org on 2017/08/17 18:25:18 UTC

incubator-gobblin git commit: [GOBBLIN-182] Add lineage event for query based source

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master da382dbf1 -> 280b1d35e


[GOBBLIN-182] Add lineage event for query based source

Closes #2034 from yukuai518/lineage2


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/280b1d35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/280b1d35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/280b1d35

Branch: refs/heads/master
Commit: 280b1d35edbb66ec9db19d852caa1b8ed43a34ac
Parents: da382db
Author: Kuai Yu <ku...@linkedin.com>
Authored: Thu Aug 17 11:25:11 2017 -0700
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Thu Aug 17 11:25:11 2017 -0700

----------------------------------------------------------------------
 .../gobblin/lineage/LineageException.java       |  39 ++++
 .../org/apache/gobblin/lineage/LineageInfo.java | 234 +++++++++++++++++++
 .../gobblin/publisher/BaseDataPublisher.java    |  10 +
 .../extractor/extract/QueryBasedSource.java     |   6 +
 .../apache/gobblin/lineage/LineageInfoTest.java | 160 +++++++++++++
 .../extractor/extract/jdbc/MysqlSource.java     |  13 ++
 .../gobblin/runtime/SafeDatasetCommit.java      |  31 +++
 7 files changed, 493 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageException.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageException.java b/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageException.java
new file mode 100644
index 0000000..8dcf592
--- /dev/null
+++ b/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.gobblin.lineage;
+
+/**
+ * A set of exceptions used by {@link LineageInfo} when lineage information is serialized or deserialized.
+ */
+public class LineageException extends Exception {
+  public LineageException(String message) {
+    super(message);
+  }
+  public static class LineageConflictAttributeException extends LineageException {
+    public LineageConflictAttributeException (String key, String oldValue, String newValue) {
+      super ("Lineage has conflict value: key=" + key + " value=[1]" + oldValue + " [2]" + newValue);
+    }
+  }
+
+  public static class LineageUnsupportedLevelException extends LineageException {
+    public LineageUnsupportedLevelException (LineageInfo.Level level) {
+      super (level.toString() + " is not supported");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java b/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
new file mode 100644
index 0000000..8d582f2
--- /dev/null
+++ b/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.lineage;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A class to restore all lineage information from a {@link State}
+ * All lineage attributes are under LINEAGE_NAME_SPACE namespace.
+ *
+ * For example, a typical lineage attributes looks like:
+ *    gobblin.lineage.K1          ---> V1
+ *    gobblin.lineage.branch.3.K2 ---> V2
+ *
+ * K1 is dataset level attribute, K2 is branch level attribute, and branch id is 3.
+ */
+
+@Slf4j
+public class LineageInfo {
+
+  public static final String LINEAGE_NAME_SPACE = "gobblin.lineage";
+  public static final String BRANCH_ID_METADATA_KEY = "branchId";
+  private static final String DATASET_PREFIX =  LINEAGE_NAME_SPACE + ".";
+  private static final String BRANCH_PREFIX = DATASET_PREFIX + "branch.";
+
+  @Getter
+  private String datasetUrn;
+  @Getter
+  private String jobId;
+
+  private Map<String, String> lineageMetaData;
+
+  public enum Level {
+    DATASET,
+    BRANCH,
+    All
+  }
+
+  private LineageInfo() {
+  }
+
+  private LineageInfo(String datasetUrn, String jobId, ImmutableMap<String, String> lineageMetaData) {
+    Preconditions.checkArgument(datasetUrn != null);
+    Preconditions.checkArgument(jobId != null);
+    this.datasetUrn = datasetUrn;
+    this.jobId = jobId;
+    this.lineageMetaData = lineageMetaData;
+  }
+
+  /**
+   * Retrieve lineage information from a {@link State} by {@link Level}
+   * @param state A single state
+   * @param level {@link Level#DATASET}  only load dataset level lineage attributes
+   *              {@link Level#BRANCH}   only load branch level lineage attributes
+   *              {@link Level#All}      load all lineage attributes
+   * @return A collection of {@link LineageInfo}s per branch. When level is {@link Level#DATASET}, this list has only single element.
+   */
+  public static Collection<LineageInfo> load (State state, Level level) throws LineageException {
+    return load(Collections.singleton(state), level);
+  }
+
+  /**
+   * Get all lineage meta data.
+   */
+  public ImmutableMap<String, String> getLineageMetaData() {
+    return ImmutableMap.copyOf(lineageMetaData);
+  }
+
+  /**
+   * Retrieve all lineage information from different {@link State}s.
+   * This requires the job id and dataset urn to be present in the state, under job.id and dataset.urn.
+   * A global union operation is applied to combine all <K, V> pairs from the input {@link State}s. If multiple {@link State}s
+   * share the same K, but have conflicting V, a {@link LineageException} is thrown.
+   *
+   * {@link Level} can control if a dataset level or branch level information should be used. When {@link Level#All} is
+   * specified, all levels of information will be returned; otherwise only specified level of information will be returned.
+   *
+   * For instance, assume we have below input states:
+   *    State[0]: gobblin.lineage.K1          ---> V1
+   *              gobblin.lineage.K2          ---> V2
+   *              gobblin.lineage.branch.1.K4 ---> V4
+   *    State[1]: gobblin.lineage.K2          ---> V2
+   *              gobblin.lineage.K3          ---> V3
+   *              gobblin.lineage.branch.1.K4 ---> V4
+   *              gobblin.lineage.branch.1.K5 ---> V5
+   *              gobblin.lineage.branch.2.K6 ---> V6
+   *
+   *  (1) With {@link Level#DATASET} level, the output would be:
+   *      LinieageInfo[0]:  K1 ---> V1
+   *                        K2 ---> V2
+   *                        K3 ---> V3
+   *  (2) With {@link Level#All} level, the output would be: (because there are two branches, so there are two LineageInfo)
+   *      LineageInfo[0]:   K1 ---> V1
+   *                        K2 ---> V2
+   *                        K3 ---> V3
+   *                        K4 ---> V4
+   *                        K5 ---> V5
+   *
+   *      LineageInfo[1]:   K1 ---> V1
+   *                        K2 ---> V2
+   *                        K3 ---> V3
+   *                        K6 ---> V6
+   *
+   *   (3) With {@link Level#BRANCH} level, the output would be: (only branch level information was returned)
+   *      LineageInfo[0]:   K4 ---> V4
+   *                        K5 ---> V5
+   *      LineageInfo[1]:   K6 ---> V6
+   *
+   * @param states All states which belong to the same dataset and share the same jobId.
+   * @param level {@link Level#DATASET}  only load dataset level lineage attributes
+   *              {@link Level#BRANCH}   only load branch level lineage attributes
+   *              {@link Level#All}      load all lineage attributes
+   * @return A collection of {@link LineageInfo}s per branch. When level is {@link Level#DATASET}, this list has only single element.
+   *
+   * @throws LineageException.LineageConflictAttributeException if two states have same key but not the same value.
+   */
+  public static Collection<LineageInfo> load (Collection<? extends State> states, Level level) throws LineageException {
+    Preconditions.checkArgument(states != null && !states.isEmpty());
+    Map<String, String> datasetMetaData = new HashMap<>();
+    Map<String, Map<String, String>> branchAggregate = new HashMap<>();
+
+    State anyOne = states.iterator().next();
+    String jobId = anyOne.getProp(ConfigurationKeys.JOB_ID_KEY, "");
+    String urn = anyOne.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN);
+
+    for (State state: states) {
+      for (Map.Entry<Object, Object> entry : state.getProperties().entrySet()) {
+        if (entry.getKey() instanceof String && ((String) entry.getKey()).startsWith(LINEAGE_NAME_SPACE)) {
+
+          String lineageKey = ((String) entry.getKey());
+          String lineageValue = (String) entry.getValue();
+
+          if (lineageKey.startsWith(BRANCH_PREFIX)) {
+            String branchPrefixStrip = lineageKey.substring(BRANCH_PREFIX.length());
+            String branchId = branchPrefixStrip.substring(0, branchPrefixStrip.indexOf("."));
+            String key = branchPrefixStrip.substring(branchPrefixStrip.indexOf(".") + 1);
+
+            if (level == Level.BRANCH || level == Level.All) {
+              if (!branchAggregate.containsKey(branchId)) {
+                branchAggregate.put(branchId, new HashMap<>());
+              }
+              Map<String, String> branchMetaData = branchAggregate.get(branchId);
+              String prev = branchMetaData.put(key, lineageValue);
+              if (prev != null && !prev.equals(lineageValue)) {
+                throw new LineageException.LineageConflictAttributeException(lineageKey, prev, lineageValue);
+              }
+            }
+          } else if (lineageKey.startsWith(DATASET_PREFIX)) {
+            if (level == Level.DATASET || level == Level.All) {
+              String prev = datasetMetaData.put(lineageKey.substring(DATASET_PREFIX.length()), lineageValue);
+              if (prev != null && !prev.equals(lineageValue)) {
+                throw new LineageException.LineageConflictAttributeException(lineageKey, prev, lineageValue);
+              }
+            }
+          }
+        }
+      }
+    }
+
+    Collection<LineageInfo> collection = Sets.newHashSet();
+
+    if (level == Level.DATASET) {
+      ImmutableMap<String, String> metaData = ImmutableMap.<String, String>builder()
+          .putAll(datasetMetaData)
+          .build();
+      collection.add(new LineageInfo(urn, jobId, metaData));
+      return collection;
+    } else if (level == Level.BRANCH || level == Level.All){
+      if (branchAggregate.isEmpty()) {
+        if (level == Level.All) {
+          collection.add(new LineageInfo(urn, jobId, ImmutableMap.<String, String>builder().putAll(datasetMetaData).build()));
+        }
+        return collection;
+      }
+      for (Map.Entry<String, Map<String, String>> branchMetaDataEntry: branchAggregate.entrySet()) {
+        String branchId = branchMetaDataEntry.getKey();
+        Map<String, String> branchMetaData = branchMetaDataEntry.getValue();
+        ImmutableMap.Builder<String, String> metaDataBuilder = ImmutableMap.builder();
+        if (level == Level.All) {
+          metaDataBuilder.putAll(datasetMetaData);
+        }
+        metaDataBuilder.putAll(branchMetaData).put(BRANCH_ID_METADATA_KEY, branchId);
+        collection.add(new LineageInfo(urn, jobId, metaDataBuilder.build()));
+      }
+
+      return collection;
+    } else {
+      throw new LineageException.LineageUnsupportedLevelException(level);
+    }
+  }
+
+  public static void setDatasetLineageAttribute (State state, String key, String value) {
+    state.setProp(DATASET_PREFIX + key, value);
+  }
+
+  public static void setBranchLineageAttribute (State state, int branchId, String key, String value) {
+    state.setProp(BRANCH_PREFIX + Joiner.on(".").join(branchId, key), value);
+  }
+
+  public final String getId() {
+    return Joiner.on(":::").join(this.datasetUrn, this.jobId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index f0d0e32..19314e5 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -30,6 +30,7 @@ import java.util.Objects;
 import java.util.Set;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.gobblin.lineage.LineageInfo;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -97,6 +98,9 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
   protected final int parallelRunnerThreads;
   protected final Map<String, ParallelRunner> parallelRunners = Maps.newHashMap();
   protected final Set<Path> publisherOutputDirs = Sets.newHashSet();
+
+  public static final String PUBLISH_OUTOUT = "publish.output";
+
   /* Each partition in each branch may have separate metadata. The metadata mergers are responsible
    * for aggregating this information from all workunits so it can be published.
    */
@@ -328,6 +332,7 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
         if (!replaceFinalOutputDir) {
           addWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner);
           writerOutputPathsMoved.add(writerOutputDir);
+          addPublisherLineageInfo(state, branchId, publisherOutputDir.toString());
           return;
         }
 
@@ -342,9 +347,14 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
 
       movePath(parallelRunner, state, writerOutputDir, publisherOutputDir, branchId);
       writerOutputPathsMoved.add(writerOutputDir);
+      addPublisherLineageInfo(state, branchId, publisherOutputDir.toString());
     }
   }
 
+  protected void addPublisherLineageInfo(WorkUnitState state, int branchId, String output) {
+    LineageInfo.setBranchLineageAttribute(state, branchId, PUBLISH_OUTOUT, output);
+  }
+
   /**
    * Get the output directory path this {@link BaseDataPublisher} will write to.
    *

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
index fa5a360..d94dede 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.lineage.LineageInfo;
 import org.slf4j.MDC;
 
 import com.google.common.base.Optional;
@@ -234,6 +235,7 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> {
       workunit.setProp(ConfigurationKeys.SOURCE_ENTITY, sourceEntity.getSourceEntityName());
       workunit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, sourceEntity.getDestTableName());
       workunit.setProp(WORK_UNIT_STATE_VERSION_KEY, CURRENT_WORK_UNIT_STATE_VERSION);
+      addLineageSourceInfo (state, sourceEntity, workunit);
       partition.serialize(workunit);
       workUnits.add(workunit);
     }
@@ -241,6 +243,10 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> {
     return workUnits;
   }
 
+  protected void addLineageSourceInfo (SourceState sourceState, SourceEntity entity, WorkUnit workUnit) {
+    workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, entity.destTableName);
+  }
+
   protected Set<SourceEntity> getFilteredSourceEntities(SourceState state) {
     Set<SourceEntity> unfilteredEntities = getSourceEntities(state);
     return getFilteredSourceEntitiesHelper(state, unfilteredEntities);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java b/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java
new file mode 100644
index 0000000..2a7ea15
--- /dev/null
+++ b/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.lineage;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import gobblin.configuration.State;
+
+
+public class LineageInfoTest {
+
+  @Test
+  public void testDatasetLevel () {
+    Collection<LineageInfo> collection = null;
+    try {
+      collection = LineageInfo.load(createTestStates(), LineageInfo.Level.DATASET);
+    } catch (LineageException e) {
+      Assert.fail(e.toString());
+    }
+
+    Assert.assertEquals(1, collection.size());
+    LineageInfo info = collection.iterator().next();
+    ImmutableMap<String, String> map = info.getLineageMetaData();
+    Assert.assertEquals(3, map.size());
+    Assert.assertEquals("V1", map.get("K1"));
+    Assert.assertEquals("V2", map.get("K2"));
+    Assert.assertEquals("V3", map.get("K3"));
+  }
+
+  @Test
+  public void testBranchLevel () {
+    Collection<LineageInfo> collection = null;
+    try {
+      collection = LineageInfo.load(createTestStates(), LineageInfo.Level.BRANCH);
+    } catch (LineageException e) {
+      Assert.fail(e.toString());
+    }
+
+    Assert.assertEquals(2, collection.size());
+
+    for (LineageInfo info: collection) {
+      Map<String, String> map = info.getLineageMetaData();
+      String branchId = map.get(LineageInfo.BRANCH_ID_METADATA_KEY);
+      if (branchId.equals("1")) {
+        Assert.assertEquals(3, map.size()); // include BRANCH_ID_METADATA_KEY
+        Assert.assertEquals("V4", map.get("K4"));
+        Assert.assertEquals("V5", map.get("K5"));
+      }
+
+      if (branchId.equals("2")) {
+        Assert.assertEquals(2, map.size()); // include BRANCH_ID_METADATA_KEY
+        Assert.assertEquals("V6", map.get("K6"));
+      }
+    }
+  }
+
+  @Test
+  public void testAllLevel () {
+    Collection<LineageInfo> collection = null;
+    try {
+      collection = LineageInfo.load(createTestStates(), LineageInfo.Level.All);
+    } catch (LineageException e) {
+      Assert.fail(e.toString());
+    }
+
+    Assert.assertEquals(2, collection.size());
+    for (LineageInfo info: collection) {
+      Map<String, String> map = info.getLineageMetaData();
+      String branchId = map.get(LineageInfo.BRANCH_ID_METADATA_KEY);
+      if (branchId.equals("1")) {
+        Assert.assertEquals(6, map.size()); // include BRANCH_ID_METADATA_KEY
+        Assert.assertEquals("V1", map.get("K1"));
+        Assert.assertEquals("V2", map.get("K2"));
+        Assert.assertEquals("V3", map.get("K3"));
+        Assert.assertEquals("V4", map.get("K4"));
+        Assert.assertEquals("V5", map.get("K5"));
+      }
+
+      if (branchId.equals("2")) {
+        Assert.assertEquals(5, map.size()); // include BRANCH_ID_METADATA_KEY
+        Assert.assertEquals("V1", map.get("K1"));
+        Assert.assertEquals("V2", map.get("K2"));
+        Assert.assertEquals("V3", map.get("K3"));
+        Assert.assertEquals("V6", map.get("K6"));
+      }
+    }
+  }
+
+  @Test
+  public void testNoBranchInfo () {
+    State state = new State();
+    state.setProp(ConfigurationKeys.JOB_ID_KEY, "test_job_id_123456");
+    state.setProp(ConfigurationKeys.DATASET_URN_KEY, "PageViewEvent");
+    LineageInfo.setDatasetLineageAttribute(state,"K1", "V1");
+    LineageInfo.setDatasetLineageAttribute(state,"K2", "V2");
+    Collection<LineageInfo> collection = null;
+    try {
+      collection = LineageInfo.load(Lists.newArrayList(state), LineageInfo.Level.BRANCH);
+    } catch (LineageException e) {
+      Assert.fail(e.toString());
+    }
+
+    Assert.assertEquals(true, collection.isEmpty());
+  }
+
+  private Collection<State> createTestStates() {
+    /*
+     *    State[0]: gobblin.lineage.K1          ---> V1
+     *              gobblin.lineage.K2          ---> V2
+     *              gobblin.lineage.branch.1.K4 ---> V4
+     *    State[1]: gobblin.lineage.K2          ---> V2
+     *              gobblin.lineage.K3          ---> V3
+     *              gobblin.lineage.branch.1.K4 ---> V4
+     *              gobblin.lineage.branch.1.K5 ---> V5
+     *              gobblin.lineage.branch.2.K6 ---> V6
+     */
+    State state_1 = new State();
+    state_1.setProp(ConfigurationKeys.JOB_ID_KEY, "test_job_id_123456");
+    state_1.setProp(ConfigurationKeys.DATASET_URN_KEY, "PageViewEvent");
+    LineageInfo.setDatasetLineageAttribute(state_1,"K1", "V1");
+    LineageInfo.setDatasetLineageAttribute(state_1,"K2", "V2");
+    LineageInfo.setBranchLineageAttribute(state_1, 1, "K4", "V4");
+
+
+    State state_2 = new State();
+    state_2.setProp(ConfigurationKeys.JOB_ID_KEY, "test_job_id_123456");
+    state_2.setProp(ConfigurationKeys.DATASET_URN_KEY, "PageViewEvent");
+
+    LineageInfo.setDatasetLineageAttribute(state_2,"K2", "V2");
+    LineageInfo.setDatasetLineageAttribute(state_2,"K3", "V3");
+    LineageInfo.setBranchLineageAttribute(state_2, 1, "K4", "V4");
+    LineageInfo.setBranchLineageAttribute(state_2, 1, "K5", "V5");
+    LineageInfo.setBranchLineageAttribute(state_2, 2, "K6", "V6");
+
+    return Lists.newArrayList(state_1, state_2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
index 20a0823..57fdedd 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
@@ -17,10 +17,14 @@
 
 package org.apache.gobblin.source.extractor.extract.jdbc;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.lineage.LineageInfo;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.source.extractor.exception.ExtractPrepareException;
 import java.io.IOException;
 
+import org.apache.gobblin.source.workunit.WorkUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,4 +54,13 @@ public class MysqlSource extends QueryBasedSource<JsonArray, JsonElement> {
     }
     return extractor;
   }
+
+  protected void addLineageSourceInfo (SourceState sourceState, SourceEntity entity, WorkUnit workUnit) {
+    super.addLineageSourceInfo(sourceState, entity, workUnit);
+    String host = sourceState.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME);
+    String port = sourceState.getProp(ConfigurationKeys.SOURCE_CONN_PORT);
+    String database = sourceState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_SCHEMA);
+    String connectionUrl = "jdbc:mysql://" + host.trim() + ":" + port + "/" + database.trim();
+    LineageInfo.setDatasetLineageAttribute(workUnit, "connectionUrl", connectionUrl);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index 363adf3..9521575 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -33,8 +33,13 @@ import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.commit.DeliverySemantics;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.lineage.LineageException;
+import org.apache.gobblin.lineage.LineageInfo;
+import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.publisher.CommitSequencePublisher;
 import org.apache.gobblin.publisher.DataPublisher;
+import org.apache.gobblin.publisher.NoopPublisher;
 import org.apache.gobblin.publisher.UnpublishedHandling;
 import org.apache.gobblin.runtime.commit.DatasetStateCommitStep;
 import org.apache.gobblin.runtime.task.TaskFactory;
@@ -159,6 +164,7 @@ final class SafeDatasetCommit implements Callable<Void> {
         } else if (canPersistStates) {
           persistDatasetState(datasetUrn, datasetState);
         }
+
       } catch (IOException | RuntimeException ioe) {
         log.error(String
             .format("Failed to persist dataset state for dataset %s of job %s", datasetUrn, this.jobContext.getJobId()),
@@ -169,6 +175,30 @@ final class SafeDatasetCommit implements Callable<Void> {
     return null;
   }
 
+  private void submitLineageEvent(Collection<TaskState> states) {
+    if (states.size() == 0) {
+      return;
+    }
+
+    TaskState oneWorkUnitState = states.iterator().next();
+    if (oneWorkUnitState.contains(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE) && oneWorkUnitState.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE).equals(
+        NoopPublisher.class.getName())) {
+      // if no publisher configured, each task should be responsible for sending lineage event.
+      return;
+    }
+
+    try {
+      Collection<LineageInfo> branchLineages = LineageInfo.load(states, LineageInfo.Level.All);
+      EventSubmitter submitter = new EventSubmitter.Builder(Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class),
+          LineageInfo.LINEAGE_NAME_SPACE).build();
+      for (LineageInfo info: branchLineages) {
+        submitter.submit(info.getId(), info.getLineageMetaData());
+      }
+    } catch (LineageException e) {
+      log.error ("Lineage event submission failed due to :" + e.toString());
+    }
+  }
+
   /**
    * Synchronized version of {@link #commitDataset(Collection, DataPublisher)} used when publisher is not
    * thread safe.
@@ -186,6 +216,7 @@ final class SafeDatasetCommit implements Callable<Void> {
 
     try {
       publisher.publish(taskStates);
+      submitLineageEvent(taskStates);
     } catch (Throwable t) {
       log.error("Failed to commit dataset", t);
       setTaskFailureException(taskStates, t);