You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ib...@apache.org on 2017/08/17 18:25:18 UTC
incubator-gobblin git commit: [GOBBLIN-182] Add lineage event for query based source
Repository: incubator-gobblin
Updated Branches:
refs/heads/master da382dbf1 -> 280b1d35e
[GOBBLIN-182] Add lineage event for query based source
Closes #2034 from yukuai518/lineage2
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/280b1d35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/280b1d35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/280b1d35
Branch: refs/heads/master
Commit: 280b1d35edbb66ec9db19d852caa1b8ed43a34ac
Parents: da382db
Author: Kuai Yu <ku...@linkedin.com>
Authored: Thu Aug 17 11:25:11 2017 -0700
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Thu Aug 17 11:25:11 2017 -0700
----------------------------------------------------------------------
.../gobblin/lineage/LineageException.java | 39 ++++
.../org/apache/gobblin/lineage/LineageInfo.java | 234 +++++++++++++++++++
.../gobblin/publisher/BaseDataPublisher.java | 10 +
.../extractor/extract/QueryBasedSource.java | 6 +
.../apache/gobblin/lineage/LineageInfoTest.java | 160 +++++++++++++
.../extractor/extract/jdbc/MysqlSource.java | 13 ++
.../gobblin/runtime/SafeDatasetCommit.java | 31 +++
7 files changed, 493 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageException.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageException.java b/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageException.java
new file mode 100644
index 0000000..8dcf592
--- /dev/null
+++ b/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.gobblin.lineage;
+
+/**
+ * Base checked exception for failures that occur while lineage information is written to, or read back from,
+ * a state by {@link LineageInfo}. More specific failures are modelled as the nested subclasses below.
+ */
+public class LineageException extends Exception {
+  /**
+   * @param message human readable description of the failure
+   */
+  public LineageException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param message human readable description of the failure
+   * @param cause the underlying failure, preserved so that the original stack trace is not lost
+   */
+  public LineageException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  /**
+   * Thrown when the same lineage key is seen with two different values.
+   */
+  public static class LineageConflictAttributeException extends LineageException {
+    /**
+     * @param key the full lineage key that carries conflicting values
+     * @param oldValue the value seen first
+     * @param newValue the value seen afterwards that differs from {@code oldValue}
+     */
+    public LineageConflictAttributeException(String key, String oldValue, String newValue) {
+      super("Lineage has conflict value: key=" + key + " value=[1]" + oldValue + " [2]" + newValue);
+    }
+  }
+
+  /**
+   * Thrown when a {@link LineageInfo.Level} cannot be handled.
+   */
+  public static class LineageUnsupportedLevelException extends LineageException {
+    /**
+     * @param level the level that is not supported; may be {@code null}
+     */
+    public LineageUnsupportedLevelException(LineageInfo.Level level) {
+      // String.valueOf tolerates null. A null level is a realistic way to get here, and calling
+      // level.toString() would replace this checked exception with a NullPointerException.
+      super(String.valueOf(level) + " is not supported");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java b/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
new file mode 100644
index 0000000..8d582f2
--- /dev/null
+++ b/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.lineage;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Immutable view of the lineage attributes that were stored in one or more {@link State}s.
+ * All lineage attributes live under the {@link #LINEAGE_NAME_SPACE} namespace.
+ *
+ * For example, typical lineage attributes look like:
+ * gobblin.lineage.K1 ---> V1
+ * gobblin.lineage.branch.3.K2 ---> V2
+ *
+ * K1 is a dataset level attribute, K2 is a branch level attribute, and its branch id is 3.
+ */
+
+@Slf4j
+public class LineageInfo {
+
+  public static final String LINEAGE_NAME_SPACE = "gobblin.lineage";
+  public static final String BRANCH_ID_METADATA_KEY = "branchId";
+  private static final String DATASET_PREFIX = LINEAGE_NAME_SPACE + ".";
+  private static final String BRANCH_PREFIX = DATASET_PREFIX + "branch.";
+
+  @Getter
+  private final String datasetUrn;
+  @Getter
+  private final String jobId;
+
+  private final ImmutableMap<String, String> lineageMetaData;
+
+  public enum Level {
+    DATASET,
+    BRANCH,
+    All
+  }
+
+  private LineageInfo(String datasetUrn, String jobId, ImmutableMap<String, String> lineageMetaData) {
+    Preconditions.checkArgument(datasetUrn != null);
+    Preconditions.checkArgument(jobId != null);
+    Preconditions.checkArgument(lineageMetaData != null);
+    this.datasetUrn = datasetUrn;
+    this.jobId = jobId;
+    this.lineageMetaData = lineageMetaData;
+  }
+
+  /**
+   * Retrieve lineage information from a {@link State} by {@link Level}
+   * @param state A single state
+   * @param level {@link Level#DATASET} only load dataset level lineage attributes
+   *              {@link Level#BRANCH} only load branch level lineage attributes
+   *              {@link Level#All} load all lineage attributes
+   * @return A collection of {@link LineageInfo}s per branch. When level is {@link Level#DATASET}, this list has only single element.
+   */
+  public static Collection<LineageInfo> load(State state, Level level) throws LineageException {
+    return load(Collections.singleton(state), level);
+  }
+
+  /**
+   * Get all lineage meta data.
+   */
+  public ImmutableMap<String, String> getLineageMetaData() {
+    // The map is already immutable, so it can be handed out without a defensive copy.
+    return this.lineageMetaData;
+  }
+
+  /**
+   * Retrieve all lineage information from different {@link State}s.
+   * The job id and dataset urn are read from the first state, under job.id and dataset.urn.
+   * A global union operation is applied to combine all <K, V> pairs from the input {@link State}s. If multiple {@link State}s
+   * share the same K, but have conflicting V, a {@link LineageException} is thrown.
+   *
+   * {@link Level} can control if a dataset level or branch level information should be used. When {@link Level#All} is
+   * specified, all levels of information will be returned; otherwise only specified level of information will be returned.
+   *
+   * For instance, assume we have below input states:
+   * State[0]: gobblin.lineage.K1 ---> V1
+   *           gobblin.lineage.K2 ---> V2
+   *           gobblin.lineage.branch.1.K4 ---> V4
+   * State[1]: gobblin.lineage.K2 ---> V2
+   *           gobblin.lineage.K3 ---> V3
+   *           gobblin.lineage.branch.1.K4 ---> V4
+   *           gobblin.lineage.branch.1.K5 ---> V5
+   *           gobblin.lineage.branch.2.K6 ---> V6
+   *
+   * (1) With {@link Level#DATASET} level, the output would be:
+   * LineageInfo[0]: K1 ---> V1
+   *                 K2 ---> V2
+   *                 K3 ---> V3
+   * (2) With {@link Level#All} level, the output would be: (because there are two branches, so there are two LineageInfo)
+   * LineageInfo[0]: K1 ---> V1
+   *                 K2 ---> V2
+   *                 K3 ---> V3
+   *                 K4 ---> V4
+   *                 K5 ---> V5
+   *
+   * LineageInfo[1]: K1 ---> V1
+   *                 K2 ---> V2
+   *                 K3 ---> V3
+   *                 K6 ---> V6
+   *
+   * (3) With {@link Level#BRANCH} level, the output would be: (only branch level information was returned)
+   * LineageInfo[0]: K4 ---> V4
+   *                 K5 ---> V5
+   * LineageInfo[1]: K6 ---> V6
+   *
+   * Every per-branch result additionally carries the branch id under {@link #BRANCH_ID_METADATA_KEY}.
+   * When dataset and branch attributes are merged for {@link Level#All}, a branch attribute overrides a dataset
+   * attribute with the same name, and the branch id entry overrides both.
+   * A branch key that has no attribute name after the branch id (for example gobblin.lineage.branch.3) is
+   * logged and ignored.
+   *
+   * @param states All states which belong to the same dataset and share the same jobId.
+   * @param level {@link Level#DATASET} only load dataset level lineage attributes
+   *              {@link Level#BRANCH} only load branch level lineage attributes
+   *              {@link Level#All} load all lineage attributes
+   * @return A collection of {@link LineageInfo}s per branch. When level is {@link Level#DATASET}, this list has only single element.
+   *
+   * @throws LineageException.LineageConflictAttributeException if two states have same key but not the same value.
+   * @throws LineageException.LineageUnsupportedLevelException if the level is none of the known levels.
+   */
+  public static Collection<LineageInfo> load(Collection<? extends State> states, Level level) throws LineageException {
+    Preconditions.checkArgument(states != null && !states.isEmpty());
+    Map<String, String> datasetMetaData = new HashMap<>();
+    Map<String, Map<String, String>> branchAggregate = new HashMap<>();
+
+    State anyOne = states.iterator().next();
+    String jobId = anyOne.getProp(ConfigurationKeys.JOB_ID_KEY, "");
+    String urn = anyOne.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN);
+
+    boolean wantDataset = level == Level.DATASET || level == Level.All;
+    boolean wantBranch = level == Level.BRANCH || level == Level.All;
+
+    for (State state : states) {
+      for (Map.Entry<Object, Object> entry : state.getProperties().entrySet()) {
+        if (!(entry.getKey() instanceof String)) {
+          continue;
+        }
+        String lineageKey = (String) entry.getKey();
+        // BRANCH_PREFIX itself starts with DATASET_PREFIX, so one check filters out every non-lineage key.
+        if (!lineageKey.startsWith(DATASET_PREFIX)) {
+          continue;
+        }
+        // Avoid a ClassCastException should a non-String value ever be stored in the properties.
+        String lineageValue = String.valueOf(entry.getValue());
+
+        if (lineageKey.startsWith(BRANCH_PREFIX)) {
+          String branchPrefixStrip = lineageKey.substring(BRANCH_PREFIX.length());
+          int separator = branchPrefixStrip.indexOf('.');
+          if (separator < 0) {
+            // No "<branchId>.<key>" structure; substring(0, -1) would throw, so skip the entry instead.
+            log.warn("Ignoring malformed branch lineage attribute {}", lineageKey);
+            continue;
+          }
+          if (wantBranch) {
+            String branchId = branchPrefixStrip.substring(0, separator);
+            String key = branchPrefixStrip.substring(separator + 1);
+            Map<String, String> branchMetaData = branchAggregate.computeIfAbsent(branchId, id -> new HashMap<>());
+            putWithoutConflict(branchMetaData, key, lineageValue, lineageKey);
+          }
+        } else if (wantDataset) {
+          putWithoutConflict(datasetMetaData, lineageKey.substring(DATASET_PREFIX.length()), lineageValue, lineageKey);
+        }
+      }
+    }
+
+    Collection<LineageInfo> collection = Sets.newHashSet();
+
+    if (level == Level.DATASET) {
+      collection.add(new LineageInfo(urn, jobId, ImmutableMap.copyOf(datasetMetaData)));
+      return collection;
+    }
+
+    if (wantBranch) {
+      if (branchAggregate.isEmpty()) {
+        if (level == Level.All) {
+          collection.add(new LineageInfo(urn, jobId, ImmutableMap.copyOf(datasetMetaData)));
+        }
+        return collection;
+      }
+      for (Map.Entry<String, Map<String, String>> branchMetaDataEntry : branchAggregate.entrySet()) {
+        // Merge through a mutable map: ImmutableMap.Builder#build rejects duplicate keys, which would surface
+        // as an IllegalArgumentException whenever a branch attribute shares its name with a dataset attribute
+        // or with BRANCH_ID_METADATA_KEY.
+        Map<String, String> merged = new HashMap<>();
+        if (level == Level.All) {
+          merged.putAll(datasetMetaData);
+        }
+        merged.putAll(branchMetaDataEntry.getValue());
+        merged.put(BRANCH_ID_METADATA_KEY, branchMetaDataEntry.getKey());
+        collection.add(new LineageInfo(urn, jobId, ImmutableMap.copyOf(merged)));
+      }
+      return collection;
+    }
+
+    throw new LineageException.LineageUnsupportedLevelException(level);
+  }
+
+  /**
+   * Stores {@code value} under {@code key} and fails if a different value was already present.
+   *
+   * @param target the map being filled
+   * @param key the attribute name with the lineage prefix stripped
+   * @param value the attribute value
+   * @param fullKey the original, unstripped key; only used for the error message
+   */
+  private static void putWithoutConflict(Map<String, String> target, String key, String value, String fullKey)
+      throws LineageException.LineageConflictAttributeException {
+    String prev = target.put(key, value);
+    if (prev != null && !prev.equals(value)) {
+      throw new LineageException.LineageConflictAttributeException(fullKey, prev, value);
+    }
+  }
+
+  /**
+   * Store a dataset level lineage attribute in the given state.
+   */
+  public static void setDatasetLineageAttribute(State state, String key, String value) {
+    state.setProp(DATASET_PREFIX + key, value);
+  }
+
+  /**
+   * Store a branch level lineage attribute for the given branch id in the given state.
+   */
+  public static void setBranchLineageAttribute(State state, int branchId, String key, String value) {
+    state.setProp(BRANCH_PREFIX + Joiner.on(".").join(branchId, key), value);
+  }
+
+  /**
+   * @return the dataset urn and the job id joined by ":::"
+   */
+  public final String getId() {
+    return Joiner.on(":::").join(this.datasetUrn, this.jobId);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index f0d0e32..19314e5 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -30,6 +30,7 @@ import java.util.Objects;
import java.util.Set;
import org.apache.commons.io.IOUtils;
+import org.apache.gobblin.lineage.LineageInfo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -97,6 +98,9 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
protected final int parallelRunnerThreads;
protected final Map<String, ParallelRunner> parallelRunners = Maps.newHashMap();
protected final Set<Path> publisherOutputDirs = Sets.newHashSet();
+
+ // Lineage attribute key under which the publisher output location is recorded.
+ // NOTE(review): "OUTOUT" looks like a typo for "OUTPUT"; the constant is public, so confirm there are no
+ // external users before renaming it.
+ public static final String PUBLISH_OUTOUT = "publish.output";
+
/* Each partition in each branch may have separate metadata. The metadata mergers are responsible
* for aggregating this information from all workunits so it can be published.
*/
@@ -328,6 +332,7 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
if (!replaceFinalOutputDir) {
addWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner);
writerOutputPathsMoved.add(writerOutputDir);
+ addPublisherLineageInfo(state, branchId, publisherOutputDir.toString());
return;
}
@@ -342,9 +347,14 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
movePath(parallelRunner, state, writerOutputDir, publisherOutputDir, branchId);
writerOutputPathsMoved.add(writerOutputDir);
+ addPublisherLineageInfo(state, branchId, publisherOutputDir.toString());
}
}
+  /**
+   * Records the publisher output location on the given state as a branch level lineage attribute,
+   * using {@link #PUBLISH_OUTOUT} as the attribute key.
+   *
+   * @param state the state that receives the lineage attribute
+   * @param branchId the branch the output belongs to
+   * @param output the output location to record
+   */
+  protected void addPublisherLineageInfo(WorkUnitState state, int branchId, String output) {
+    final String attributeKey = PUBLISH_OUTOUT;
+    LineageInfo.setBranchLineageAttribute(state, branchId, attributeKey, output);
+  }
+
/**
* Get the output directory path this {@link BaseDataPublisher} will write to.
*
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
index fa5a360..d94dede 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
@@ -27,6 +27,7 @@ import java.util.Set;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.lineage.LineageInfo;
import org.slf4j.MDC;
import com.google.common.base.Optional;
@@ -234,6 +235,7 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> {
workunit.setProp(ConfigurationKeys.SOURCE_ENTITY, sourceEntity.getSourceEntityName());
workunit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, sourceEntity.getDestTableName());
workunit.setProp(WORK_UNIT_STATE_VERSION_KEY, CURRENT_WORK_UNIT_STATE_VERSION);
+ addLineageSourceInfo (state, sourceEntity, workunit);
partition.serialize(workunit);
workUnits.add(workunit);
}
@@ -241,6 +243,10 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> {
return workUnits;
}
+  /**
+   * Hook for attaching lineage source information to a work unit. This implementation only sets the work
+   * unit's dataset urn property to the destination table name of the entity and does not read
+   * {@code sourceState}.
+   *
+   * @param sourceState the source state of the job
+   * @param entity the source entity the work unit was created for
+   * @param workUnit the work unit that receives the lineage information
+   */
+  protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity, WorkUnit workUnit) {
+    final String datasetUrn = entity.destTableName;
+    workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, datasetUrn);
+  }
+
protected Set<SourceEntity> getFilteredSourceEntities(SourceState state) {
Set<SourceEntity> unfilteredEntities = getSourceEntities(state);
return getFilteredSourceEntitiesHelper(state, unfilteredEntities);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java b/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java
new file mode 100644
index 0000000..2a7ea15
--- /dev/null
+++ b/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.lineage;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import gobblin.configuration.State;
+
+
+/**
+ * Tests for {@link LineageInfo#load} at every {@link LineageInfo.Level}.
+ */
+public class LineageInfoTest {
+
+  /** Dataset level attributes of both test states are unioned into a single {@link LineageInfo}. */
+  @Test
+  public void testDatasetLevel () {
+    Collection<LineageInfo> loaded = loadOrFail(createTestStates(), LineageInfo.Level.DATASET);
+
+    Assert.assertEquals(1, loaded.size());
+    ImmutableMap<String, String> attributes = loaded.iterator().next().getLineageMetaData();
+    Assert.assertEquals(3, attributes.size());
+    Assert.assertEquals("V1", attributes.get("K1"));
+    Assert.assertEquals("V2", attributes.get("K2"));
+    Assert.assertEquals("V3", attributes.get("K3"));
+  }
+
+  /** Branch level loading yields one {@link LineageInfo} per branch, holding only branch attributes. */
+  @Test
+  public void testBranchLevel () {
+    Collection<LineageInfo> loaded = loadOrFail(createTestStates(), LineageInfo.Level.BRANCH);
+
+    Assert.assertEquals(2, loaded.size());
+
+    for (LineageInfo lineage : loaded) {
+      Map<String, String> attributes = lineage.getLineageMetaData();
+      switch (attributes.get(LineageInfo.BRANCH_ID_METADATA_KEY)) {
+        case "1":
+          Assert.assertEquals(3, attributes.size()); // include BRANCH_ID_METADATA_KEY
+          Assert.assertEquals("V4", attributes.get("K4"));
+          Assert.assertEquals("V5", attributes.get("K5"));
+          break;
+        case "2":
+          Assert.assertEquals(2, attributes.size()); // include BRANCH_ID_METADATA_KEY
+          Assert.assertEquals("V6", attributes.get("K6"));
+          break;
+        default:
+          break;
+      }
+    }
+  }
+
+  /** Loading all levels yields one {@link LineageInfo} per branch, holding dataset and branch attributes. */
+  @Test
+  public void testAllLevel () {
+    Collection<LineageInfo> loaded = loadOrFail(createTestStates(), LineageInfo.Level.All);
+
+    Assert.assertEquals(2, loaded.size());
+    for (LineageInfo lineage : loaded) {
+      Map<String, String> attributes = lineage.getLineageMetaData();
+      switch (attributes.get(LineageInfo.BRANCH_ID_METADATA_KEY)) {
+        case "1":
+          Assert.assertEquals(6, attributes.size()); // include BRANCH_ID_METADATA_KEY
+          Assert.assertEquals("V1", attributes.get("K1"));
+          Assert.assertEquals("V2", attributes.get("K2"));
+          Assert.assertEquals("V3", attributes.get("K3"));
+          Assert.assertEquals("V4", attributes.get("K4"));
+          Assert.assertEquals("V5", attributes.get("K5"));
+          break;
+        case "2":
+          Assert.assertEquals(5, attributes.size()); // include BRANCH_ID_METADATA_KEY
+          Assert.assertEquals("V1", attributes.get("K1"));
+          Assert.assertEquals("V2", attributes.get("K2"));
+          Assert.assertEquals("V3", attributes.get("K3"));
+          Assert.assertEquals("V6", attributes.get("K6"));
+          break;
+        default:
+          break;
+      }
+    }
+  }
+
+  /** Branch level loading of a state without any branch attribute yields an empty collection. */
+  @Test
+  public void testNoBranchInfo () {
+    State state = newBaseState();
+    LineageInfo.setDatasetLineageAttribute(state,"K1", "V1");
+    LineageInfo.setDatasetLineageAttribute(state,"K2", "V2");
+
+    Collection<LineageInfo> loaded = loadOrFail(Lists.newArrayList(state), LineageInfo.Level.BRANCH);
+
+    Assert.assertTrue(loaded.isEmpty());
+  }
+
+  /** Loads lineage for the given states and fails the running test if loading reports a lineage error. */
+  private static Collection<LineageInfo> loadOrFail(Collection<State> states, LineageInfo.Level level) {
+    try {
+      return LineageInfo.load(states, level);
+    } catch (LineageException e) {
+      Assert.fail(e.toString());
+      return null; // not reached, Assert.fail always throws
+    }
+  }
+
+  /** Creates a state that carries the job id and dataset urn shared by all tests. */
+  private static State newBaseState() {
+    State state = new State();
+    state.setProp(ConfigurationKeys.JOB_ID_KEY, "test_job_id_123456");
+    state.setProp(ConfigurationKeys.DATASET_URN_KEY, "PageViewEvent");
+    return state;
+  }
+
+  private Collection<State> createTestStates() {
+    /*
+     * State[0]: gobblin.lineage.K1 ---> V1
+     *           gobblin.lineage.K2 ---> V2
+     *           gobblin.lineage.branch.1.K4 ---> V4
+     * State[1]: gobblin.lineage.K2 ---> V2
+     *           gobblin.lineage.K3 ---> V3
+     *           gobblin.lineage.branch.1.K4 ---> V4
+     *           gobblin.lineage.branch.1.K5 ---> V5
+     *           gobblin.lineage.branch.2.K6 ---> V6
+     */
+    State first = newBaseState();
+    LineageInfo.setDatasetLineageAttribute(first,"K1", "V1");
+    LineageInfo.setDatasetLineageAttribute(first,"K2", "V2");
+    LineageInfo.setBranchLineageAttribute(first, 1, "K4", "V4");
+
+    State second = newBaseState();
+    LineageInfo.setDatasetLineageAttribute(second,"K2", "V2");
+    LineageInfo.setDatasetLineageAttribute(second,"K3", "V3");
+    LineageInfo.setBranchLineageAttribute(second, 1, "K4", "V4");
+    LineageInfo.setBranchLineageAttribute(second, 1, "K5", "V5");
+    LineageInfo.setBranchLineageAttribute(second, 2, "K6", "V6");
+
+    return Lists.newArrayList(first, second);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
index 20a0823..57fdedd 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
@@ -17,10 +17,14 @@
package org.apache.gobblin.source.extractor.extract.jdbc;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.exception.ExtractPrepareException;
import java.io.IOException;
+import org.apache.gobblin.source.workunit.WorkUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,4 +54,13 @@ public class MysqlSource extends QueryBasedSource<JsonArray, JsonElement> {
}
return extractor;
}
+
+  /**
+   * Adds the MySQL connection url as a dataset level lineage attribute named "connectionUrl", in addition
+   * to whatever the parent implementation records.
+   *
+   * The url has the form jdbc:mysql://host[:port]/database and is built from the source connection host,
+   * port and schema properties of {@code sourceState}. If the host or the schema is not configured, no
+   * connection url is recorded; if only the port is missing, it is left out of the url.
+   *
+   * @param sourceState the source state the connection properties are read from
+   * @param entity the source entity the work unit was created for
+   * @param workUnit the work unit that receives the lineage attribute
+   */
+  @Override
+  protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity, WorkUnit workUnit) {
+    super.addLineageSourceInfo(sourceState, entity, workUnit);
+    String host = sourceState.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME);
+    String port = sourceState.getProp(ConfigurationKeys.SOURCE_CONN_PORT);
+    String database = sourceState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_SCHEMA);
+
+    if (host == null || database == null) {
+      // Lineage is auxiliary information; an incomplete connection configuration must not fail
+      // work unit creation with a NullPointerException.
+      return;
+    }
+
+    StringBuilder connectionUrl = new StringBuilder("jdbc:mysql://").append(host.trim());
+    if (port != null && !port.trim().isEmpty()) {
+      connectionUrl.append(':').append(port.trim());
+    }
+    connectionUrl.append('/').append(database.trim());
+    LineageInfo.setDatasetLineageAttribute(workUnit, "connectionUrl", connectionUrl.toString());
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index 363adf3..9521575 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -33,8 +33,13 @@ import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.lineage.LineageException;
+import org.apache.gobblin.lineage.LineageInfo;
+import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.publisher.CommitSequencePublisher;
import org.apache.gobblin.publisher.DataPublisher;
+import org.apache.gobblin.publisher.NoopPublisher;
import org.apache.gobblin.publisher.UnpublishedHandling;
import org.apache.gobblin.runtime.commit.DatasetStateCommitStep;
import org.apache.gobblin.runtime.task.TaskFactory;
@@ -159,6 +164,7 @@ final class SafeDatasetCommit implements Callable<Void> {
} else if (canPersistStates) {
persistDatasetState(datasetUrn, datasetState);
}
+
} catch (IOException | RuntimeException ioe) {
log.error(String
.format("Failed to persist dataset state for dataset %s of job %s", datasetUrn, this.jobContext.getJobId()),
@@ -169,6 +175,30 @@ final class SafeDatasetCommit implements Callable<Void> {
return null;
}
+  /**
+   * Submits one lineage event per branch for the given task states of a dataset.
+   *
+   * Nothing is submitted when there are no states, or when the first state names {@link NoopPublisher} as
+   * its data publisher type. Submission is best effort: any failure is logged and never propagated, because
+   * this method is invoked after the data has been published and a lineage problem must not mark the
+   * commit as failed.
+   *
+   * @param states the task states of the dataset that was just published
+   */
+  private void submitLineageEvent(Collection<TaskState> states) {
+    if (states.isEmpty()) {
+      return;
+    }
+
+    TaskState oneWorkUnitState = states.iterator().next();
+    String publisherType = oneWorkUnitState.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE);
+    if (NoopPublisher.class.getName().equals(publisherType)) {
+      // if no publisher configured, each task should be responsible for sending lineage event.
+      return;
+    }
+
+    try {
+      Collection<LineageInfo> branchLineages = LineageInfo.load(states, LineageInfo.Level.All);
+      EventSubmitter submitter = new EventSubmitter.Builder(Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class),
+          LineageInfo.LINEAGE_NAME_SPACE).build();
+      for (LineageInfo info : branchLineages) {
+        submitter.submit(info.getId(), info.getLineageMetaData());
+      }
+    } catch (LineageException | RuntimeException e) {
+      // Pass the exception itself so that the stack trace ends up in the log.
+      log.error("Lineage event submission failed", e);
+    }
+  }
+
/**
* Synchronized version of {@link #commitDataset(Collection, DataPublisher)} used when publisher is not
* thread safe.
@@ -186,6 +216,7 @@ final class SafeDatasetCommit implements Callable<Void> {
try {
publisher.publish(taskStates);
+ submitLineageEvent(taskStates);
} catch (Throwable t) {
log.error("Failed to commit dataset", t);
setTaskFailureException(taskStates, t);