You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ib...@apache.org on 2018/01/17 19:57:31 UTC
incubator-gobblin git commit: [GOBBLIN-368][GOBBLIN-365] Add
configuration property to allow application of filters to directory paths
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 1b9ec19f9 -> 7d8d40dd4
[GOBBLIN-368][GOBBLIN-365] Add configuration property to allow application of filters to directory paths
Closes #2243 from sv2000/gobblin_368
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/7d8d40dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/7d8d40dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/7d8d40dd
Branch: refs/heads/master
Commit: 7d8d40dd4e905ed48720d0aeecf60f8974d4c410
Parents: 1b9ec19
Author: suvasude <su...@linkedin.biz>
Authored: Wed Jan 17 11:57:24 2018 -0800
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Wed Jan 17 11:57:24 2018 -0800
----------------------------------------------------------------------
.../data/management/copy/CopyConfiguration.java | 9 ++--
.../copy/RecursiveCopyableDataset.java | 38 ++++++++++-------
.../org/apache/gobblin/util/FileListUtils.java | 45 ++++++++++++++------
3 files changed, 59 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7d8d40dd/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
index 82cca49..211ad13 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
@@ -55,6 +55,7 @@ public class CopyConfiguration {
* Include empty directories in the source for copy
*/
public static final String INCLUDE_EMPTY_DIRECTORIES = COPY_PREFIX + ".includeEmptyDirectories";
+ public static final String APPLY_FILTER_TO_DIRECTORIES = COPY_PREFIX + ".applyFilterToDirectories";
public static final String PRIORITIZER_ALIAS_KEY = PRIORITIZATION_PREFIX + ".prioritizerAlias";
public static final String MAX_COPY_PREFIX = PRIORITIZATION_PREFIX + ".maxCopy";
@@ -101,7 +102,7 @@ public class CopyConfiguration {
this.targetGroup =
properties.containsKey(DESTINATION_GROUP_KEY) ? Optional.of(properties.getProperty(DESTINATION_GROUP_KEY))
- : Optional.<String> absent();
+ : Optional.<String>absent();
this.preserve = PreserveAttributes.fromMnemonicString(properties.getProperty(PRESERVE_ATTRIBUTES_KEY));
Path publishDirTmp = new Path(properties.getProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR));
if (!publishDirTmp.isAbsolute()) {
@@ -113,9 +114,8 @@ public class CopyConfiguration {
if (properties.containsKey(PRIORITIZER_ALIAS_KEY)) {
try {
this.prioritizer = Optional.of(GobblinConstructorUtils.<FileSetComparator>invokeLongestConstructor(
- new ClassAliasResolver(FileSetComparator.class).resolveClass(properties.getProperty(
- PRIORITIZER_ALIAS_KEY)),
- properties));
+ new ClassAliasResolver(FileSetComparator.class).resolveClass(
+ properties.getProperty(PRIORITIZER_ALIAS_KEY)), properties));
} catch (ReflectiveOperationException roe) {
throw new RuntimeException("Could not build prioritizer.", roe);
}
@@ -138,5 +138,4 @@ public class CopyConfiguration {
public Config getPrioritizationConfig() {
return ConfigUtils.getConfigOrEmpty(this.config, PRIORITIZATION_PREFIX);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7d8d40dd/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
index 138debe..252dafa 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
@@ -72,6 +72,8 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
private final boolean includeEmptyDirectories;
// Delete empty directories in the destination
private final boolean deleteEmptyDirectories;
+ // Whether the path filter is also applied to directories
+ private final boolean applyFilterToDirectories;
private final Properties properties;
@@ -89,6 +91,8 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
this.deleteEmptyDirectories = Boolean.parseBoolean(properties.getProperty(DELETE_EMPTY_DIRECTORIES_KEY));
this.includeEmptyDirectories =
Boolean.parseBoolean(properties.getProperty(CopyConfiguration.INCLUDE_EMPTY_DIRECTORIES));
+ this.applyFilterToDirectories =
+ Boolean.parseBoolean(properties.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "false"));
this.properties = properties;
}
@@ -97,10 +101,13 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
throws IOException {
Path nonGlobSearchPath = PathUtils.deepestNonGlobPath(this.glob);
- Path targetPath = new Path(configuration.getPublishDir(), PathUtils.relativizePath(this.rootPath, nonGlobSearchPath));
+ Path targetPath =
+ new Path(configuration.getPublishDir(), PathUtils.relativizePath(this.rootPath, nonGlobSearchPath));
- Map<Path, FileStatus> filesInSource = createPathMap(getFilesAtPath(this.fs, this.rootPath, this.pathFilter), this.rootPath);
- Map<Path, FileStatus> filesInTarget = createPathMap(getFilesAtPath(targetFs, targetPath, this.pathFilter), targetPath);
+ Map<Path, FileStatus> filesInSource =
+ createPathMap(getFilesAtPath(this.fs, this.rootPath, this.pathFilter), this.rootPath);
+ Map<Path, FileStatus> filesInTarget =
+ createPathMap(getFilesAtPath(targetFs, targetPath, this.pathFilter), targetPath);
List<Path> toCopy = Lists.newArrayList();
Map<Path, FileStatus> toDelete = Maps.newHashMap();
@@ -136,11 +143,11 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
FileStatus file = filesInSource.get(path);
Path filePathRelativeToSearchPath = PathUtils.relativizePath(file.getPath(), nonGlobSearchPath);
Path thisTargetPath = new Path(configuration.getPublishDir(), filePathRelativeToSearchPath);
- CopyableFile copyableFile = CopyableFile.fromOriginAndDestination(this.fs, file, thisTargetPath, configuration)
- .fileSet(datasetURN()).datasetOutputPath(thisTargetPath.toString())
- .ancestorsOwnerAndPermission(CopyableFile.resolveReplicatedOwnerAndPermissionsRecursively(this.fs,
- file.getPath().getParent(), nonGlobSearchPath, configuration))
- .build();
+ CopyableFile copyableFile =
+ CopyableFile.fromOriginAndDestination(this.fs, file, thisTargetPath, configuration).fileSet(datasetURN())
+ .datasetOutputPath(thisTargetPath.toString()).ancestorsOwnerAndPermission(CopyableFile
+ .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, file.getPath().getParent(), nonGlobSearchPath,
+ configuration)).build();
/*
* By default, the raw Gobblin dataset for CopyableFile lineage is its parent folder
@@ -164,9 +171,8 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
copyEntities.addAll(this.copyableFileFilter.filter(this.fs, targetFs, copyableFiles));
if (!toDelete.isEmpty()) {
- CommitStep step =
- new DeleteFileCommitStep(targetFs, toDelete.values(), this.properties,
- this.deleteEmptyDirectories ? Optional.of(targetPath) : Optional.<Path>absent());
+ CommitStep step = new DeleteFileCommitStep(targetFs, toDelete.values(), this.properties,
+ this.deleteEmptyDirectories ? Optional.of(targetPath) : Optional.<Path>absent());
copyEntities.add(new PrePublishStep(datasetURN(), Maps.<String, String>newHashMap(), step, 1));
}
@@ -175,9 +181,11 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
}
@VisibleForTesting
- protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException {
+ protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter)
+ throws IOException {
try {
- return FileListUtils.listFilesToCopyAtPath(fs, path, fileFilter, includeEmptyDirectories);
+ return FileListUtils
+ .listFilesToCopyAtPath(fs, path, fileFilter, applyFilterToDirectories, includeEmptyDirectories);
} catch (FileNotFoundException fnfe) {
return Lists.newArrayList();
}
@@ -202,7 +210,7 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
}
private static boolean sameFile(FileStatus fileInSource, FileStatus fileInTarget) {
- return fileInTarget.getLen() == fileInSource.getLen()
- && fileInSource.getModificationTime() <= fileInTarget.getModificationTime();
+ return fileInTarget.getLen() == fileInSource.getLen() && fileInSource.getModificationTime() <= fileInTarget
+ .getModificationTime();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7d8d40dd/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
index 51bf66d..343da43 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
@@ -56,11 +56,13 @@ public class FileListUtils {
}
};
- public static List<FileStatus> listFilesRecursively(FileSystem fs, Path path) throws IOException {
+ public static List<FileStatus> listFilesRecursively(FileSystem fs, Path path)
+ throws IOException {
return listFilesRecursively(fs, path, NO_OP_PATH_FILTER);
}
- public static List<FileStatus> listFilesRecursively(FileSystem fs, Iterable<Path> paths) throws IOException {
+ public static List<FileStatus> listFilesRecursively(FileSystem fs, Iterable<Path> paths)
+ throws IOException {
List<FileStatus> results = Lists.newArrayList();
for (Path path : paths) {
results.addAll(listFilesRecursively(fs, path));
@@ -77,12 +79,25 @@ public class FileListUtils {
* @param includeEmptyDirectories a control to include empty directories for copy
*/
public static List<FileStatus> listFilesToCopyAtPath(FileSystem fs, Path path, PathFilter fileFilter,
- boolean includeEmptyDirectories)
- throws IOException {
+ boolean includeEmptyDirectories) throws IOException {
+ return listFilesToCopyAtPath(fs, path, fileFilter, false, includeEmptyDirectories);
+ }
+
+ /**
+ * Given a path to copy, list all files rooted at the given path to copy
+ *
+ * @param fs the file system of the path
+ * @param path root path to copy
+ * @param fileFilter a filter for the listed paths; it is also applied to directories when {@code applyFilterToDirectories} is true
+ * @param applyFilterToDirectories a control to decide whether to apply filter to directories
+ * @param includeEmptyDirectories a control to include empty directories for copy
+ */
+ public static List<FileStatus> listFilesToCopyAtPath(FileSystem fs, Path path, PathFilter fileFilter,
+ boolean applyFilterToDirectories, boolean includeEmptyDirectories) throws IOException {
List<FileStatus> files = Lists.newArrayList();
FileStatus rootFile = fs.getFileStatus(path);
- listFilesRecursivelyHelper(fs, files, rootFile, fileFilter, false, includeEmptyDirectories);
+ listFilesRecursivelyHelper(fs, files, rootFile, fileFilter, applyFilterToDirectories, includeEmptyDirectories);
// Copy the empty root directory
if (files.size() == 0 && rootFile.isDirectory() && includeEmptyDirectories) {
@@ -105,7 +120,8 @@ public class FileListUtils {
* {@link PathFilter} will only be applied to files.
*/
public static List<FileStatus> listFilesRecursively(FileSystem fs, Path path, PathFilter fileFilter,
- boolean applyFilterToDirectories) throws IOException {
+ boolean applyFilterToDirectories)
+ throws IOException {
return listFilesRecursivelyHelper(fs, Lists.newArrayList(), fs.getFileStatus(path), fileFilter,
applyFilterToDirectories, false);
}
@@ -114,8 +130,8 @@ public class FileListUtils {
FileStatus fileStatus, PathFilter fileFilter, boolean applyFilterToDirectories, boolean includeEmptyDirectories)
throws FileNotFoundException, IOException {
if (fileStatus.isDirectory()) {
- for (FileStatus status : fs.listStatus(fileStatus.getPath(),
- applyFilterToDirectories ? fileFilter : NO_OP_PATH_FILTER)) {
+ for (FileStatus status : fs
+ .listStatus(fileStatus.getPath(), applyFilterToDirectories ? fileFilter : NO_OP_PATH_FILTER)) {
if (status.isDirectory()) {
// Number of files collected before diving into the directory
int numFilesBefore = files.size();
@@ -144,11 +160,13 @@ public class FileListUtils {
/**
* Method to list out all files, or directory if no file exists, under a specified path.
*/
- public static List<FileStatus> listMostNestedPathRecursively(FileSystem fs, Path path) throws IOException {
+ public static List<FileStatus> listMostNestedPathRecursively(FileSystem fs, Path path)
+ throws IOException {
return listMostNestedPathRecursively(fs, path, NO_OP_PATH_FILTER);
}
- public static List<FileStatus> listMostNestedPathRecursively(FileSystem fs, Iterable<Path> paths) throws IOException {
+ public static List<FileStatus> listMostNestedPathRecursively(FileSystem fs, Iterable<Path> paths)
+ throws IOException {
List<FileStatus> results = Lists.newArrayList();
for (Path path : paths) {
results.addAll(listMostNestedPathRecursively(fs, path));
@@ -162,12 +180,13 @@ public class FileListUtils {
*/
public static List<FileStatus> listMostNestedPathRecursively(FileSystem fs, Path path, PathFilter fileFilter)
throws IOException {
- return listMostNestedPathRecursivelyHelper(fs, Lists.<FileStatus> newArrayList(), fs.getFileStatus(path),
+ return listMostNestedPathRecursivelyHelper(fs, Lists.<FileStatus>newArrayList(), fs.getFileStatus(path),
fileFilter);
}
private static List<FileStatus> listMostNestedPathRecursivelyHelper(FileSystem fs, List<FileStatus> files,
- FileStatus fileStatus, PathFilter fileFilter) throws IOException {
+ FileStatus fileStatus, PathFilter fileFilter)
+ throws IOException {
if (fileStatus.isDirectory()) {
FileStatus[] curFileStatus = fs.listStatus(fileStatus.getPath());
if (ArrayUtils.isEmpty(curFileStatus)) {
@@ -189,7 +208,7 @@ public class FileListUtils {
*/
public static List<FileStatus> listPathsRecursively(FileSystem fs, Path path, PathFilter fileFilter)
throws IOException {
- return listPathsRecursivelyHelper(fs, Lists.<FileStatus> newArrayList(), fs.getFileStatus(path), fileFilter);
+ return listPathsRecursivelyHelper(fs, Lists.<FileStatus>newArrayList(), fs.getFileStatus(path), fileFilter);
}
private static List<FileStatus> listPathsRecursivelyHelper(FileSystem fs, List<FileStatus> files,