You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/09/11 16:14:50 UTC
incubator-gobblin git commit: [GOBBLIN-576] Send partition level lineage in hive distcp
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 5eee52d93 -> 29d403aec
[GOBBLIN-576] Send partition level lineage in hive distcp
Closes #2442 from zxcware/pd
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/29d403ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/29d403ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/29d403ae
Branch: refs/heads/master
Commit: 29d403aec044756ad711b4bce5a7395d01168439
Parents: 5eee52d
Author: zhchen <zh...@linkedin.com>
Authored: Tue Sep 11 09:14:46 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Sep 11 09:14:46 2018 -0700
----------------------------------------------------------------------
.../gobblin/data/management/copy/CopySource.java | 6 +++---
.../data/management/copy/CopyableFile.java | 15 +++++++++------
.../copy/hive/HiveCopyEntityHelper.java | 19 +++++++++----------
.../copy/hive/HivePartitionFileSet.java | 15 ++++++++++++++-
.../copy/hive/UnpartitionedTableFileSet.java | 3 ++-
.../copy/publisher/CopyDataPublisher.java | 2 +-
.../data/management/copy/CopyableFileTest.java | 13 +++++++++----
7 files changed, 47 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 4cf0b64..9ae9b45 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -398,9 +398,9 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
* a DatasetFinder. Consequently, the source and destination dataset for the CopyableFile lineage are expected
* to be set by the same logic
*/
- if (lineageInfo.isPresent() && copyableFile.getSourceDataset() != null
- && copyableFile.getDestinationDataset() != null) {
- lineageInfo.get().setSource(copyableFile.getSourceDataset(), workUnit);
+ if (lineageInfo.isPresent() && copyableFile.getSourceData() != null
+ && copyableFile.getDestinationData() != null) {
+ lineageInfo.get().setSource(copyableFile.getSourceData(), workUnit);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index d2547b4..9ad918c 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -21,6 +21,7 @@ import org.apache.gobblin.data.management.partition.File;
import org.apache.gobblin.data.management.copy.PreserveAttributes.Option;
import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.Descriptor;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.guid.Guid;
@@ -54,16 +55,16 @@ import com.google.common.collect.Lists;
@EqualsAndHashCode(callSuper = true)
public class CopyableFile extends CopyEntity implements File {
/**
- * The source dataset the file belongs to. For now, since it's only used before copying, set it to be
+ * The source data the file belongs to. For now, since it's only used before copying, set it to be
* transient so that it won't be serialized, avoid unnecessary data transfer
*/
- private transient DatasetDescriptor sourceDataset;
+ private transient Descriptor sourceData;
/** {@link FileStatus} of the existing origin file. */
private FileStatus origin;
- /** The destination dataset the file will be copied to */
- private DatasetDescriptor destinationDataset;
+ /** The destination data the file will be copied to */
+ private Descriptor destinationData;
/** Complete destination {@link Path} of the file. */
private Path destination;
@@ -131,13 +132,15 @@ public class CopyableFile extends CopyEntity implements File {
Path fullSourcePath = Path.getPathWithoutSchemeAndAuthority(origin.getPath());
String sourceDatasetName = isDir ? fullSourcePath.toString() : fullSourcePath.getParent().toString();
- sourceDataset = new DatasetDescriptor(originFs.getScheme(), sourceDatasetName);
+ DatasetDescriptor sourceDataset = new DatasetDescriptor(originFs.getScheme(), sourceDatasetName);
sourceDataset.addMetadata(DatasetConstants.FS_URI, originFs.getUri().toString());
+ sourceData = sourceDataset;
Path fullDestinationPath = Path.getPathWithoutSchemeAndAuthority(destination);
String destinationDatasetName = isDir ? fullDestinationPath.toString() : fullDestinationPath.getParent().toString();
- destinationDataset = new DatasetDescriptor(targetFs.getScheme(), destinationDatasetName);
+ DatasetDescriptor destinationDataset = new DatasetDescriptor(targetFs.getScheme(), destinationDatasetName);
destinationDataset.addMetadata(DatasetConstants.FS_URI, targetFs.getUri().toString());
+ destinationData = destinationDataset;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
index 2b1a142..9553959 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
@@ -765,18 +765,17 @@ public class HiveCopyEntityHelper {
return this.targetFs;
}
- /**
- * Set the source and destination datasets of a copyable file
- */
- void setCopyableFileDatasets(CopyableFile copyableFile) {
+ DatasetDescriptor getSourceDataset() {
String sourceTable = dataset.getTable().getDbName() + "." + dataset.getTable().getTableName();
- DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, sourceTable);
- source.addMetadata(DatasetConstants.FS_URI, dataset.getFs().getUri().toString());
- copyableFile.setSourceDataset(source);
+ DatasetDescriptor sourceDataset = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, sourceTable);
+ sourceDataset.addMetadata(DatasetConstants.FS_URI, dataset.getFs().getUri().toString());
+ return sourceDataset;
+ }
+ DatasetDescriptor getDestinationDataset() {
String destinationTable = this.getTargetDatabase() + "." + this.getTargetTable();
- DatasetDescriptor destination = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, destinationTable);
- destination.addMetadata(DatasetConstants.FS_URI, this.getTargetFs().getUri().toString());
- copyableFile.setDestinationDataset(destination);
+ DatasetDescriptor destinationDataset = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, destinationTable);
+ destinationDataset.addMetadata(DatasetConstants.FS_URI, this.getTargetFs().getUri().toString());
+ return destinationDataset;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
index 2c3817e..93ae40f 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
@@ -35,6 +35,8 @@ import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
import org.apache.gobblin.hive.HiveRegisterStep;
import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
import org.apache.gobblin.hive.spec.HiveSpec;
@@ -153,7 +155,18 @@ public class HivePartitionFileSet extends HiveFileSet {
CopyableFile fileEntity =
builder.fileSet(fileSet).checksum(new byte[0]).datasetOutputPath(desiredTargetLocation.location.toString())
.build();
- this.hiveCopyEntityHelper.setCopyableFileDatasets(fileEntity);
+
+ DatasetDescriptor sourceDataset = this.hiveCopyEntityHelper.getSourceDataset();
+ PartitionDescriptor source = new PartitionDescriptor(partition.getName(), sourceDataset);
+ fileEntity.setSourceData(source);
+
+ DatasetDescriptor destinationDataset = this.hiveCopyEntityHelper.getDestinationDataset();
+ Partition destinationPartition =
+ this.existingTargetPartition.isPresent() ? this.existingTargetPartition.get() : partition;
+ PartitionDescriptor destination =
+ new PartitionDescriptor(destinationPartition.getName(), destinationDataset);
+ fileEntity.setDestinationData(destination);
+
copyEntities.add(fileEntity);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
index 756b4dd..89ebe7e 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
@@ -123,7 +123,8 @@ public class UnpartitionedTableFileSet extends HiveFileSet {
Optional.<Partition> absent())) {
CopyableFile fileEntity =
builder.fileSet(fileSet).datasetOutputPath(desiredTargetLocation.location.toString()).build();
- this.helper.setCopyableFileDatasets(fileEntity);
+ fileEntity.setSourceData(this.helper.getSourceDataset());
+ fileEntity.setDestinationData(this.helper.getDestinationDataset());
copyEntities.add(fileEntity);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index 79b2b6a..ec7b1a0 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -226,7 +226,7 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
fileSetRoot = Optional.of(copyableFile.getDatasetOutputPath());
}
if (lineageInfo.isPresent()) {
- lineageInfo.get().putDestination(copyableFile.getDestinationDataset(), 0, wus);
+ lineageInfo.get().putDestination(copyableFile.getDestinationData(), 0, wus);
}
}
if (datasetOriginTimestamp > copyableFile.getOriginTimestamp()) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
index 986efeb..65425fa 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
@@ -38,6 +38,7 @@ import com.google.common.collect.Maps;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
import org.apache.gobblin.util.PathUtils;
import static org.mockito.Mockito.mock;
@@ -56,6 +57,10 @@ public class CopyableFileTest {
"checksum".getBytes(), PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps
.<String, String>newHashMap(), "");
+ DatasetDescriptor dataset = new DatasetDescriptor("hive", "db.table");
+ PartitionDescriptor descriptor = new PartitionDescriptor("datepartition=2018/09/05", dataset);
+ copyableFile.setDestinationData(descriptor);
+
String s = CopyEntity.serialize(copyableFile);
CopyEntity de = CopyEntity.deserialize(s);
@@ -111,11 +116,11 @@ public class CopyableFileTest {
CopyableFile copyableFile = new CopyableFile(origin, new Path(destinationPath), null, null, null,
PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), "");
copyableFile.setFsDatasets(originFs, targetFs);
- DatasetDescriptor source = copyableFile.getSourceDataset();
+ DatasetDescriptor source = (DatasetDescriptor) copyableFile.getSourceData();
Assert.assertEquals(source.getName(), "/data/databases/source");
Assert.assertEquals(source.getPlatform(), "hdfs");
Assert.assertEquals(source.getMetadata().get("fsUri"), originFsUri);
- DatasetDescriptor destination = copyableFile.getDestinationDataset();
+ DatasetDescriptor destination = (DatasetDescriptor) copyableFile.getDestinationData();
Assert.assertEquals(destination.getName(), "/data/databases/destination");
Assert.assertEquals(destination.getPlatform(), "file");
Assert.assertEquals(destination.getMetadata().get("fsUri"), targetFsUri);
@@ -127,11 +132,11 @@ public class CopyableFileTest {
copyableFile = new CopyableFile(origin, new Path(destinationPath), null, null, null,
PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), "");
copyableFile.setFsDatasets(originFs, targetFs);
- source = copyableFile.getSourceDataset();
+ source = (DatasetDescriptor) copyableFile.getSourceData();
Assert.assertEquals(source.getName(), "/data/databases/source/profile");
Assert.assertEquals(source.getPlatform(), "hdfs");
Assert.assertEquals(source.getMetadata().get("fsUri"), originFsUri);
- destination = copyableFile.getDestinationDataset();
+ destination = (DatasetDescriptor) copyableFile.getDestinationData();
Assert.assertEquals(destination.getName(), "/data/databases/destination/profile");
Assert.assertEquals(destination.getPlatform(), "file");
Assert.assertEquals(destination.getMetadata().get("fsUri"), targetFsUri);