You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/09/11 16:14:50 UTC

incubator-gobblin git commit: [GOBBLIN-576] Send partition level lineage in hive distcp

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 5eee52d93 -> 29d403aec


[GOBBLIN-576] Send partition level lineage in hive distcp

Closes #2442 from zxcware/pd


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/29d403ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/29d403ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/29d403ae

Branch: refs/heads/master
Commit: 29d403aec044756ad711b4bce5a7395d01168439
Parents: 5eee52d
Author: zhchen <zh...@linkedin.com>
Authored: Tue Sep 11 09:14:46 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Sep 11 09:14:46 2018 -0700

----------------------------------------------------------------------
 .../gobblin/data/management/copy/CopySource.java |  6 +++---
 .../data/management/copy/CopyableFile.java       | 15 +++++++++------
 .../copy/hive/HiveCopyEntityHelper.java          | 19 +++++++++----------
 .../copy/hive/HivePartitionFileSet.java          | 15 ++++++++++++++-
 .../copy/hive/UnpartitionedTableFileSet.java     |  3 ++-
 .../copy/publisher/CopyDataPublisher.java        |  2 +-
 .../data/management/copy/CopyableFileTest.java   | 13 +++++++++----
 7 files changed, 47 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 4cf0b64..9ae9b45 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -398,9 +398,9 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
        * a DatasetFinder. Consequently, the source and destination dataset for the CopyableFile lineage are expected
        * to be set by the same logic
        */
-        if (lineageInfo.isPresent() && copyableFile.getSourceDataset() != null
-            && copyableFile.getDestinationDataset() != null) {
-          lineageInfo.get().setSource(copyableFile.getSourceDataset(), workUnit);
+        if (lineageInfo.isPresent() && copyableFile.getSourceData() != null
+            && copyableFile.getDestinationData() != null) {
+          lineageInfo.get().setSource(copyableFile.getSourceData(), workUnit);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index d2547b4..9ad918c 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -21,6 +21,7 @@ import org.apache.gobblin.data.management.partition.File;
 import org.apache.gobblin.data.management.copy.PreserveAttributes.Option;
 import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.Descriptor;
 import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.guid.Guid;
 
@@ -54,16 +55,16 @@ import com.google.common.collect.Lists;
 @EqualsAndHashCode(callSuper = true)
 public class CopyableFile extends CopyEntity implements File {
   /**
-   * The source dataset the file belongs to. For now, since it's only used before copying, set it to be
+   * The source data the file belongs to. For now, since it's only used before copying, set it to be
    * transient so that it won't be serialized, avoid unnecessary data transfer
    */
-  private transient DatasetDescriptor sourceDataset;
+  private transient Descriptor sourceData;
 
   /** {@link FileStatus} of the existing origin file. */
   private FileStatus origin;
 
-  /** The destination dataset the file will be copied to */
-  private DatasetDescriptor destinationDataset;
+  /** The destination data the file will be copied to */
+  private Descriptor destinationData;
 
   /** Complete destination {@link Path} of the file. */
   private Path destination;
@@ -131,13 +132,15 @@ public class CopyableFile extends CopyEntity implements File {
 
     Path fullSourcePath = Path.getPathWithoutSchemeAndAuthority(origin.getPath());
     String sourceDatasetName = isDir ? fullSourcePath.toString() : fullSourcePath.getParent().toString();
-    sourceDataset = new DatasetDescriptor(originFs.getScheme(), sourceDatasetName);
+    DatasetDescriptor sourceDataset = new DatasetDescriptor(originFs.getScheme(), sourceDatasetName);
     sourceDataset.addMetadata(DatasetConstants.FS_URI, originFs.getUri().toString());
+    sourceData = sourceDataset;
 
     Path fullDestinationPath = Path.getPathWithoutSchemeAndAuthority(destination);
     String destinationDatasetName = isDir ? fullDestinationPath.toString() : fullDestinationPath.getParent().toString();
-    destinationDataset = new DatasetDescriptor(targetFs.getScheme(), destinationDatasetName);
+    DatasetDescriptor destinationDataset = new DatasetDescriptor(targetFs.getScheme(), destinationDatasetName);
     destinationDataset.addMetadata(DatasetConstants.FS_URI, targetFs.getUri().toString());
+    destinationData = destinationDataset;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
index 2b1a142..9553959 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
@@ -765,18 +765,17 @@ public class HiveCopyEntityHelper {
     return this.targetFs;
   }
 
-  /**
-   * Set the source and destination datasets of a copyable file
-   */
-  void setCopyableFileDatasets(CopyableFile copyableFile) {
+  DatasetDescriptor getSourceDataset() {
     String sourceTable = dataset.getTable().getDbName() + "." + dataset.getTable().getTableName();
-    DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, sourceTable);
-    source.addMetadata(DatasetConstants.FS_URI, dataset.getFs().getUri().toString());
-    copyableFile.setSourceDataset(source);
+    DatasetDescriptor sourceDataset = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, sourceTable);
+    sourceDataset.addMetadata(DatasetConstants.FS_URI, dataset.getFs().getUri().toString());
+    return sourceDataset;
+  }
 
+  DatasetDescriptor getDestinationDataset() {
     String destinationTable = this.getTargetDatabase() + "." + this.getTargetTable();
-    DatasetDescriptor destination = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, destinationTable);
-    destination.addMetadata(DatasetConstants.FS_URI, this.getTargetFs().getUri().toString());
-    copyableFile.setDestinationDataset(destination);
+    DatasetDescriptor destinationDataset = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, destinationTable);
+    destinationDataset.addMetadata(DatasetConstants.FS_URI, this.getTargetFs().getUri().toString());
+    return destinationDataset;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
index 2c3817e..93ae40f 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
@@ -35,6 +35,8 @@ import org.apache.gobblin.data.management.copy.CopyEntity;
 import org.apache.gobblin.data.management.copy.CopyableFile;
 import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
 import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
 import org.apache.gobblin.hive.HiveRegisterStep;
 import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
 import org.apache.gobblin.hive.spec.HiveSpec;
@@ -153,7 +155,18 @@ public class HivePartitionFileSet extends HiveFileSet {
         CopyableFile fileEntity =
             builder.fileSet(fileSet).checksum(new byte[0]).datasetOutputPath(desiredTargetLocation.location.toString())
                 .build();
-        this.hiveCopyEntityHelper.setCopyableFileDatasets(fileEntity);
+
+        DatasetDescriptor sourceDataset = this.hiveCopyEntityHelper.getSourceDataset();
+        PartitionDescriptor source = new PartitionDescriptor(partition.getName(), sourceDataset);
+        fileEntity.setSourceData(source);
+
+        DatasetDescriptor destinationDataset = this.hiveCopyEntityHelper.getDestinationDataset();
+        Partition destinationPartition =
+            this.existingTargetPartition.isPresent() ? this.existingTargetPartition.get() : partition;
+        PartitionDescriptor destination =
+            new PartitionDescriptor(destinationPartition.getName(), destinationDataset);
+        fileEntity.setDestinationData(destination);
+
         copyEntities.add(fileEntity);
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
index 756b4dd..89ebe7e 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
@@ -123,7 +123,8 @@ public class UnpartitionedTableFileSet extends HiveFileSet {
         Optional.<Partition> absent())) {
       CopyableFile fileEntity =
           builder.fileSet(fileSet).datasetOutputPath(desiredTargetLocation.location.toString()).build();
-      this.helper.setCopyableFileDatasets(fileEntity);
+      fileEntity.setSourceData(this.helper.getSourceDataset());
+      fileEntity.setDestinationData(this.helper.getDestinationDataset());
       copyEntities.add(fileEntity);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index 79b2b6a..ec7b1a0 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -226,7 +226,7 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
             fileSetRoot = Optional.of(copyableFile.getDatasetOutputPath());
           }
           if (lineageInfo.isPresent()) {
-            lineageInfo.get().putDestination(copyableFile.getDestinationDataset(), 0, wus);
+            lineageInfo.get().putDestination(copyableFile.getDestinationData(), 0, wus);
           }
         }
         if (datasetOriginTimestamp > copyableFile.getOriginTimestamp()) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
index 986efeb..65425fa 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
@@ -38,6 +38,7 @@ import com.google.common.collect.Maps;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
 import org.apache.gobblin.util.PathUtils;
 
 import static org.mockito.Mockito.mock;
@@ -56,6 +57,10 @@ public class CopyableFileTest {
             "checksum".getBytes(), PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps
             .<String, String>newHashMap(), "");
 
+    DatasetDescriptor dataset = new DatasetDescriptor("hive", "db.table");
+    PartitionDescriptor descriptor = new PartitionDescriptor("datepartition=2018/09/05", dataset);
+    copyableFile.setDestinationData(descriptor);
+
     String s = CopyEntity.serialize(copyableFile);
     CopyEntity de = CopyEntity.deserialize(s);
 
@@ -111,11 +116,11 @@ public class CopyableFileTest {
     CopyableFile copyableFile = new CopyableFile(origin, new Path(destinationPath), null, null, null,
         PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), "");
     copyableFile.setFsDatasets(originFs, targetFs);
-    DatasetDescriptor source = copyableFile.getSourceDataset();
+    DatasetDescriptor source = (DatasetDescriptor) copyableFile.getSourceData();
     Assert.assertEquals(source.getName(), "/data/databases/source");
     Assert.assertEquals(source.getPlatform(), "hdfs");
     Assert.assertEquals(source.getMetadata().get("fsUri"), originFsUri);
-    DatasetDescriptor destination = copyableFile.getDestinationDataset();
+    DatasetDescriptor destination = (DatasetDescriptor) copyableFile.getDestinationData();
     Assert.assertEquals(destination.getName(), "/data/databases/destination");
     Assert.assertEquals(destination.getPlatform(), "file");
     Assert.assertEquals(destination.getMetadata().get("fsUri"), targetFsUri);
@@ -127,11 +132,11 @@ public class CopyableFileTest {
     copyableFile = new CopyableFile(origin, new Path(destinationPath), null, null, null,
         PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), "");
     copyableFile.setFsDatasets(originFs, targetFs);
-    source = copyableFile.getSourceDataset();
+    source = (DatasetDescriptor) copyableFile.getSourceData();
     Assert.assertEquals(source.getName(), "/data/databases/source/profile");
     Assert.assertEquals(source.getPlatform(), "hdfs");
     Assert.assertEquals(source.getMetadata().get("fsUri"), originFsUri);
-    destination = copyableFile.getDestinationDataset();
+    destination = (DatasetDescriptor) copyableFile.getDestinationData();
     Assert.assertEquals(destination.getName(), "/data/databases/destination/profile");
     Assert.assertEquals(destination.getPlatform(), "file");
     Assert.assertEquals(destination.getMetadata().get("fsUri"), targetFsUri);