Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/04/17 03:42:25 UTC
incubator-gobblin git commit: [GOBBLIN-463] Change lineage event for Avro2Orc conversion to have underlying FileSystem as platform
Repository: incubator-gobblin
Updated Branches:
refs/heads/master fc389522a -> 1c03ea22f
[GOBBLIN-463] Change lineage event for Avro2Orc conversion to have underlying FileSystem as platform
Closes #2340 from eogren/aditya-branch
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1c03ea22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1c03ea22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1c03ea22
Branch: refs/heads/master
Commit: 1c03ea22fa21f44000e7aa73b142fc92c5fc2ba2
Parents: fc38952
Author: Aditya Sharma <ad...@linkedin.com>
Authored: Mon Apr 16 20:42:18 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Apr 16 20:42:18 2018 -0700
----------------------------------------------------------------------
.../gobblin/dataset/DatasetConstants.java | 4 ++
.../dataset/HiveToHdfsDatasetResolver.java | 50 +++++++++++++++++
.../HiveToHdfsDatasetResolverFactory.java | 28 ++++++++++
.../hive/dataset/ConvertibleHiveDataset.java | 50 +++++++++++++++++
.../hive/publisher/HiveConvertPublisher.java | 32 +++++++++++
.../hive/source/HiveAvroToOrcSource.java | 32 ++++++++++-
.../conversion/hive/source/HiveSource.java | 54 +-----------------
.../conversion/hive/utils/LineageUtils.java | 45 +++++++++++++++
.../dataset/ConvertibleHiveDatasetTest.java | 59 ++++++++++++++------
9 files changed, 283 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
index 03b7fcb..d704525 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
@@ -37,4 +37,8 @@ public class DatasetConstants {
/** JDBC metadata */
public static final String CONNECTION_URL = "connectionUrl";
+
+ /** FileSystem scheme and location */
+ public static final String FS_SCHEME = "fsScheme";
+ public static final String FS_LOCATION = "fsLocation";
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolver.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolver.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolver.java
new file mode 100644
index 0000000..0130271
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolver.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import org.apache.gobblin.configuration.State;
+
+
+/**
+ * Singleton {@link DatasetResolver} to convert a Hive {@link DatasetDescriptor} to HDFS {@link DatasetDescriptor}
+ */
+public class HiveToHdfsDatasetResolver implements DatasetResolver {
+ public static final String HIVE_TABLE = "hiveTable";
+ public static final HiveToHdfsDatasetResolver INSTANCE = new HiveToHdfsDatasetResolver();
+
+ private HiveToHdfsDatasetResolver() {
+ // Private constructor to make it a singleton
+ }
+
+ @Override
+ public DatasetDescriptor resolve(DatasetDescriptor raw, State state) {
+ ImmutableMap<String, String> metadata = raw.getMetadata();
+ Preconditions.checkArgument(metadata.containsKey(DatasetConstants.FS_SCHEME),
+ String.format("Hive Dataset Descriptor must contain metadata %s to create Hdfs Dataset Descriptor",
+ DatasetConstants.FS_SCHEME));
+ Preconditions.checkArgument(metadata.containsKey(DatasetConstants.FS_LOCATION),
+ String.format("Hive Dataset Descriptor must contain metadata %s to create Hdfs Dataset Descriptor",
+ DatasetConstants.FS_LOCATION));
+ DatasetDescriptor datasetDescriptor =
+ new DatasetDescriptor(metadata.get(DatasetConstants.FS_SCHEME), metadata.get(DatasetConstants.FS_LOCATION));
+ datasetDescriptor.addMetadata(HIVE_TABLE, raw.getName());
+ return datasetDescriptor;
+ }
+}
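
For orientation, here is a minimal, hand-written sketch (not part of the commit) of how the new resolver is meant to behave: the Hive-level descriptor carries the new fsScheme/fsLocation metadata, and resolve() turns it into a FileSystem-level descriptor that keeps the Hive table as metadata. The table name, scheme, and path below are made-up values.

    import org.apache.gobblin.configuration.State;
    import org.apache.gobblin.dataset.DatasetConstants;
    import org.apache.gobblin.dataset.DatasetDescriptor;
    import org.apache.gobblin.dataset.HiveToHdfsDatasetResolver;

    public class HiveToHdfsResolverSketch {
      public static void main(String[] args) {
        // Hive-level descriptor, as ConvertibleHiveDataset builds it (hypothetical values).
        DatasetDescriptor hive = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, "db1.tb1");
        hive.addMetadata(DatasetConstants.FS_SCHEME, "hdfs");
        hive.addMetadata(DatasetConstants.FS_LOCATION, "/data/db1/tb1");

        // Resolve to the underlying FileSystem view used in the emitted lineage event.
        DatasetDescriptor fs = HiveToHdfsDatasetResolver.INSTANCE.resolve(hive, new State());

        // fs.getPlatform() -> "hdfs"
        // fs.getName()     -> "/data/db1/tb1"
        // fs.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE) -> "db1.tb1"
      }
    }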
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolverFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolverFactory.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolverFactory.java
new file mode 100644
index 0000000..acd2212
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolverFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset;
+
+import com.typesafe.config.Config;
+
+
+public class HiveToHdfsDatasetResolverFactory implements DatasetResolverFactory {
+ @Override
+ public DatasetResolver createResolver(Config config) {
+ return HiveToHdfsDatasetResolver.INSTANCE;
+ }
+}
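
The factory's only job is to make the resolver pluggable into the lineage broker through configuration. The updated test below wires it up with this property; outside the test, treat the exact key prefix as an assumption for your deployment:

    gobblin.broker.lineageInfo.datasetResolverFactory=org.apache.gobblin.dataset.HiveToHdfsDatasetResolverFactory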
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java
index f4c8744..63f1bee 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java
@@ -16,6 +16,9 @@
*/
package org.apache.gobblin.data.management.conversion.hive.dataset;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -25,7 +28,11 @@ import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.metadata.Table;
import com.google.common.base.Optional;
@@ -74,6 +81,14 @@ public class ConvertibleHiveDataset extends HiveDataset {
// Mapping for destination format to its Conversion config
private final Map<String, ConversionConfig> destConversionConfigs;
+ // Source Dataset Descriptor
+ @Getter
+ private final DatasetDescriptor sourceDataset;
+
+ // List of destination Dataset Descriptors
+ @Getter
+ private final List<DatasetDescriptor> destDatasets;
+
/**
* <ul>
* <li> The constructor takes in a dataset {@link Config} which MUST have a comma separated list of destination formats at key,
@@ -112,6 +127,41 @@ public class ConvertibleHiveDataset extends HiveDataset {
}
}
+ this.sourceDataset = createSourceDataset();
+ this.destDatasets = createDestDatasets();
+ }
+
+ private List<DatasetDescriptor> createDestDatasets() {
+ List<DatasetDescriptor> destDatasets = new ArrayList<>();
+ for (String format : getDestFormats()) {
+ Optional<ConversionConfig> conversionConfigForFormat = getConversionConfigForFormat(format);
+ if (!conversionConfigForFormat.isPresent()) {
+ continue;
+ }
+ String destTable = conversionConfigForFormat.get().getDestinationDbName() + "." + conversionConfigForFormat.get()
+ .getDestinationTableName();
+ DatasetDescriptor dest = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, destTable);
+ String destLocation = conversionConfigForFormat.get().getDestinationDataPath() + Path.SEPARATOR + "final";
+ dest.addMetadata(DatasetConstants.FS_SCHEME, getSourceDataset().getMetadata().get(DatasetConstants.FS_SCHEME));
+ dest.addMetadata(DatasetConstants.FS_LOCATION, destLocation);
+ destDatasets.add(dest);
+ }
+ return destDatasets;
+ }
+
+ private DatasetDescriptor createSourceDataset() {
+ try {
+ String sourceTable = getTable().getDbName() + "." + getTable().getTableName();
+ DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, sourceTable);
+ Path sourcePath = getTable().getDataLocation();
+ String sourceLocation = Path.getPathWithoutSchemeAndAuthority(sourcePath).toString();
+ FileSystem sourceFs = sourcePath.getFileSystem(new Configuration());
+ source.addMetadata(DatasetConstants.FS_SCHEME, sourceFs.getScheme());
+ source.addMetadata(DatasetConstants.FS_LOCATION, sourceLocation);
+ return source;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java
index ff4d9e9..f87a6a0 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java
@@ -31,6 +31,13 @@ import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
+import org.apache.gobblin.data.management.conversion.hive.source.HiveAvroToOrcSource;
+import org.apache.gobblin.data.management.conversion.hive.utils.LineageUtils;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -90,6 +97,7 @@ public class HiveConvertPublisher extends DataPublisher {
private final FileSystem fs;
private final HiveSourceWatermarker watermarker;
private final HiveMetastoreClientPool pool;
+ private final Optional<LineageInfo> lineageInfo;
public static final String PARTITION_PARAMETERS_WHITELIST = "hive.conversion.partitionParameters.whitelist";
public static final String PARTITION_PARAMETERS_BLACKLIST = "hive.conversion.partitionParameters.blacklist";
@@ -105,6 +113,15 @@ public class HiveConvertPublisher extends DataPublisher {
this.metricContext = Instrumented.getMetricContext(state, HiveConvertPublisher.class);
this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, EventConstants.CONVERSION_NAMESPACE).build();
+ // Extract LineageInfo from state
+ if (state instanceof SourceState) {
+ lineageInfo = LineageInfo.getLineageInfo(((SourceState) state).getBroker());
+ } else if (state instanceof WorkUnitState) {
+ lineageInfo = LineageInfo.getLineageInfo(((WorkUnitState) state).getTaskBrokerNullable());
+ } else {
+ lineageInfo = Optional.absent();
+ }
+
Configuration conf = new Configuration();
Optional<String> uri = Optional.fromNullable(this.state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI));
if (uri.isPresent()) {
@@ -226,6 +243,10 @@ public class HiveConvertPublisher extends DataPublisher {
} catch (Exception e) {
log.error("Failed while emitting SLA event, but ignoring and moving forward to curate " + "all clean up commands", e);
}
+ if (LineageUtils.shouldSetLineageInfo(wus.getWorkunit())) {
+ setDestLineageInfo(wus.getWorkunit(),
+ (ConvertibleHiveDataset) ((HiveWorkUnit) wus.getWorkunit()).getHiveDataset(), this.lineageInfo);
+ }
}
}
}
@@ -254,6 +275,17 @@ public class HiveConvertPublisher extends DataPublisher {
}
@VisibleForTesting
+ public void setDestLineageInfo(WorkUnit workUnit, ConvertibleHiveDataset convertibleHiveDataset,
+ Optional<LineageInfo> lineageInfo) {
+ List<DatasetDescriptor> destDatasets = convertibleHiveDataset.getDestDatasets();
+ for (int i = 0; i < destDatasets.size(); i++) {
+ if (lineageInfo.isPresent()) {
+ lineageInfo.get().putDestination(destDatasets.get(i), i + 1, workUnit);
+ }
+ }
+ }
+
+ @VisibleForTesting
public void preservePartitionParams(Collection<? extends WorkUnitState> states) {
for (WorkUnitState wus : states) {
if (wus.getWorkingState() != WorkingState.COMMITTED) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java
index 89615e9..4659aad 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java
@@ -16,18 +16,26 @@
*/
package org.apache.gobblin.data.management.conversion.hive.source;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
import java.util.List;
import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDatasetFinder;
+import org.apache.gobblin.data.management.conversion.hive.utils.LineageUtils;
+import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker;
+import org.apache.gobblin.data.management.copy.hive.HiveDataset;
import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.workunit.WorkUnit;
/**
* An extension to {@link HiveSource} that is used for Avro to ORC conversion jobs.
*/
public class HiveAvroToOrcSource extends HiveSource {
-
+ private Optional<LineageInfo> lineageInfo;
@Override
public List<WorkUnit> getWorkunits(SourceState state) {
if (!state.contains(HIVE_SOURCE_DATASET_FINDER_CLASS_KEY)) {
@@ -36,8 +44,26 @@ public class HiveAvroToOrcSource extends HiveSource {
if (!state.contains(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY)) {
state.setProp(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY, "hive.conversion.avro");
}
+ this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());
+
+ List<WorkUnit> workunits = super.getWorkunits(state);
+ for (WorkUnit workUnit : workunits) {
+ if (LineageUtils.shouldSetLineageInfo(workUnit)) {
+ setSourceLineageInfo(workUnit, (ConvertibleHiveDataset) ((HiveWorkUnit) workUnit).getHiveDataset(),
+ this.lineageInfo);
+ }
+ }
+ return workunits;
+ }
- return super.getWorkunits(state);
+ @VisibleForTesting
+ public void setSourceLineageInfo(WorkUnit workUnit, ConvertibleHiveDataset hiveDataset,
+ Optional<LineageInfo> lineageInfo) {
+ DatasetDescriptor sourceDataset = hiveDataset.getSourceDataset();
+ if (lineageInfo.isPresent()) {
+ lineageInfo.get().setSource(sourceDataset, workUnit);
+ }
}
-}
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
index 4cee48f..3ad99fd 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
@@ -27,13 +27,6 @@ import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
-import org.apache.gobblin.broker.iface.SharedResourcesBroker;
-import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
-import org.apache.gobblin.dataset.DatasetConstants;
-import org.apache.gobblin.dataset.DatasetDescriptor;
-import org.apache.gobblin.metrics.event.lineage.LineageInfo;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -155,7 +148,6 @@ public class HiveSource implements Source {
protected long maxLookBackTime;
protected long beginGetWorkunitsTime;
protected List<String> ignoreDataPathIdentifierList;
- protected SharedResourcesBroker<GobblinScopeTypes> sharedJobBroker;
protected final ClassAliasResolver<HiveBaseExtractorFactory> classAliasResolver =
new ClassAliasResolver<>(HiveBaseExtractorFactory.class);
@@ -222,7 +214,6 @@ public class HiveSource implements Source {
this.maxLookBackTime = new DateTime().minusDays(maxLookBackDays).getMillis();
this.ignoreDataPathIdentifierList = COMMA_BASED_SPLITTER.splitToList(state.getProp(HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER_KEY,
DEFAULT_HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER));
- this.sharedJobBroker = state.getBroker();
silenceHiveLoggers();
}
@@ -261,10 +252,7 @@ public class HiveSource implements Source {
EventWorkunitUtils.setTableSlaEventMetadata(hiveWorkUnit, hiveDataset.getTable(), updateTime, lowWatermark.getValue(),
this.beginGetWorkunitsTime);
- if (hiveDataset instanceof ConvertibleHiveDataset) {
- setLineageInfo((ConvertibleHiveDataset) hiveDataset, hiveWorkUnit, this.sharedJobBroker);
- log.info("Added lineage event for dataset " + hiveDataset.getUrn());
- }
+
this.workunits.add(hiveWorkUnit);
log.debug(String.format("Workunit added for table: %s", hiveWorkUnit));
@@ -293,7 +281,7 @@ public class HiveSource implements Source {
}
protected void createWorkunitsForPartitionedTable(HiveDataset hiveDataset, AutoReturnableObject<IMetaStoreClient> client) throws IOException {
- boolean setLineageInfo = false;
+
long tableProcessTime = new DateTime().getMillis();
this.watermarker.onTableProcessBegin(hiveDataset.getTable(), tableProcessTime);
@@ -341,12 +329,7 @@ public class HiveSource implements Source {
EventWorkunitUtils.setPartitionSlaEventMetadata(hiveWorkUnit, hiveDataset.getTable(), sourcePartition, updateTime,
lowWatermark.getValue(), this.beginGetWorkunitsTime);
- if (hiveDataset instanceof ConvertibleHiveDataset && !setLineageInfo) {
- setLineageInfo((ConvertibleHiveDataset) hiveDataset, hiveWorkUnit, this.sharedJobBroker);
- log.info("Added lineage event for dataset " + hiveDataset.getUrn());
- // Add lineage information only once per hive table
- setLineageInfo = true;
- }
+
workunits.add(hiveWorkUnit);
log.info(String.format("Creating workunit for partition %s as updateTime %s is greater than low watermark %s",
sourcePartition.getCompleteName(), updateTime, lowWatermark.getValue()));
@@ -491,35 +474,4 @@ public class HiveSource implements Source {
private boolean isAvro(Table table) {
return AvroSerDe.class.getName().equals(table.getSd().getSerdeInfo().getSerializationLib());
}
-
- public static void setLineageInfo(ConvertibleHiveDataset convertibleHiveDataset, WorkUnit workUnit,
- SharedResourcesBroker<GobblinScopeTypes> sharedJobBroker)
- throws IOException {
- String sourceTable =
- convertibleHiveDataset.getTable().getDbName() + "." + convertibleHiveDataset.getTable().getTableName();
- DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, sourceTable);
- source.addMetadata(DatasetConstants.FS_URI,
- convertibleHiveDataset.getTable().getDataLocation().getFileSystem(new Configuration()).getUri().toString());
-
- int virtualBranch = 0;
- for (String format : convertibleHiveDataset.getDestFormats()) {
- ++virtualBranch;
- Optional<ConvertibleHiveDataset.ConversionConfig> conversionConfigForFormat =
- convertibleHiveDataset.getConversionConfigForFormat(format);
- Optional<LineageInfo> lineageInfo = LineageInfo.getLineageInfo(sharedJobBroker);
- if (!lineageInfo.isPresent()) {
- continue;
- } else if (!conversionConfigForFormat.isPresent()) {
- continue;
- }
- String destTable = conversionConfigForFormat.get().getDestinationDbName() + "." + conversionConfigForFormat.get()
- .getDestinationTableName();
- DatasetDescriptor dest = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, destTable);
- Path destPath = new Path(conversionConfigForFormat.get().getDestinationDataPath());
- dest.addMetadata(DatasetConstants.FS_URI, destPath.getFileSystem(new Configuration()).getUri().toString());
-
- lineageInfo.get().setSource(source, workUnit);
- lineageInfo.get().putDestination(dest, virtualBranch, workUnit);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java
new file mode 100644
index 0000000..249359b
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.data.management.conversion.hive.utils;
+
+import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
+import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit;
+import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker;
+import org.apache.gobblin.data.management.copy.hive.HiveDataset;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * Utility functions for tracking lineage in hive conversion workflows
+ */
+public class LineageUtils {
+ public static boolean shouldSetLineageInfo(WorkUnit workUnit) {
+ if (!(workUnit instanceof HiveWorkUnit)) {
+ return false;
+ }
+ HiveWorkUnit hiveWorkUnit = (HiveWorkUnit) workUnit;
+ if (hiveWorkUnit.getPropAsBoolean(PartitionLevelWatermarker.IS_WATERMARK_WORKUNIT_KEY, false)) {
+ return false;
+ }
+ HiveDataset hiveDataset = hiveWorkUnit.getHiveDataset();
+ return hiveDataset instanceof ConvertibleHiveDataset;
+ }
+
+ private LineageUtils() {
+ // can't instantiate
+ }
+}
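
As a usage illustration (hand-written, not part of the commit), the guard is meant to be applied per work unit before any lineage state is touched, mirroring the calls in HiveAvroToOrcSource and HiveConvertPublisher above. The workUnits list and lineageInfo optional are assumed to come from the surrounding source or publisher.

    import java.util.List;

    import com.google.common.base.Optional;

    import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
    import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit;
    import org.apache.gobblin.data.management.conversion.hive.utils.LineageUtils;
    import org.apache.gobblin.metrics.event.lineage.LineageInfo;
    import org.apache.gobblin.source.workunit.WorkUnit;

    public class LineageGuardSketch {
      /** Sets source lineage on eligible work units; skips watermark and non-convertible datasets. */
      static void setSourceLineage(List<WorkUnit> workUnits, Optional<LineageInfo> lineageInfo) {
        for (WorkUnit workUnit : workUnits) {
          if (!LineageUtils.shouldSetLineageInfo(workUnit) || !lineageInfo.isPresent()) {
            continue;
          }
          ConvertibleHiveDataset dataset =
              (ConvertibleHiveDataset) ((HiveWorkUnit) workUnit).getHiveDataset();
          lineageInfo.get().setSource(dataset.getSourceDataset(), workUnit);
        }
      }
    }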
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
index c399264..b1a38f0 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.gobblin.data.management.conversion.hive.dataset;
+import com.google.common.base.Optional;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
@@ -27,13 +28,18 @@ import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
-import org.apache.gobblin.data.management.conversion.hive.source.HiveSource;
-import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.conversion.hive.publisher.HiveConvertPublisher;
+import org.apache.gobblin.data.management.conversion.hive.source.HiveAvroToOrcSource;
+import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit;
+import org.apache.gobblin.data.management.conversion.hive.utils.LineageUtils;
import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.HiveToHdfsDatasetResolver;
+import org.apache.gobblin.dataset.HiveToHdfsDatasetResolverFactory;
import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
-import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.mockito.Mockito;
@@ -56,39 +62,57 @@ import static org.mockito.Mockito.when;
@Test(groups = { "gobblin.data.management.conversion" })
public class ConvertibleHiveDatasetTest {
-
+ /**
+ * Test if lineage information is properly set in the workunit for convertible hive datasets
+ */
@Test
- public void testLineageInfo()
- throws Exception {
+ public void testLineageInfo() throws Exception {
String testConfFilePath = "convertibleHiveDatasetTest/flattenedAndNestedOrc.conf";
Config config = ConfigFactory.parseResources(testConfFilePath).getConfig("hive.conversion.avro");
- WorkUnit workUnit = WorkUnit.createEmpty();
+ // Set datasetResolverFactory to convert Hive Lineage event to Hdfs Lineage event
Gson GSON = new Gson();
- HiveSource.setLineageInfo(createTestConvertibleDataset(config), workUnit, getSharedJobBroker());
+ ConvertibleHiveDataset testConvertibleDataset = createTestConvertibleDataset(config);
+ HiveWorkUnit workUnit = new HiveWorkUnit(testConvertibleDataset);
+ workUnit.setProp("gobblin.broker.lineageInfo.datasetResolverFactory",
+ HiveToHdfsDatasetResolverFactory.class.getName());
+ workUnit.setProp(ConfigurationKeys.JOB_ID_KEY, "123456");
+ Optional<LineageInfo> lineageInfo = LineageInfo.getLineageInfo(getSharedJobBroker(workUnit.getProperties()));
+ HiveAvroToOrcSource src = new HiveAvroToOrcSource();
+ Assert.assertTrue(LineageUtils.shouldSetLineageInfo(workUnit));
+ src.setSourceLineageInfo(workUnit,
+ (ConvertibleHiveDataset) ((HiveWorkUnit) workUnit).getHiveDataset(), lineageInfo);
+ new HiveConvertPublisher(workUnit).setDestLineageInfo(workUnit,
+ (ConvertibleHiveDataset) ((HiveWorkUnit) workUnit).getHiveDataset(), lineageInfo);
+
Properties props = workUnit.getSpecProperties();
// Assert that lineage name is correct
- Assert.assertEquals(props.getProperty("gobblin.event.lineage.name"), "db1.tb1");
+ Assert.assertEquals(props.getProperty("gobblin.event.lineage.name"), "/tmp/test");
// Assert that source is correct for lineage event
Assert.assertTrue(props.containsKey("gobblin.event.lineage.source"));
DatasetDescriptor sourceDD =
GSON.fromJson(props.getProperty("gobblin.event.lineage.source"), DatasetDescriptor.class);
- Assert.assertEquals(sourceDD.getPlatform(), DatasetConstants.PLATFORM_HIVE);
- Assert.assertEquals(sourceDD.getName(), "db1.tb1");
+ Assert.assertEquals(sourceDD.getPlatform(), "file");
+ Assert.assertEquals(sourceDD.getName(), "/tmp/test");
+ Assert.assertEquals(sourceDD.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE), "db1.tb1");
// Assert that first dest is correct for lineage event
Assert.assertTrue(props.containsKey("gobblin.event.lineage.branch.1.destination"));
DatasetDescriptor destDD1 =
GSON.fromJson(props.getProperty("gobblin.event.lineage.branch.1.destination"), DatasetDescriptor.class);
- Assert.assertEquals(destDD1.getPlatform(), DatasetConstants.PLATFORM_HIVE);
- Assert.assertEquals(destDD1.getName(), "db1_nestedOrcDb.tb1_nestedOrc");
+ Assert.assertEquals(destDD1.getPlatform(), "file");
+ Assert.assertEquals(destDD1.getName(), "/tmp/data_nestedOrc/db1/tb1/final");
+ Assert.assertEquals(destDD1.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE),
+ "db1_nestedOrcDb.tb1_nestedOrc");
// Assert that second dest is correct for lineage event
Assert.assertTrue(props.containsKey("gobblin.event.lineage.branch.2.destination"));
DatasetDescriptor destDD2 =
GSON.fromJson(props.getProperty("gobblin.event.lineage.branch.2.destination"), DatasetDescriptor.class);
- Assert.assertEquals(destDD2.getPlatform(), DatasetConstants.PLATFORM_HIVE);
- Assert.assertEquals(destDD2.getName(), "db1_flattenedOrcDb.tb1_flattenedOrc");
+ Assert.assertEquals(destDD2.getPlatform(), "file");
+ Assert.assertEquals(destDD2.getName(), "/tmp/data_flattenedOrc/db1/tb1/final");
+ Assert.assertEquals(destDD2.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE),
+ "db1_flattenedOrcDb.tb1_flattenedOrc");
// Assert that there are two eventBuilders for nestedOrc and flattenedOrc
Collection<LineageEventBuilder> lineageEventBuilders = LineageInfo.load(Collections.singleton(workUnit));
@@ -227,15 +251,16 @@ public class ConvertibleHiveDatasetTest {
Table table = new Table();
table.setDbName(dbName);
table.setTableName(tableName);
+ table.setTableType(TableType.EXTERNAL_TABLE.name());
StorageDescriptor sd = new StorageDescriptor();
sd.setLocation("/tmp/test");
table.setSd(sd);
return table;
}
- public static SharedResourcesBroker<GobblinScopeTypes> getSharedJobBroker() {
+ public static SharedResourcesBroker<GobblinScopeTypes> getSharedJobBroker(Properties props) {
SharedResourcesBroker<GobblinScopeTypes> instanceBroker = SharedResourcesBrokerFactory
- .createDefaultTopLevelBroker(ConfigFactory.empty(), GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+ .createDefaultTopLevelBroker(ConfigFactory.parseProperties(props), GobblinScopeTypes.GLOBAL.defaultScopeInstance());
SharedResourcesBroker<GobblinScopeTypes> jobBroker = instanceBroker
.newSubscopedBuilder(new JobScopeInstance("ConvertibleHiveDatasetLineageEventTest", String.valueOf(System.currentTimeMillis())))
.build();