You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/11/20 22:44:23 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-962] Refactor RecursiveCopyableDataset.
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
View the commit online:
https://github.com/apache/incubator-gobblin/commit/e41fd943f096792598c213ec016f207a9a8f87fc
The following commit(s) were added to refs/heads/master by this push:
new e41fd94 [GOBBLIN-962] Refactor RecursiveCopyableDataset.
e41fd94 is described below
commit e41fd943f096792598c213ec016f207a9a8f87fc
Author: Kuai Yu <ku...@linkedin.com>
AuthorDate: Wed Nov 20 14:44:09 2019 -0800
[GOBBLIN-962] Refactor RecursiveCopyableDataset.
Closes #2811 from yukuai518/recursive
---
.../management/copy/RecursiveCopyableDataset.java | 58 +++++++++++++---------
1 file changed, 35 insertions(+), 23 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
index 2d1f740..f6aaac9 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
@@ -94,19 +94,13 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
this.properties = properties;
}
- @Override
- public Collection<? extends CopyEntity> getCopyableFiles(FileSystem targetFs, CopyConfiguration configuration)
- throws IOException {
-
- Path nonGlobSearchPath = PathUtils.deepestNonGlobPath(this.glob);
- Path targetPath =
- new Path(configuration.getPublishDir(), PathUtils.relativizePath(this.rootPath, nonGlobSearchPath));
-
- Map<Path, FileStatus> filesInSource =
- createPathMap(getFilesAtPath(this.fs, this.rootPath, this.pathFilter), this.rootPath);
- Map<Path, FileStatus> filesInTarget =
- createPathMap(getFilesAtPath(targetFs, targetPath, this.pathFilter), targetPath);
-
+ protected Collection<? extends CopyEntity> getCopyableFilesImpl(CopyConfiguration configuration,
+ Map<Path, FileStatus> filesInSource,
+ Map<Path, FileStatus> filesInTarget,
+ FileSystem targetFs,
+ Path replacedPrefix,
+ Path replacingPrefix,
+ Path deleteEmptyDirectoriesUpTo) throws IOException {
List<Path> toCopy = Lists.newArrayList();
Map<Path, FileStatus> toDelete = Maps.newHashMap();
boolean requiresUpdate = false;
@@ -127,7 +121,7 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
if (!this.update && requiresUpdate) {
throw new IOException("Some files need to be copied but they already exist in the destination. "
- + "Aborting because not running in update mode.");
+ + "Aborting because not running in update mode.");
}
if (this.delete) {
@@ -139,13 +133,16 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
for (Path path : toCopy) {
FileStatus file = filesInSource.get(path);
- Path filePathRelativeToSearchPath = PathUtils.relativizePath(file.getPath(), nonGlobSearchPath);
- Path thisTargetPath = new Path(configuration.getPublishDir(), filePathRelativeToSearchPath);
+ Path filePathRelativeToSearchPath = PathUtils.relativizePath(file.getPath(), replacedPrefix);
+ Path thisTargetPath = new Path(replacingPrefix, filePathRelativeToSearchPath);
CopyableFile copyableFile =
- CopyableFile.fromOriginAndDestination(this.fs, file, thisTargetPath, configuration).fileSet(datasetURN())
- .datasetOutputPath(thisTargetPath.toString()).ancestorsOwnerAndPermission(CopyableFile
- .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, file.getPath().getParent(), nonGlobSearchPath,
- configuration)).build();
+ CopyableFile.fromOriginAndDestination(this.fs, file, thisTargetPath, configuration)
+ .fileSet(datasetURN())
+ .datasetOutputPath(thisTargetPath.toString())
+ .ancestorsOwnerAndPermission(CopyableFile
+ .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, file.getPath().getParent(),
+ replacedPrefix, configuration))
+ .build();
copyableFile.setFsDatasets(this.fs, targetFs);
copyableFiles.add(copyableFile);
}
@@ -153,14 +150,29 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
if (!toDelete.isEmpty()) {
CommitStep step = new DeleteFileCommitStep(targetFs, toDelete.values(), this.properties,
- this.deleteEmptyDirectories ? Optional.of(targetPath) : Optional.<Path>absent());
-
+ this.deleteEmptyDirectories ? Optional.of(deleteEmptyDirectoriesUpTo) : Optional.<Path>absent());
copyEntities.add(new PrePublishStep(datasetURN(), Maps.<String, String>newHashMap(), step, 1));
}
-
return copyEntities;
}
+ @Override
+ public Collection<? extends CopyEntity> getCopyableFiles(FileSystem targetFs, CopyConfiguration configuration)
+ throws IOException {
+
+ Path nonGlobSearchPath = PathUtils.deepestNonGlobPath(this.glob);
+ Path targetPath =
+ new Path(configuration.getPublishDir(), PathUtils.relativizePath(this.rootPath, nonGlobSearchPath));
+
+ Map<Path, FileStatus> filesInSource =
+ createPathMap(getFilesAtPath(this.fs, this.rootPath, this.pathFilter), this.rootPath);
+ Map<Path, FileStatus> filesInTarget =
+ createPathMap(getFilesAtPath(targetFs, targetPath, this.pathFilter), targetPath);
+
+ return getCopyableFilesImpl(configuration, filesInSource, filesInTarget, targetFs,
+ nonGlobSearchPath, configuration.getPublishDir(), targetPath);
+ }
+
@VisibleForTesting
protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter)
throws IOException {