You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/11/20 22:44:23 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-962] Refactor RecursiveCopyableDataset.

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


View the commit online:
https://github.com/apache/incubator-gobblin/commit/e41fd943f096792598c213ec016f207a9a8f87fc

The following commit(s) were added to refs/heads/master by this push:
     new e41fd94  [GOBBLIN-962] Refactor RecursiveCopyableDataset.
e41fd94 is described below

commit e41fd943f096792598c213ec016f207a9a8f87fc
Author: Kuai Yu <ku...@linkedin.com>
AuthorDate: Wed Nov 20 14:44:09 2019 -0800

    [GOBBLIN-962] Refactor RecursiveCopyableDataset.
    
    Closes #2811 from yukuai518/recursive
---
 .../management/copy/RecursiveCopyableDataset.java  | 58 +++++++++++++---------
 1 file changed, 35 insertions(+), 23 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
index 2d1f740..f6aaac9 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
@@ -94,19 +94,13 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
     this.properties = properties;
   }
 
-  @Override
-  public Collection<? extends CopyEntity> getCopyableFiles(FileSystem targetFs, CopyConfiguration configuration)
-      throws IOException {
-
-    Path nonGlobSearchPath = PathUtils.deepestNonGlobPath(this.glob);
-    Path targetPath =
-        new Path(configuration.getPublishDir(), PathUtils.relativizePath(this.rootPath, nonGlobSearchPath));
-
-    Map<Path, FileStatus> filesInSource =
-        createPathMap(getFilesAtPath(this.fs, this.rootPath, this.pathFilter), this.rootPath);
-    Map<Path, FileStatus> filesInTarget =
-        createPathMap(getFilesAtPath(targetFs, targetPath, this.pathFilter), targetPath);
-
+  protected Collection<? extends CopyEntity> getCopyableFilesImpl(CopyConfiguration configuration,
+                                                                  Map<Path, FileStatus> filesInSource,
+                                                                  Map<Path, FileStatus> filesInTarget,
+                                                                  FileSystem targetFs,
+                                                                  Path replacedPrefix,
+                                                                  Path replacingPrefix,
+                                                                  Path deleteEmptyDirectoriesUpTo) throws IOException {
     List<Path> toCopy = Lists.newArrayList();
     Map<Path, FileStatus> toDelete = Maps.newHashMap();
     boolean requiresUpdate = false;
@@ -127,7 +121,7 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
 
     if (!this.update && requiresUpdate) {
       throw new IOException("Some files need to be copied but they already exist in the destination. "
-          + "Aborting because not running in update mode.");
+              + "Aborting because not running in update mode.");
     }
 
     if (this.delete) {
@@ -139,13 +133,16 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
 
     for (Path path : toCopy) {
       FileStatus file = filesInSource.get(path);
-      Path filePathRelativeToSearchPath = PathUtils.relativizePath(file.getPath(), nonGlobSearchPath);
-      Path thisTargetPath = new Path(configuration.getPublishDir(), filePathRelativeToSearchPath);
+      Path filePathRelativeToSearchPath = PathUtils.relativizePath(file.getPath(), replacedPrefix);
+      Path thisTargetPath = new Path(replacingPrefix, filePathRelativeToSearchPath);
       CopyableFile copyableFile =
-          CopyableFile.fromOriginAndDestination(this.fs, file, thisTargetPath, configuration).fileSet(datasetURN())
-              .datasetOutputPath(thisTargetPath.toString()).ancestorsOwnerAndPermission(CopyableFile
-              .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, file.getPath().getParent(), nonGlobSearchPath,
-                  configuration)).build();
+              CopyableFile.fromOriginAndDestination(this.fs, file, thisTargetPath, configuration)
+                      .fileSet(datasetURN())
+                      .datasetOutputPath(thisTargetPath.toString())
+                      .ancestorsOwnerAndPermission(CopyableFile
+                              .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, file.getPath().getParent(),
+                                      replacedPrefix, configuration))
+                      .build();
       copyableFile.setFsDatasets(this.fs, targetFs);
       copyableFiles.add(copyableFile);
     }
@@ -153,14 +150,29 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
 
     if (!toDelete.isEmpty()) {
       CommitStep step = new DeleteFileCommitStep(targetFs, toDelete.values(), this.properties,
-          this.deleteEmptyDirectories ? Optional.of(targetPath) : Optional.<Path>absent());
-
+              this.deleteEmptyDirectories ? Optional.of(deleteEmptyDirectoriesUpTo) : Optional.<Path>absent());
       copyEntities.add(new PrePublishStep(datasetURN(), Maps.<String, String>newHashMap(), step, 1));
     }
-
     return copyEntities;
   }
 
+  @Override
+  public Collection<? extends CopyEntity> getCopyableFiles(FileSystem targetFs, CopyConfiguration configuration)
+      throws IOException {
+
+    Path nonGlobSearchPath = PathUtils.deepestNonGlobPath(this.glob);
+    Path targetPath =
+        new Path(configuration.getPublishDir(), PathUtils.relativizePath(this.rootPath, nonGlobSearchPath));
+
+    Map<Path, FileStatus> filesInSource =
+        createPathMap(getFilesAtPath(this.fs, this.rootPath, this.pathFilter), this.rootPath);
+    Map<Path, FileStatus> filesInTarget =
+        createPathMap(getFilesAtPath(targetFs, targetPath, this.pathFilter), targetPath);
+
+    return getCopyableFilesImpl(configuration, filesInSource, filesInTarget, targetFs,
+            nonGlobSearchPath, configuration.getPublishDir(), targetPath);
+  }
+
   @VisibleForTesting
   protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter)
       throws IOException {