You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/11/13 22:15:58 UTC
[2/2] incubator-gobblin git commit: [GOBBLIN-307] Implement lineage
event as LineageEventBuilder in gobblin
[GOBBLIN-307] Implement lineage event as LineageEventBuilder in gobblin
Closes #2161 from zxcware/lineage
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/3e229db9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/3e229db9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/3e229db9
Branch: refs/heads/master
Commit: 3e229db9810de8410e0b8fcaf680fcb9f80b5db2
Parents: a34a81a
Author: zhchen <zh...@linkedin.com>
Authored: Mon Nov 13 14:15:51 2017 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Nov 13 14:15:51 2017 -0800
----------------------------------------------------------------------
.../gobblin/dataset/DatasetConstants.java | 34 +++
.../gobblin/dataset/DatasetDescriptor.java | 114 +++++++++
.../gobblin/lineage/LineageException.java | 39 ---
.../org/apache/gobblin/lineage/LineageInfo.java | 246 -------------------
.../gobblin/publisher/BaseDataPublisher.java | 23 +-
.../publisher/TimePartitionedDataPublisher.java | 28 ++-
.../extractor/extract/QueryBasedSource.java | 9 +-
.../apache/gobblin/lineage/LineageInfoTest.java | 160 ------------
.../data/management/copy/CopySource.java | 10 +-
.../management/copy/CopyableDatasetBase.java | 7 +
.../data/management/copy/CopyableFile.java | 4 +
.../copy/RecursiveCopyableDataset.java | 15 +-
.../copy/hive/HiveCopyEntityHelper.java | 8 +
.../data/management/copy/hive/HiveDataset.java | 8 +-
.../copy/hive/HivePartitionFileSet.java | 7 +-
.../copy/hive/UnpartitionedTableFileSet.java | 5 +-
.../copy/publisher/CopyDataPublisher.java | 3 +-
.../dataset/ConvertibleHiveDatasetTest.java | 18 +-
.../gobblin/metrics/event/EventSubmitter.java | 3 +
.../metrics/event/FailureEventBuilder.java | 58 +----
.../metrics/event/GobblinEventBuilder.java | 86 +++++++
.../event/lineage/LineageEventBuilder.java | 147 +++++++++++
.../metrics/event/lineage/LineageException.java | 32 +++
.../metrics/event/lineage/LineageInfo.java | 207 ++++++++++++++++
.../metrics/event/lineage/LineageEventTest.java | 113 +++++++++
.../extractor/extract/kafka/KafkaSource.java | 10 +-
.../extractor/extract/jdbc/MysqlSource.java | 14 +-
.../gobblin/runtime/SafeDatasetCommit.java | 70 ++++--
.../templates/textFileBasedSourceTest.template | 2 +
.../runtime_test/skip_workunits_test.properties | 2 +-
30 files changed, 918 insertions(+), 564 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
new file mode 100644
index 0000000..73999dc
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset;
+
+public class DatasetConstants {
+ /** Platforms */
+ public static final String PLATFORM_KAFKA = "kafka";
+ public static final String PLATFORM_HIVE = "hive";
+ public static final String PLATFORM_MYSQL = "mysql";
+
+ /** File system metadata */
+ public static final String FS_URI = "fsUri";
+
+ /** Kafka metadata */
+ public static final String BROKERS = "brokers";
+
+ /** JDBC metadata */
+ public static final String CONNECTION_URL = "connectionUrl";
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java
new file mode 100644
index 0000000..5b41862
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset;
+
+
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * A {@link DatasetDescriptor} identifies and provides metadata to describe a dataset
+ */
+@RequiredArgsConstructor
+public final class DatasetDescriptor {
+ private static final String PLATFORM_KEY = "platform";
+ private static final String NAME_KEY = "name";
+
+ /**
+ * the platform on which the dataset is stored, for example: local, hdfs, oracle, mysql, kafka
+ */
+ @Getter
+ private final String platform;
+ /**
+ * name of the dataset
+ */
+ @Getter
+ private final String name;
+
+ /**
+ * metadata about the dataset
+ */
+ private final Map<String, String> metadata = Maps.newHashMap();
+
+ public DatasetDescriptor(DatasetDescriptor copy) {
+ platform = copy.getPlatform();
+ name = copy.getName();
+ metadata.putAll(copy.getMetadata());
+ }
+
+ public ImmutableMap<String, String> getMetadata() {
+ return ImmutableMap.<String, String>builder()
+ .putAll(metadata)
+ .build();
+ }
+
+ public void addMetadata(String key, String value) {
+ metadata.put(key, value);
+ }
+
+ /**
+ * Serialize to a string map
+ */
+ public Map<String, String> toDataMap() {
+ Map<String, String> map = Maps.newHashMap();
+ map.put(PLATFORM_KEY, platform);
+ map.put(NAME_KEY, name);
+ map.putAll(metadata);
+ return map;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ DatasetDescriptor that = (DatasetDescriptor) o;
+ return platform.equals(that.platform) && name.equals(that.name) && metadata.equals(that.metadata);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = platform.hashCode();
+ result = 31 * result + name.hashCode();
+ result = 31 * result + metadata.hashCode();
+ return result;
+ }
+
+ /**
+ * Deserialize a {@link DatasetDescriptor} from a string map
+ */
+ public static DatasetDescriptor fromDataMap(Map<String, String> dataMap) {
+ DatasetDescriptor descriptor = new DatasetDescriptor(dataMap.get(PLATFORM_KEY), dataMap.get(NAME_KEY));
+ dataMap.forEach((key, value) -> {
+ if (!key.equals(PLATFORM_KEY) && !key.equals(NAME_KEY)) {
+ descriptor.addMetadata(key, value);
+ }
+ });
+ return descriptor;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageException.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageException.java b/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageException.java
deleted file mode 100644
index 8dcf592..0000000
--- a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.gobblin.lineage;
-
-/**
- * A set of exceptions used by {@link LineageInfo} when lineage information is serialized or deserialized.
- */
-public class LineageException extends Exception {
- public LineageException(String message) {
- super(message);
- }
- public static class LineageConflictAttributeException extends LineageException {
- public LineageConflictAttributeException (String key, String oldValue, String newValue) {
- super ("Lineage has conflict value: key=" + key + " value=[1]" + oldValue + " [2]" + newValue);
- }
- }
-
- public static class LineageUnsupportedLevelException extends LineageException {
- public LineageUnsupportedLevelException (LineageInfo.Level level) {
- super (level.toString() + " is not supported");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java b/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
deleted file mode 100644
index 7af71df..0000000
--- a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.lineage;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-
-/**
- * A class to restore all lineage information from a {@link State}
- * All lineage attributes are under LINEAGE_NAME_SPACE namespace.
- *
- * For example, a typical lineage attributes looks like:
- * gobblin.lineage.K1 ---> V1
- * gobblin.lineage.branch.3.K2 ---> V2
- *
- * K1 is dataset level attribute, K2 is branch level attribute, and branch id is 3.
- */
-
-@Slf4j
-public class LineageInfo {
- public static final String LINEAGE_NAME_SPACE = "gobblin.lineage";
- public static final String BRANCH_ID_METADATA_KEY = "branchId";
- private static final String DATASET_PREFIX = LINEAGE_NAME_SPACE + ".";
- public static final String LINEAGE_DATASET_URN = DATASET_PREFIX + "dataset.urn";
- private static final String BRANCH_PREFIX = DATASET_PREFIX + "branch.";
-
- @Getter
- private String datasetUrn;
- @Getter
- private String jobId;
-
- private Map<String, String> lineageMetaData;
-
- public enum Level {
- DATASET,
- BRANCH,
- All
- }
-
- private LineageInfo() {
- }
-
- private LineageInfo(String datasetUrn, String jobId, ImmutableMap<String, String> lineageMetaData) {
- Preconditions.checkArgument(datasetUrn != null);
- Preconditions.checkArgument(jobId != null);
- this.datasetUrn = datasetUrn;
- this.jobId = jobId;
- this.lineageMetaData = lineageMetaData;
- }
-
- /**
- * Retrieve lineage information from a {@link State} by {@link Level}
- * @param state A single state
- * @param level {@link Level#DATASET} only load dataset level lineage attributes
- * {@link Level#BRANCH} only load branch level lineage attributes
- * {@link Level#All} load all lineage attributes
- * @return A collection of {@link LineageInfo}s per branch. When level is {@link Level#DATASET}, this list has only single element.
- */
- public static Collection<LineageInfo> load (State state, Level level) throws LineageException {
- return load(Collections.singleton(state), level);
- }
-
- /**
- * Get all lineage meta data.
- */
- public ImmutableMap<String, String> getLineageMetaData() {
- return ImmutableMap.copyOf(lineageMetaData);
- }
-
- /**
- * Retrieve all lineage information from different {@link State}s.
- * This requires the job id and dataset urn to be present in the state, under job.id and dataset.urn.
- * A global union operation is applied to combine all <K, V> pairs from the input {@link State}s. If multiple {@link State}s
- * share the same K, but have conflicting V, a {@link LineageException} is thrown.
- *
- * {@link Level} can control if a dataset level or branch level information should be used. When {@link Level#All} is
- * specified, all levels of information will be returned; otherwise only specified level of information will be returned.
- *
- * For instance, assume we have below input states:
- * State[0]: gobblin.lineage.K1 ---> V1
- * gobblin.lineage.K2 ---> V2
- * gobblin.lineage.branch.1.K4 ---> V4
- * State[1]: gobblin.lineage.K2 ---> V2
- * gobblin.lineage.K3 ---> V3
- * gobblin.lineage.branch.1.K4 ---> V4
- * gobblin.lineage.branch.1.K5 ---> V5
- * gobblin.lineage.branch.2.K6 ---> V6
- *
- * (1) With {@link Level#DATASET} level, the output would be:
- * LinieageInfo[0]: K1 ---> V1
- * K2 ---> V2
- * K3 ---> V3
- * (2) With {@link Level#All} level, the output would be: (because there are two branches, so there are two LineageInfo)
- * LineageInfo[0]: K1 ---> V1
- * K2 ---> V2
- * K3 ---> V3
- * K4 ---> V4
- * K5 ---> V5
- *
- * LineageInfo[1]: K1 ---> V1
- * K2 ---> V2
- * K3 ---> V3
- * K6 ---> V6
- *
- * (3) With {@link Level#BRANCH} level, the output would be: (only branch level information was returned)
- * LineageInfo[0]: K4 ---> V4
- * K5 ---> V5
- * LineageInfo[1]: K6 ---> V6
- *
- * @param states All states which belong to the same dataset and share the same jobId.
- * @param level {@link Level#DATASET} only load dataset level lineage attributes
- * {@link Level#BRANCH} only load branch level lineage attributes
- * {@link Level#All} load all lineage attributes
- * @return A collection of {@link LineageInfo}s per branch. When level is {@link Level#DATASET}, this list has only single element.
- *
- * @throws LineageException.LineageConflictAttributeException if two states have same key but not the same value.
- */
- public static Collection<LineageInfo> load (Collection<? extends State> states, Level level) throws LineageException {
- Preconditions.checkArgument(states != null && !states.isEmpty());
- Map<String, String> datasetMetaData = new HashMap<>();
- Map<String, Map<String, String>> branchAggregate = new HashMap<>();
-
- State anyOne = states.iterator().next();
- String jobId = anyOne.getProp(ConfigurationKeys.JOB_ID_KEY, "");
- String urn = anyOne.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN);
-
- for (State state: states) {
- for (Map.Entry<Object, Object> entry : state.getProperties().entrySet()) {
- if (entry.getKey() instanceof String && ((String) entry.getKey()).startsWith(LINEAGE_NAME_SPACE)) {
-
- String lineageKey = ((String) entry.getKey());
- String lineageValue = (String) entry.getValue();
-
- if (lineageKey.startsWith(BRANCH_PREFIX)) {
- String branchPrefixStrip = lineageKey.substring(BRANCH_PREFIX.length());
- String branchId = branchPrefixStrip.substring(0, branchPrefixStrip.indexOf("."));
- String key = branchPrefixStrip.substring(branchPrefixStrip.indexOf(".") + 1);
-
- if (level == Level.BRANCH || level == Level.All) {
- if (!branchAggregate.containsKey(branchId)) {
- branchAggregate.put(branchId, new HashMap<>());
- }
- Map<String, String> branchMetaData = branchAggregate.get(branchId);
- String prev = branchMetaData.put(key, lineageValue);
- if (prev != null && !prev.equals(lineageValue)) {
- throw new LineageException.LineageConflictAttributeException(lineageKey, prev, lineageValue);
- }
- }
- } else if (lineageKey.startsWith(DATASET_PREFIX)) {
- if (level == Level.DATASET || level == Level.All) {
- String prev = datasetMetaData.put(lineageKey.substring(DATASET_PREFIX.length()), lineageValue);
- if (prev != null && !prev.equals(lineageValue)) {
- throw new LineageException.LineageConflictAttributeException(lineageKey, prev, lineageValue);
- }
- }
- }
- }
- }
- }
-
- Collection<LineageInfo> collection = Sets.newHashSet();
-
- if (level == Level.DATASET) {
- ImmutableMap<String, String> metaData = ImmutableMap.<String, String>builder()
- .putAll(datasetMetaData)
- .build();
- collection.add(new LineageInfo(urn, jobId, metaData));
- return collection;
- } else if (level == Level.BRANCH || level == Level.All){
- if (branchAggregate.isEmpty()) {
- if (level == Level.All) {
- collection.add(new LineageInfo(urn, jobId, ImmutableMap.<String, String>builder().putAll(datasetMetaData).build()));
- }
- return collection;
- }
- for (Map.Entry<String, Map<String, String>> branchMetaDataEntry: branchAggregate.entrySet()) {
- String branchId = branchMetaDataEntry.getKey();
- Map<String, String> branchMetaData = branchMetaDataEntry.getValue();
- ImmutableMap.Builder<String, String> metaDataBuilder = ImmutableMap.builder();
- if (level == Level.All) {
- metaDataBuilder.putAll(datasetMetaData);
- }
- metaDataBuilder.putAll(branchMetaData).put(BRANCH_ID_METADATA_KEY, branchId);
- collection.add(new LineageInfo(urn, jobId, metaDataBuilder.build()));
- }
-
- return collection;
- } else {
- throw new LineageException.LineageUnsupportedLevelException(level);
- }
- }
-
- public static void setDatasetLineageAttribute (State state, String key, String value) {
- state.setProp(DATASET_PREFIX + key, value);
- }
-
- public static void setBranchLineageAttribute (State state, int branchId, String key, String value) {
- state.setProp(BRANCH_PREFIX + Joiner.on(".").join(branchId, key), value);
- }
-
- public static Map<String, Collection<State>> aggregateByDatasetUrn (Collection<? extends State> states) {
- Map<String, Collection<State>> datasetStates = new HashMap<>();
- for (State state: states) {
- String urn = state.getProp(LINEAGE_DATASET_URN, state.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN));
- datasetStates.putIfAbsent(urn, new ArrayList<>());
- Collection<State> datasetState = datasetStates.get(urn);
- datasetState.add(state);
- }
- return datasetStates;
- }
-
- public final String getId() {
- return Joiner.on(":::").join(this.datasetUrn, this.jobId);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index 9bdcbdd..0097c15 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -56,9 +56,11 @@ import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.lineage.LineageInfo;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.metadata.MetadataMerger;
import org.apache.gobblin.metadata.types.StaticStringMetadataMerger;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.util.ForkOperatorUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.ParallelRunner;
@@ -106,8 +108,6 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
protected final Map<String, ParallelRunner> parallelRunners = Maps.newHashMap();
protected final Set<Path> publisherOutputDirs = Sets.newHashSet();
- public static final String PUBLISH_OUTOUT = "publish.output";
-
/* Each partition in each branch may have separate metadata. The metadata mergers are responsible
* for aggregating this information from all workunits so it can be published.
*/
@@ -131,7 +131,6 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
PUBLISH_RETRY_DEFAULTS = ConfigFactory.parseMap(configMap);
};
-
public BaseDataPublisher(State state)
throws IOException {
super(state);
@@ -330,6 +329,16 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
private void publishMultiTaskData(WorkUnitState state, int branchId, Set<Path> writerOutputPathsMoved)
throws IOException {
publishData(state, branchId, false, writerOutputPathsMoved);
+ DatasetDescriptor destination = createDestinationDescriptor(state, branchId);
+ LineageInfo.putDestination(destination, branchId, state);
+ }
+
+ protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) {
+ Path publisherOutputDir = getPublisherOutputDir(state, branchId);
+ FileSystem fs = this.publisherFileSystemByBranches.get(branchId);
+ DatasetDescriptor destination = new DatasetDescriptor(fs.getScheme(), publisherOutputDir.toString());
+ destination.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
+ return destination;
}
protected void publishData(WorkUnitState state, int branchId, boolean publishSingleTaskData,
@@ -372,7 +381,6 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
if (!replaceFinalOutputDir) {
addWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner);
writerOutputPathsMoved.add(writerOutputDir);
- addPublisherLineageInfo(state, branchId, publisherOutputDir.toString());
return;
}
@@ -387,14 +395,9 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
movePath(parallelRunner, state, writerOutputDir, publisherOutputDir, branchId);
writerOutputPathsMoved.add(writerOutputDir);
- addPublisherLineageInfo(state, branchId, publisherOutputDir.toString());
}
}
- protected void addPublisherLineageInfo(WorkUnitState state, int branchId, String output) {
- LineageInfo.setBranchLineageAttribute(state, branchId, PUBLISH_OUTOUT, output);
- }
-
/**
* Get the output directory path this {@link BaseDataPublisher} will write to.
*
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
index 90e241a..157552e 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
@@ -18,18 +18,18 @@
package org.apache.gobblin.publisher;
import java.io.IOException;
-import java.util.Set;
-import org.apache.gobblin.lineage.LineageInfo;
-import org.apache.gobblin.writer.partitioner.TimeBasedWriterPartitioner;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.ForkOperatorUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.WriterUtils;
+import org.apache.gobblin.writer.partitioner.TimeBasedWriterPartitioner;
/**
@@ -70,13 +70,19 @@ public class TimePartitionedDataPublisher extends BaseDataPublisher {
}
@Override
- protected void publishData(WorkUnitState state, int branchId, boolean publishSingleTaskData, Set<Path> writerOutputPathsMoved) throws IOException {
- super.publishData(state, branchId, publishSingleTaskData, writerOutputPathsMoved);
- if (publishSingleTaskData) {
- // Add lineage event for destination. Make sure all workunits belongs to the same dataset has exactly the same value
- Path publisherOutputDir = getPublisherOutputDir(state, branchId);
- String timePrefix = state.getProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX, "");
- LineageInfo.setBranchLineageAttribute(state, branchId, PUBLISH_OUTOUT, new Path(publisherOutputDir, timePrefix).toString());
- }
+ protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) {
+ // Get base descriptor
+ DatasetDescriptor descriptor = super.createDestinationDescriptor(state, branchId);
+
+ // Decorate with partition prefix
+ String propName = ForkOperatorUtils
+ .getPropertyNameForBranch(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX, numBranches, branchId);
+ String timePrefix = state.getProp(propName, "");
+ Path pathWithTimePrefix = new Path(descriptor.getName(), timePrefix);
+ DatasetDescriptor destination = new DatasetDescriptor(descriptor.getPlatform(), pathWithTimePrefix.toString());
+ // Add back the metadata
+ descriptor.getMetadata().forEach(destination::addMetadata);
+
+ return destination;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
index c77051d..d074f3a 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
@@ -27,7 +27,6 @@ import java.util.Set;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
-import org.apache.gobblin.lineage.LineageInfo;
import org.slf4j.MDC;
import com.google.common.base.Optional;
@@ -48,6 +47,8 @@ import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.configuration.WorkUnitState.WorkingState;
+import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.source.extractor.partition.Partition;
import org.apache.gobblin.source.extractor.partition.Partitioner;
@@ -235,7 +236,7 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> {
workunit.setProp(ConfigurationKeys.SOURCE_ENTITY, sourceEntity.getSourceEntityName());
workunit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, sourceEntity.getDestTableName());
workunit.setProp(WORK_UNIT_STATE_VERSION_KEY, CURRENT_WORK_UNIT_STATE_VERSION);
- addLineageSourceInfo (state, sourceEntity, workunit);
+ addLineageSourceInfo(state, sourceEntity, workunit);
partition.serialize(workunit);
workUnits.add(workunit);
}
@@ -243,8 +244,8 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> {
return workUnits;
}
- protected void addLineageSourceInfo (SourceState sourceState, SourceEntity entity, WorkUnit workUnit) {
- workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, entity.destTableName);
+ protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity, WorkUnit workUnit) {
+ // Does nothing by default
}
protected Set<SourceEntity> getFilteredSourceEntities(SourceState state) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java b/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java
deleted file mode 100644
index 2a7ea15..0000000
--- a/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.lineage;
-
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.junit.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-import gobblin.configuration.State;
-
-
-public class LineageInfoTest {
-
- @Test
- public void testDatasetLevel () {
- Collection<LineageInfo> collection = null;
- try {
- collection = LineageInfo.load(createTestStates(), LineageInfo.Level.DATASET);
- } catch (LineageException e) {
- Assert.fail(e.toString());
- }
-
- Assert.assertEquals(1, collection.size());
- LineageInfo info = collection.iterator().next();
- ImmutableMap<String, String> map = info.getLineageMetaData();
- Assert.assertEquals(3, map.size());
- Assert.assertEquals("V1", map.get("K1"));
- Assert.assertEquals("V2", map.get("K2"));
- Assert.assertEquals("V3", map.get("K3"));
- }
-
- @Test
- public void testBranchLevel () {
- Collection<LineageInfo> collection = null;
- try {
- collection = LineageInfo.load(createTestStates(), LineageInfo.Level.BRANCH);
- } catch (LineageException e) {
- Assert.fail(e.toString());
- }
-
- Assert.assertEquals(2, collection.size());
-
- for (LineageInfo info: collection) {
- Map<String, String> map = info.getLineageMetaData();
- String branchId = map.get(LineageInfo.BRANCH_ID_METADATA_KEY);
- if (branchId.equals("1")) {
- Assert.assertEquals(3, map.size()); // include BRANCH_ID_METADATA_KEY
- Assert.assertEquals("V4", map.get("K4"));
- Assert.assertEquals("V5", map.get("K5"));
- }
-
- if (branchId.equals("2")) {
- Assert.assertEquals(2, map.size()); // include BRANCH_ID_METADATA_KEY
- Assert.assertEquals("V6", map.get("K6"));
- }
- }
- }
-
- @Test
- public void testAllLevel () {
- Collection<LineageInfo> collection = null;
- try {
- collection = LineageInfo.load(createTestStates(), LineageInfo.Level.All);
- } catch (LineageException e) {
- Assert.fail(e.toString());
- }
-
- Assert.assertEquals(2, collection.size());
- for (LineageInfo info: collection) {
- Map<String, String> map = info.getLineageMetaData();
- String branchId = map.get(LineageInfo.BRANCH_ID_METADATA_KEY);
- if (branchId.equals("1")) {
- Assert.assertEquals(6, map.size()); // include BRANCH_ID_METADATA_KEY
- Assert.assertEquals("V1", map.get("K1"));
- Assert.assertEquals("V2", map.get("K2"));
- Assert.assertEquals("V3", map.get("K3"));
- Assert.assertEquals("V4", map.get("K4"));
- Assert.assertEquals("V5", map.get("K5"));
- }
-
- if (branchId.equals("2")) {
- Assert.assertEquals(5, map.size()); // include BRANCH_ID_METADATA_KEY
- Assert.assertEquals("V1", map.get("K1"));
- Assert.assertEquals("V2", map.get("K2"));
- Assert.assertEquals("V3", map.get("K3"));
- Assert.assertEquals("V6", map.get("K6"));
- }
- }
- }
-
- @Test
- public void testNoBranchInfo () {
- State state = new State();
- state.setProp(ConfigurationKeys.JOB_ID_KEY, "test_job_id_123456");
- state.setProp(ConfigurationKeys.DATASET_URN_KEY, "PageViewEvent");
- LineageInfo.setDatasetLineageAttribute(state,"K1", "V1");
- LineageInfo.setDatasetLineageAttribute(state,"K2", "V2");
- Collection<LineageInfo> collection = null;
- try {
- collection = LineageInfo.load(Lists.newArrayList(state), LineageInfo.Level.BRANCH);
- } catch (LineageException e) {
- Assert.fail(e.toString());
- }
-
- Assert.assertEquals(true, collection.isEmpty());
- }
-
- private Collection<State> createTestStates() {
- /*
- * State[0]: gobblin.lineage.K1 ---> V1
- * gobblin.lineage.K2 ---> V2
- * gobblin.lineage.branch.1.K4 ---> V4
- * State[1]: gobblin.lineage.K2 ---> V2
- * gobblin.lineage.K3 ---> V3
- * gobblin.lineage.branch.1.K4 ---> V4
- * gobblin.lineage.branch.1.K5 ---> V5
- * gobblin.lineage.branch.2.K6 ---> V6
- */
- State state_1 = new State();
- state_1.setProp(ConfigurationKeys.JOB_ID_KEY, "test_job_id_123456");
- state_1.setProp(ConfigurationKeys.DATASET_URN_KEY, "PageViewEvent");
- LineageInfo.setDatasetLineageAttribute(state_1,"K1", "V1");
- LineageInfo.setDatasetLineageAttribute(state_1,"K2", "V2");
- LineageInfo.setBranchLineageAttribute(state_1, 1, "K4", "V4");
-
-
- State state_2 = new State();
- state_2.setProp(ConfigurationKeys.JOB_ID_KEY, "test_job_id_123456");
- state_2.setProp(ConfigurationKeys.DATASET_URN_KEY, "PageViewEvent");
-
- LineageInfo.setDatasetLineageAttribute(state_2,"K2", "V2");
- LineageInfo.setDatasetLineageAttribute(state_2,"K3", "V3");
- LineageInfo.setBranchLineageAttribute(state_2, 1, "K4", "V4");
- LineageInfo.setBranchLineageAttribute(state_2, 1, "K5", "V5");
- LineageInfo.setBranchLineageAttribute(state_2, 2, "K6", "V6");
-
- return Lists.newArrayList(state_1, state_2);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index f60e5f0..8ca05e3 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.data.management.copy;
import java.io.IOException;
import java.net.URI;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -66,6 +65,8 @@ import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.WatermarkInterval;
@@ -299,6 +300,7 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
setWorkUnitWatermark(workUnit, watermarkGenerator, copyEntity);
computeAndSetWorkUnitGuid(workUnit);
workUnitsForPartition.add(workUnit);
+ addLineageInfo(copyEntity, copyableDataset, workUnit);
}
this.workUnitList.putAll(this.fileSet, workUnitsForPartition);
@@ -311,6 +313,12 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
}
}
+ private void addLineageInfo(CopyEntity copyEntity, CopyableDatasetBase copyableDataset, WorkUnit workUnit) {
+ if (copyEntity instanceof CopyableFile && copyableDataset.getDatasetDescriptor() != null) {
+ LineageInfo.setSource(copyableDataset.getDatasetDescriptor(), workUnit);
+ }
+ }
+
/**
* @param state a {@link org.apache.gobblin.configuration.WorkUnitState} carrying properties needed by the returned
* {@link Extractor}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
index c27b839..6c71edc 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.data.management.copy;
import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.DatasetDescriptor;
/**
@@ -25,4 +26,10 @@ import org.apache.gobblin.dataset.Dataset;
* Concrete classes must implement a subinterface of this interface ({@link CopyableDataset} or {@link IterableCopyableDataset}).
*/
public interface CopyableDatasetBase extends Dataset {
+ /**
+ * Get the descriptor which identifies and provides metadata of the dataset
+ */
+ default DatasetDescriptor getDatasetDescriptor() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index cec06f2..04e5e34 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.data.management.copy;
import org.apache.gobblin.data.management.partition.File;
import org.apache.gobblin.data.management.copy.PreserveAttributes.Option;
+import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.guid.Guid;
@@ -55,6 +56,9 @@ public class CopyableFile extends CopyEntity implements File {
/** {@link FileStatus} of the existing origin file. */
private FileStatus origin;
+ /** The destination dataset the file will be copied to */
+ private DatasetDescriptor destDataset;
+
/** Complete destination {@link Path} of the file. */
private Path destination;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
index 0f18f68..35108df 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
@@ -21,6 +21,7 @@ import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
import org.apache.gobblin.data.management.dataset.DatasetUtils;
import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.FileListUtils;
import org.apache.gobblin.util.commit.DeleteFileCommitStep;
@@ -42,6 +43,8 @@ import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import lombok.Getter;
+
/**
* Implementation of {@link CopyableDataset} that creates a {@link CopyableFile} for every file that is a descendant if
@@ -66,6 +69,9 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
private final boolean update;
private final boolean delete;
+ @Getter
+ private transient final DatasetDescriptor datasetDescriptor;
+
// Include empty directories in the source for copy
private final boolean includeEmptyDirectories;
// Delete empty directories in the destination
@@ -77,6 +83,7 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
this.rootPath = PathUtils.getPathWithoutSchemeAndAuthority(rootPath);
this.fs = fs;
+ this.datasetDescriptor = new DatasetDescriptor(fs.getScheme(), rootPath.toString());
this.pathFilter = DatasetUtils.instantiatePathFilter(properties);
this.copyableFileFilter = DatasetUtils.instantiateCopyableFileFilter(properties);
@@ -129,17 +136,19 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
List<CopyEntity> copyEntities = Lists.newArrayList();
List<CopyableFile> copyableFiles = Lists.newArrayList();
+ DatasetDescriptor targetDataset = new DatasetDescriptor(targetFs.getScheme(), targetPath.toString());
for (Path path : toCopy) {
FileStatus file = filesInSource.get(path);
Path filePathRelativeToSearchPath = PathUtils.relativizePath(file.getPath(), nonGlobSearchPath);
Path thisTargetPath = new Path(configuration.getPublishDir(), filePathRelativeToSearchPath);
-
- copyableFiles.add(CopyableFile.fromOriginAndDestination(this.fs, file, thisTargetPath, configuration)
+ CopyableFile copyableFile = CopyableFile.fromOriginAndDestination(this.fs, file, thisTargetPath, configuration)
.fileSet(datasetURN()).datasetOutputPath(thisTargetPath.toString())
.ancestorsOwnerAndPermission(CopyableFile.resolveReplicatedOwnerAndPermissionsRecursively(this.fs,
file.getPath().getParent(), nonGlobSearchPath, configuration))
- .build());
+ .build();
+ copyableFile.setDestDataset(targetDataset);
+ copyableFiles.add(copyableFile);
}
copyEntities.addAll(this.copyableFileFilter.filter(this.fs, targetFs, copyableFiles));
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
index 3c7643b..cc7be1e 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
@@ -56,6 +56,8 @@ import com.typesafe.config.Config;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
@@ -760,4 +762,10 @@ public class HiveCopyEntityHelper {
public FileSystem getTargetFileSystem() {
return this.targetFs;
}
+
+ void setCopyableFileDestinationDataset(CopyableFile copyableFile) {
+ DatasetDescriptor destination = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, this.getTargetDatabase() + "." + this.getTargetTable());
+ destination.addMetadata(DatasetConstants.FS_URI, this.getTargetFs().getUri().toString());
+ copyableFile.setDestDataset(destination);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
index 2af2f80..03dba25 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
@@ -57,10 +57,12 @@ import org.apache.gobblin.data.management.copy.CopyableDataset;
import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder.DbAndTable;
import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.hive.HiveMetastoreClientPool;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.util.AutoReturnableObject;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
@@ -106,6 +108,8 @@ public class HiveDataset implements PrioritizedCopyableDataset {
protected final DbAndTable dbAndTable;
protected final DbAndTable logicalDbAndTable;
+ private transient final DatasetDescriptor datasetDescriptor;
+
public HiveDataset(FileSystem fs, HiveMetastoreClientPool clientPool, Table table, Properties properties) {
this(fs, clientPool, table, properties, ConfigFactory.empty());
}
@@ -124,6 +128,9 @@ public class HiveDataset implements PrioritizedCopyableDataset {
Optional.fromNullable(this.table.getDataLocation());
this.tableIdentifier = this.table.getDbName() + "." + this.table.getTableName();
+ this.datasetDescriptor = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, tableIdentifier);
+ this.datasetDescriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
+
this.datasetNamePattern = Optional.fromNullable(ConfigUtils.getString(datasetConfig, DATASET_NAME_PATTERN_KEY, null));
this.dbAndTable = new DbAndTable(table.getDbName(), table.getTableName());
if (this.datasetNamePattern.isPresent()) {
@@ -132,7 +139,6 @@ public class HiveDataset implements PrioritizedCopyableDataset {
this.logicalDbAndTable = this.dbAndTable;
}
this.datasetConfig = resolveConfig(datasetConfig, dbAndTable, logicalDbAndTable);
-
this.metricContext = Instrumented.getMetricContext(new State(properties), HiveDataset.class,
Lists.<Tag<?>> newArrayList(new Tag<>(DATABASE, table.getDbName()), new Tag<>(TABLE, table.getTableName())));
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
index a9982bf..790c0b4 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
@@ -149,8 +149,11 @@ public class HivePartitionFileSet extends HiveFileSet {
multiTimer.nextStage(HiveCopyEntityHelper.Stages.CREATE_COPY_UNITS);
for (CopyableFile.Builder builder : hiveCopyEntityHelper.getCopyableFilesFromPaths(diffPathSet.filesToCopy,
hiveCopyEntityHelper.getConfiguration(), Optional.of(this.partition))) {
- copyEntities.add(builder.fileSet(fileSet).checksum(new byte[0])
- .datasetOutputPath(desiredTargetLocation.location.toString()).build());
+ CopyableFile fileEntity =
+ builder.fileSet(fileSet).checksum(new byte[0]).datasetOutputPath(desiredTargetLocation.location.toString())
+ .build();
+ this.hiveCopyEntityHelper.setCopyableFileDestinationDataset(fileEntity);
+ copyEntities.add(fileEntity);
}
log.info("Created {} copy entities for partition {}", copyEntities.size(), this.partition.getCompleteName());
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
index a796a2b..4d82a62 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
@@ -120,7 +120,10 @@ public class UnpartitionedTableFileSet extends HiveFileSet {
for (CopyableFile.Builder builder : this.helper.getCopyableFilesFromPaths(diffPathSet.filesToCopy, this.helper.getConfiguration(),
Optional.<Partition> absent())) {
- copyEntities.add(builder.fileSet(fileSet).datasetOutputPath(desiredTargetLocation.location.toString()).build());
+ CopyableFile fileEntity =
+ builder.fileSet(fileSet).datasetOutputPath(desiredTargetLocation.location.toString()).build();
+ this.helper.setCopyableFileDestinationDataset(fileEntity);
+ copyEntities.add(fileEntity);
}
multiTimer.close();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index e443271..71ebd59 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.data.management.copy.publisher;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
import java.io.IOException;
import java.net.URI;
@@ -27,7 +28,6 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -214,6 +214,7 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
if (!fileSetRoot.isPresent() && copyableFile.getDatasetOutputPath() != null) {
fileSetRoot = Optional.of(copyableFile.getDatasetOutputPath());
}
+ LineageInfo.putDestination(copyableFile.getDestDataset(), 0, wus);
}
if (datasetOriginTimestamp > copyableFile.getOriginTimestamp()) {
datasetOriginTimestamp = copyableFile.getOriginTimestamp();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
index 51a390d..5021d4d 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
@@ -17,6 +17,8 @@
package org.apache.gobblin.data.management.conversion.hive.dataset;
import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Properties;
import org.apache.hadoop.fs.FileSystem;
@@ -36,6 +38,9 @@ import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiv
import org.apache.gobblin.hive.HiveMetastoreClientPool;
import org.apache.gobblin.util.ConfigUtils;
+import static org.mockito.Mockito.when;
+
+
@Test(groups = { "gobblin.data.management.conversion" })
public class ConvertibleHiveDatasetTest {
@@ -94,7 +99,8 @@ public class ConvertibleHiveDatasetTest {
}
@Test
- public void testInvalidFormat() {
+ public void testInvalidFormat()
+ throws Exception {
Config config = ConfigFactory.parseMap(ImmutableMap.<String, String>of("destinationFormats", "flattenedOrc,nestedOrc"));
ConvertibleHiveDataset cd = createTestConvertibleDataset(config);
@@ -103,7 +109,8 @@ public class ConvertibleHiveDatasetTest {
}
@Test
- public void testDisableFormat() {
+ public void testDisableFormat()
+ throws Exception {
Config config = ConfigFactory.parseMap(ImmutableMap.<String, String> builder()
.put("destinationFormats", "flattenedOrc")
@@ -154,10 +161,13 @@ public class ConvertibleHiveDatasetTest {
Assert.assertEquals(conversionConfig.getHiveRuntimeProperties(), hiveProps);
}
- public static ConvertibleHiveDataset createTestConvertibleDataset(Config config) {
+ public static ConvertibleHiveDataset createTestConvertibleDataset(Config config)
+ throws URISyntaxException {
Table table = getTestTable("db1", "tb1");
+ FileSystem mockFs = Mockito.mock(FileSystem.class);
+ when(mockFs.getUri()).thenReturn(new URI("test"));
ConvertibleHiveDataset cd =
- new ConvertibleHiveDataset(Mockito.mock(FileSystem.class), Mockito.mock(HiveMetastoreClientPool.class), new org.apache.hadoop.hive.ql.metadata.Table(
+ new ConvertibleHiveDataset(mockFs, Mockito.mock(HiveMetastoreClientPool.class), new org.apache.hadoop.hive.ql.metadata.Table(
table), new Properties(), config);
return cd;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
index 34258ba..16ee2de 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
@@ -35,7 +35,10 @@ import lombok.Getter;
* <p>
* Instances of this class are immutable. Calling set* methods returns a copy of the calling instance.
* </p>
+ *
+ * @deprecated Use {@link GobblinEventBuilder}
*/
+@Deprecated
public class EventSubmitter {
public static final String EVENT_TYPE = "eventType";
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
index 89f83f5..d1ce681 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
@@ -17,47 +17,32 @@
package org.apache.gobblin.metrics.event;
-import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
-import org.apache.gobblin.metrics.MetricContext;
-
-import com.google.common.collect.Maps;
-
-import lombok.Getter;
/**
- * A failure event builds a specific {@link GobblinTrackingEvent} whose metadata has
- * {@value EventSubmitter#EVENT_TYPE} to be {@value #EVENT_TYPE}
+ * The builder builds a specific {@link GobblinTrackingEvent} whose metadata has
+ * {@value GobblinEventBuilder#EVENT_TYPE} to be {@value #FAILURE_EVENT_TYPE}
*
* <p>
* Note: A {@link FailureEventBuilder} instance is not reusable
*/
-public class FailureEventBuilder {
- private static final String EVENT_TYPE = "FailureEvent";
- private static final String EVENT_NAMESPACE = "gobblin.event";
+public class FailureEventBuilder extends GobblinEventBuilder {
+ private static final String FAILURE_EVENT_TYPE = "FailureEvent";
private static final String ROOT_CAUSE = "rootException";
- @Getter
- private final String name;
- @Getter
- private final String namespace;
- private final Map<String, String> metadata;
-
private Throwable rootCause;
public FailureEventBuilder(String name) {
- this(name, EVENT_NAMESPACE);
+ this(name, NAMESPACE);
}
public FailureEventBuilder(String name, String namespace) {
- this.name = name;
- this.namespace = namespace;
- metadata = Maps.newHashMap();
- metadata.put(EventSubmitter.EVENT_TYPE, EVENT_TYPE);
+ super(name, namespace);
+ metadata.put(EVENT_TYPE, FAILURE_EVENT_TYPE);
}
/**
@@ -68,42 +53,21 @@ public class FailureEventBuilder {
}
/**
- * Add a metadata pair
- */
- public void addMetadata(String key, String value) {
- metadata.put(key, value);
- }
-
- /**
- * Add additional metadata
- */
- public void addAdditionalMetadata(Map<String, String> additionalMetadata) {
- metadata.putAll(additionalMetadata);
- }
-
- /**
* Build as {@link GobblinTrackingEvent}
*/
public GobblinTrackingEvent build() {
if (rootCause != null) {
metadata.put(ROOT_CAUSE, ExceptionUtils.getStackTrace(rootCause));
}
- return new GobblinTrackingEvent(0L, EVENT_NAMESPACE, name, metadata);
- }
-
- /**
- * Submit the event
- */
- public void submit(MetricContext context) {
- context.submitEvent(build());
+ return new GobblinTrackingEvent(0L, namespace, name, metadata);
}
/**
- * Check if the given {@link GobblinTrackingEvent} is a failiure event
+ * Check if the given {@link GobblinTrackingEvent} is a failure event
*/
public static boolean isFailureEvent(GobblinTrackingEvent event) {
- String eventType = event.getMetadata().get(EventSubmitter.EVENT_TYPE);
- return StringUtils.isNotEmpty(eventType) && eventType.equals(EVENT_TYPE);
+ String eventType = event.getMetadata().get(EVENT_TYPE);
+ return StringUtils.isNotEmpty(eventType) && eventType.equals(FAILURE_EVENT_TYPE);
}
private static Throwable getRootCause(Throwable t) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
new file mode 100644
index 0000000..6b82342
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event;
+
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import lombok.Getter;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+
+
+/**
+ * A general gobblin event builder which builds a {@link GobblinTrackingEvent}
+ *
+ * Note: a {@link GobblinEventBuilder} instance is not reusable
+ */
+public class GobblinEventBuilder {
+ public static final String NAMESPACE = "gobblin.event";
+ public static final String EVENT_TYPE = "eventType";
+
+ @Getter
+ protected final String name;
+ @Getter
+ protected final String namespace;
+ protected final Map<String, String> metadata;
+
+ public GobblinEventBuilder(String name) {
+ this(name, NAMESPACE);
+ }
+
+ public GobblinEventBuilder(String name, String namespace) {
+ this.name = name;
+ this.namespace = namespace;
+ metadata = Maps.newHashMap();
+ }
+
+ public ImmutableMap<String, String> getMetadata() {
+ return new ImmutableMap.Builder<String, String>().putAll(metadata).build();
+ }
+
+ /**
+ * Add a metadata pair
+ */
+ public void addMetadata(String key, String value) {
+ metadata.put(key, value);
+ }
+
+ /**
+ * Add additional metadata
+ */
+ public void addAdditionalMetadata(Map<String, String> additionalMetadata) {
+ metadata.putAll(additionalMetadata);
+ }
+
+ /**
+ * Build as {@link GobblinTrackingEvent}
+ */
+ public GobblinTrackingEvent build() {
+ return new GobblinTrackingEvent(0L, namespace, name, metadata);
+ }
+ /**
+ * Submit the event
+ */
+ public void submit(MetricContext context) {
+ context.submitEvent(build());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
new file mode 100644
index 0000000..f9030eb
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event.lineage;
+
+
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+
+import com.google.common.base.Joiner;
+import com.google.gson.Gson;
+
+import avro.shaded.com.google.common.collect.Maps;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+
/**
 * The builder builds a specific {@link GobblinTrackingEvent} whose metadata has {@value GobblinEventBuilder#EVENT_TYPE}
 * to be {@value LineageEventBuilder#LINEAGE_EVENT_TYPE}
 *
 * Note: A {@link LineageEventBuilder} instance is not reusable: {@link #build()} folds the source
 * and destination descriptors into the shared metadata map, so calling it twice re-adds entries.
 */

@Slf4j
public final class LineageEventBuilder extends GobblinEventBuilder {
  // NOTE(review): constant name misspells "LINEAGE"; kept as-is because other classes
  // (e.g. LineageInfo) reference it by this exact name.
  static final String LIENAGE_EVENT_NAMESPACE = getKey(NAMESPACE, "lineage");
  static final String SOURCE = "source";
  static final String DESTINATION = "destination";
  static final String LINEAGE_EVENT_TYPE = "LineageEvent";

  // NOTE(review): the file imports Maps from avro.shaded.com.google.common — it should use
  // the plain com.google.common.collect.Maps; verify the shaded dependency is intentional.
  private static final Gson GSON = new Gson();

  @Getter @Setter
  private DatasetDescriptor source;
  @Getter @Setter
  private DatasetDescriptor destination;

  public LineageEventBuilder(String name) {
    super(name, LIENAGE_EVENT_NAMESPACE);
    addMetadata(EVENT_TYPE, LINEAGE_EVENT_TYPE);
  }

  /**
   * Build the tracking event: each entry of the source/destination data maps is flattened into
   * metadata under a "source."/"destination." prefixed key, then the event is created.
   */
  @Override
  public GobblinTrackingEvent build() {
    source.toDataMap().forEach((key, value) -> metadata.put(getKey(SOURCE, key), value));
    destination.toDataMap().forEach((key, value) -> metadata.put(getKey(DESTINATION, key), value));
    return new GobblinTrackingEvent(0L, namespace, name, metadata);
  }

  @Override
  public String toString() {
    return GSON.toJson(this);
  }

  /**
   * Two builders are equal iff namespace, name, metadata, source and destination all match.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    LineageEventBuilder event = (LineageEventBuilder) o;

    if (!namespace.equals(event.namespace) || !name.equals(event.name) || !metadata.equals(event.metadata)) {
      return false;
    }

    if (source != null ? !source.equals(event.source) : event.source != null) {
      return false;
    }

    return destination != null ? destination.equals(event.destination) : event.destination == null;
  }

  @Override
  public int hashCode() {
    // Includes exactly the fields compared in equals(), keeping the contract consistent
    int result = name.hashCode();
    result = 31 * result + namespace.hashCode();
    result = 31 * result + metadata.hashCode();
    result = 31 * result + (source != null ? source.hashCode() : 0);
    result = 31 * result + (destination != null ? destination.hashCode() : 0);
    return result;
  }

  /**
   * Check if the given {@link GobblinTrackingEvent} is a lineage event
   */
  public static boolean isLineageEvent(GobblinTrackingEvent event) {
    String eventType = event.getMetadata().get(EVENT_TYPE);
    return StringUtils.isNotEmpty(eventType) && eventType.equals(LINEAGE_EVENT_TYPE);
  }

  /**
   * Create a {@link LineageEventBuilder} from a {@link GobblinTrackingEvent}. An inverse function
   * to {@link LineageEventBuilder#build()}: "source."/"destination." prefixed metadata entries are
   * routed back into the respective {@link DatasetDescriptor}s; all other entries stay as metadata.
   */
  public static LineageEventBuilder fromEvent(GobblinTrackingEvent event) {
    Map<String, String> metadata = event.getMetadata();
    LineageEventBuilder lineageEvent = new LineageEventBuilder(event.getName());

    String sourcePrefix = getKey(SOURCE, "");
    Map<String, String> sourceDataMap = Maps.newHashMap();
    String destinationPrefix = getKey(DESTINATION, "");
    Map<String, String> destinationDataMap = Maps.newHashMap();

    metadata.forEach((key, value) -> {
      if (key.startsWith(sourcePrefix)) {
        sourceDataMap.put(key.substring(sourcePrefix.length()), value);
      } else if (key.startsWith(destinationPrefix)) {
        destinationDataMap.put(key.substring(destinationPrefix.length()), value);
      } else {
        lineageEvent.addMetadata(key, value);
      }
    });

    lineageEvent.setSource(DatasetDescriptor.fromDataMap(sourceDataMap));
    lineageEvent.setDestination(DatasetDescriptor.fromDataMap(destinationDataMap));
    return lineageEvent;
  }

  /** Join the given parts with '.' to form a namespaced key. */
  static String getKey(Object ... parts) {
    return Joiner.on(".").join(parts);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageException.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageException.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageException.java
new file mode 100644
index 0000000..e7528b3
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event.lineage;
+
+/**
+ * A set of exceptions used by {@link LineageEventBuilder} when lineage information is serialized or deserialized.
+ */
+public class LineageException extends Exception {
+ public LineageException(String message) {
+ super(message);
+ }
+ public static class ConflictException extends LineageException {
+ public ConflictException(String branchId, LineageEventBuilder actual, LineageEventBuilder expect) {
+ super("Conflict LineageEvent: branchId=" + branchId + ", expected=" + expect.toString() + " actual=" + actual.toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
new file mode 100644
index 0000000..dd6c8f2
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event.lineage;
+
+import java.util.Collection;
+import java.util.Map;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+
+
+/**
+ * The lineage coordinator in a Gobblin job with single source and multiple destinations
+ *
+ * <p>
+ * In Gobblin, a work unit processes records from only one dataset. It writes output to one or more destinations,
+ * depending on the number of branches configured in the job. One destination means an output as another dataset.
+ * </p>
+ *
+ * <p>
+ * Lineage info is jointly collected from the source, represented by {@link org.apache.gobblin.source.Source} or
+ * {@link org.apache.gobblin.source.extractor.Extractor}, and destination,
+ * represented by {@link org.apache.gobblin.writer.DataWriter} or {@link org.apache.gobblin.publisher.DataPublisher}
+ * </p>
+ *
+ * <p>
+ * The general flow is:
+ * <ol>
+ * <li> source sets its {@link DatasetDescriptor} to each work unit </li>
+ * <li> destination puts its {@link DatasetDescriptor} to the work unit </li>
+ * <li> load and send all lineage events from all states </li>
+ * <li> purge lineage info from all states </li>
+ * </ol>
+ * </p>
+ */
+@Slf4j
+public final class LineageInfo {
+ public static final String BRANCH = "branch";
+
+ private static final Gson GSON = new Gson();
+ private static final String NAME_KEY = "name";
+
+ private LineageInfo() {
+ }
+
+ /**
+ * Set source {@link DatasetDescriptor} of a lineage event
+ *
+ * <p>
+ * Only the {@link org.apache.gobblin.source.Source} or its {@link org.apache.gobblin.source.extractor.Extractor}
+ * is supposed to set the source for a work unit of a dataset
+ * </p>
+ *
+ * @param state state about a {@link org.apache.gobblin.source.workunit.WorkUnit}
+ *
+ */
+ public static void setSource(DatasetDescriptor source, State state) {
+ state.setProp(getKey(NAME_KEY), source.getName());
+ state.setProp(getKey(LineageEventBuilder.SOURCE), GSON.toJson(source));
+ }
+
+ /**
+ * Put a {@link DatasetDescriptor} of a destination dataset to a state
+ *
+ * <p>
+ * Only the {@link org.apache.gobblin.writer.DataWriter} or {@link org.apache.gobblin.publisher.DataPublisher}
+ * is supposed to put the destination dataset information. Since different branches may concurrently put,
+ * the method is implemented to be threadsafe
+ * </p>
+ */
+ public static void putDestination(DatasetDescriptor destination, int branchId, State state) {
+ if (!hasLineageInfo(state)) {
+ log.warn("State has no lineage info but branch " + branchId + " puts a destination: " + GSON.toJson(destination));
+ return;
+ }
+
+ synchronized (state.getProp(getKey(NAME_KEY))) {
+ state.setProp(getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION), GSON.toJson(destination));
+ }
+ }
+
+ /**
+ * Load all lineage information from {@link State}s of a dataset
+ *
+ * <p>
+ * For a dataset, the same branch across different {@link State}s must be the same, as
+ * the same branch means the same destination
+ * </p>
+ *
+ * @param states All states which belong to the same dataset
+ * @return A collection of {@link LineageEventBuilder}s put in the state
+ * @throws LineageException.ConflictException if two states have conflict lineage info
+ */
+ public static Collection<LineageEventBuilder> load(Collection<? extends State> states)
+ throws LineageException {
+ Preconditions.checkArgument(states != null && !states.isEmpty());
+ final Map<String, LineageEventBuilder> resultEvents = Maps.newHashMap();
+ for (State state : states) {
+ Map<String, LineageEventBuilder> branchedEvents = load(state);
+ for (Map.Entry<String, LineageEventBuilder> entry : branchedEvents.entrySet()) {
+ String branch = entry.getKey();
+ LineageEventBuilder event = entry.getValue();
+ LineageEventBuilder resultEvent = resultEvents.get(branch);
+ if (resultEvent == null) {
+ resultEvents.put(branch, event);
+ } else if (!resultEvent.equals(event)) {
+ throw new LineageException.ConflictException(branch, event, resultEvent);
+ }
+ }
+ }
+ return resultEvents.values();
+ }
+
+ /**
+ * Load all lineage info from a {@link State}
+ *
+ * @return A map from branch to its lineage info. If there is no destination info, return an empty map
+ */
+ static Map<String, LineageEventBuilder> load(State state) {
+ String name = state.getProp(getKey(NAME_KEY));
+ DatasetDescriptor source = GSON.fromJson(state.getProp(getKey(LineageEventBuilder.SOURCE)), DatasetDescriptor.class);
+
+ String branchedPrefix = getKey(BRANCH, "");
+ Map<String, LineageEventBuilder> events = Maps.newHashMap();
+ for (Map.Entry<Object, Object> entry : state.getProperties().entrySet()) {
+ String key = entry.getKey().toString();
+ if (!key.startsWith(branchedPrefix)) {
+ continue;
+ }
+
+ String[] parts = key.substring(branchedPrefix.length()).split("\\.");
+ assert parts.length == 2;
+ String branchId = parts[0];
+ LineageEventBuilder event = events.get(branchId);
+ if (event == null) {
+ event = new LineageEventBuilder(name);
+ event.setSource(new DatasetDescriptor(source));
+ events.put(parts[0], event);
+ }
+ switch (parts[1]) {
+ case LineageEventBuilder.DESTINATION:
+ DatasetDescriptor destination = GSON.fromJson(entry.getValue().toString(), DatasetDescriptor.class);
+ destination.addMetadata(BRANCH, branchId);
+ event.setDestination(destination);
+ break;
+ default:
+ throw new RuntimeException("Unsupported lineage key: " + key);
+ }
+ }
+
+ return events;
+ }
+
+ /**
+ * Remove all lineage related properties from a state
+ */
+ public static void purgeLineageInfo(State state) {
+ state.removePropsWithPrefix(LineageEventBuilder.LIENAGE_EVENT_NAMESPACE);
+ }
+
+ /**
+ * Check if the given state has lineage info
+ */
+ public static boolean hasLineageInfo(State state) {
+ return state.contains(getKey(NAME_KEY));
+ }
+
+ /**
+ * Get the full lineage event name from a state
+ */
+ public static String getFullEventName(State state) {
+ return Joiner.on('.').join(LineageEventBuilder.LIENAGE_EVENT_NAMESPACE, state.getProp(getKey(NAME_KEY)));
+ }
+
+ /**
+ * Prefix all keys with {@link LineageEventBuilder#LIENAGE_EVENT_NAMESPACE}
+ */
+ private static String getKey(Object... objects) {
+ Object[] args = new Object[objects.length + 1];
+ args[0] = LineageEventBuilder.LIENAGE_EVENT_NAMESPACE;
+ System.arraycopy(objects, 0, args, 1, objects.length);
+ return LineageEventBuilder.getKey(args);
+ }
+}