You are viewing a plain-text version of this content. The link to its canonical version was a hyperlink on the original page and is not preserved in this plain-text rendering.
Posted to commits@gobblin.apache.org by ib...@apache.org on 2018/01/17 19:57:31 UTC

incubator-gobblin git commit: [GOBBLIN-368][GOBBLIN-365] Add configuration property to allow application of filters to directory paths

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 1b9ec19f9 -> 7d8d40dd4


[GOBBLIN-368][GOBBLIN-365] Add configuration property to allow application of filters to directory paths

Closes #2243 from sv2000/gobblin_368


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/7d8d40dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/7d8d40dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/7d8d40dd

Branch: refs/heads/master
Commit: 7d8d40dd4e905ed48720d0aeecf60f8974d4c410
Parents: 1b9ec19
Author: suvasude <su...@linkedin.biz>
Authored: Wed Jan 17 11:57:24 2018 -0800
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Wed Jan 17 11:57:24 2018 -0800

----------------------------------------------------------------------
 .../data/management/copy/CopyConfiguration.java |  9 ++--
 .../copy/RecursiveCopyableDataset.java          | 38 ++++++++++-------
 .../org/apache/gobblin/util/FileListUtils.java  | 45 ++++++++++++++------
 3 files changed, 59 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7d8d40dd/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
index 82cca49..211ad13 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
@@ -55,6 +55,7 @@ public class CopyConfiguration {
    * Include empty directories in the source for copy
    */
   public static final String INCLUDE_EMPTY_DIRECTORIES = COPY_PREFIX + ".includeEmptyDirectories";
+  public static final String APPLY_FILTER_TO_DIRECTORIES = COPY_PREFIX + ".applyFilterToDirectories";
 
   public static final String PRIORITIZER_ALIAS_KEY = PRIORITIZATION_PREFIX + ".prioritizerAlias";
   public static final String MAX_COPY_PREFIX = PRIORITIZATION_PREFIX + ".maxCopy";
@@ -101,7 +102,7 @@ public class CopyConfiguration {
 
       this.targetGroup =
           properties.containsKey(DESTINATION_GROUP_KEY) ? Optional.of(properties.getProperty(DESTINATION_GROUP_KEY))
-              : Optional.<String> absent();
+              : Optional.<String>absent();
       this.preserve = PreserveAttributes.fromMnemonicString(properties.getProperty(PRESERVE_ATTRIBUTES_KEY));
       Path publishDirTmp = new Path(properties.getProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR));
       if (!publishDirTmp.isAbsolute()) {
@@ -113,9 +114,8 @@ public class CopyConfiguration {
       if (properties.containsKey(PRIORITIZER_ALIAS_KEY)) {
         try {
           this.prioritizer = Optional.of(GobblinConstructorUtils.<FileSetComparator>invokeLongestConstructor(
-              new ClassAliasResolver(FileSetComparator.class).resolveClass(properties.getProperty(
-                  PRIORITIZER_ALIAS_KEY)),
-              properties));
+              new ClassAliasResolver(FileSetComparator.class).resolveClass(
+                  properties.getProperty(PRIORITIZER_ALIAS_KEY)), properties));
         } catch (ReflectiveOperationException roe) {
           throw new RuntimeException("Could not build prioritizer.", roe);
         }
@@ -138,5 +138,4 @@ public class CopyConfiguration {
   public Config getPrioritizationConfig() {
     return ConfigUtils.getConfigOrEmpty(this.config, PRIORITIZATION_PREFIX);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7d8d40dd/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
index 138debe..252dafa 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
@@ -72,6 +72,8 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
   private final boolean includeEmptyDirectories;
   // Delete empty directories in the destination
   private final boolean deleteEmptyDirectories;
+  //Apply filter to directories
+  private final boolean applyFilterToDirectories;
 
   private final Properties properties;
 
@@ -89,6 +91,8 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
     this.deleteEmptyDirectories = Boolean.parseBoolean(properties.getProperty(DELETE_EMPTY_DIRECTORIES_KEY));
     this.includeEmptyDirectories =
         Boolean.parseBoolean(properties.getProperty(CopyConfiguration.INCLUDE_EMPTY_DIRECTORIES));
+    this.applyFilterToDirectories =
+        Boolean.parseBoolean(properties.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "false"));
     this.properties = properties;
   }
 
@@ -97,10 +101,13 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
       throws IOException {
 
     Path nonGlobSearchPath = PathUtils.deepestNonGlobPath(this.glob);
-    Path targetPath = new Path(configuration.getPublishDir(), PathUtils.relativizePath(this.rootPath, nonGlobSearchPath));
+    Path targetPath =
+        new Path(configuration.getPublishDir(), PathUtils.relativizePath(this.rootPath, nonGlobSearchPath));
 
-    Map<Path, FileStatus> filesInSource = createPathMap(getFilesAtPath(this.fs, this.rootPath, this.pathFilter), this.rootPath);
-    Map<Path, FileStatus> filesInTarget = createPathMap(getFilesAtPath(targetFs, targetPath, this.pathFilter), targetPath);
+    Map<Path, FileStatus> filesInSource =
+        createPathMap(getFilesAtPath(this.fs, this.rootPath, this.pathFilter), this.rootPath);
+    Map<Path, FileStatus> filesInTarget =
+        createPathMap(getFilesAtPath(targetFs, targetPath, this.pathFilter), targetPath);
 
     List<Path> toCopy = Lists.newArrayList();
     Map<Path, FileStatus> toDelete = Maps.newHashMap();
@@ -136,11 +143,11 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
       FileStatus file = filesInSource.get(path);
       Path filePathRelativeToSearchPath = PathUtils.relativizePath(file.getPath(), nonGlobSearchPath);
       Path thisTargetPath = new Path(configuration.getPublishDir(), filePathRelativeToSearchPath);
-      CopyableFile copyableFile = CopyableFile.fromOriginAndDestination(this.fs, file, thisTargetPath, configuration)
-          .fileSet(datasetURN()).datasetOutputPath(thisTargetPath.toString())
-          .ancestorsOwnerAndPermission(CopyableFile.resolveReplicatedOwnerAndPermissionsRecursively(this.fs,
-              file.getPath().getParent(), nonGlobSearchPath, configuration))
-          .build();
+      CopyableFile copyableFile =
+          CopyableFile.fromOriginAndDestination(this.fs, file, thisTargetPath, configuration).fileSet(datasetURN())
+              .datasetOutputPath(thisTargetPath.toString()).ancestorsOwnerAndPermission(CopyableFile
+              .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, file.getPath().getParent(), nonGlobSearchPath,
+                  configuration)).build();
 
       /*
        * By default, the raw Gobblin dataset for CopyableFile lineage is its parent folder
@@ -164,9 +171,8 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
     copyEntities.addAll(this.copyableFileFilter.filter(this.fs, targetFs, copyableFiles));
 
     if (!toDelete.isEmpty()) {
-      CommitStep step =
-          new DeleteFileCommitStep(targetFs, toDelete.values(), this.properties,
-              this.deleteEmptyDirectories ? Optional.of(targetPath) : Optional.<Path>absent());
+      CommitStep step = new DeleteFileCommitStep(targetFs, toDelete.values(), this.properties,
+          this.deleteEmptyDirectories ? Optional.of(targetPath) : Optional.<Path>absent());
 
       copyEntities.add(new PrePublishStep(datasetURN(), Maps.<String, String>newHashMap(), step, 1));
     }
@@ -175,9 +181,11 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
   }
 
   @VisibleForTesting
-  protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException {
+  protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter)
+      throws IOException {
     try {
-      return FileListUtils.listFilesToCopyAtPath(fs, path, fileFilter, includeEmptyDirectories);
+      return FileListUtils
+          .listFilesToCopyAtPath(fs, path, fileFilter, applyFilterToDirectories, includeEmptyDirectories);
     } catch (FileNotFoundException fnfe) {
       return Lists.newArrayList();
     }
@@ -202,7 +210,7 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
   }
 
   private static boolean sameFile(FileStatus fileInSource, FileStatus fileInTarget) {
-    return fileInTarget.getLen() == fileInSource.getLen()
-        && fileInSource.getModificationTime() <= fileInTarget.getModificationTime();
+    return fileInTarget.getLen() == fileInSource.getLen() && fileInSource.getModificationTime() <= fileInTarget
+        .getModificationTime();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7d8d40dd/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
index 51bf66d..343da43 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
@@ -56,11 +56,13 @@ public class FileListUtils {
     }
   };
 
-  public static List<FileStatus> listFilesRecursively(FileSystem fs, Path path) throws IOException {
+  public static List<FileStatus> listFilesRecursively(FileSystem fs, Path path)
+      throws IOException {
     return listFilesRecursively(fs, path, NO_OP_PATH_FILTER);
   }
 
-  public static List<FileStatus> listFilesRecursively(FileSystem fs, Iterable<Path> paths) throws IOException {
+  public static List<FileStatus> listFilesRecursively(FileSystem fs, Iterable<Path> paths)
+      throws IOException {
     List<FileStatus> results = Lists.newArrayList();
     for (Path path : paths) {
       results.addAll(listFilesRecursively(fs, path));
@@ -77,12 +79,25 @@ public class FileListUtils {
    * @param includeEmptyDirectories a control to include empty directories for copy
    */
   public static List<FileStatus> listFilesToCopyAtPath(FileSystem fs, Path path, PathFilter fileFilter,
-      boolean includeEmptyDirectories)
-      throws IOException {
+      boolean includeEmptyDirectories) throws IOException {
+    return listFilesToCopyAtPath(fs, path, fileFilter, false, includeEmptyDirectories);
+  }
+
+  /**
+   * Given a path to copy, list all files rooted at the given path to copy
+   *
+   * @param fs the file system of the path
+   * @param path root path to copy
+   * @param fileFilter a filter only applied to root
+   * @param applyFilterToDirectories a control to decide whether to apply filter to directories
+   * @param includeEmptyDirectories a control to include empty directories for copy
+   */
+  public static List<FileStatus> listFilesToCopyAtPath(FileSystem fs, Path path, PathFilter fileFilter,
+      boolean applyFilterToDirectories, boolean includeEmptyDirectories) throws IOException {
     List<FileStatus> files = Lists.newArrayList();
     FileStatus rootFile = fs.getFileStatus(path);
 
-    listFilesRecursivelyHelper(fs, files, rootFile, fileFilter, false, includeEmptyDirectories);
+    listFilesRecursivelyHelper(fs, files, rootFile, fileFilter, applyFilterToDirectories, includeEmptyDirectories);
 
     // Copy the empty root directory
     if (files.size() == 0 && rootFile.isDirectory() && includeEmptyDirectories) {
@@ -105,7 +120,8 @@ public class FileListUtils {
    * {@link PathFilter} will only be applied to files.
    */
   public static List<FileStatus> listFilesRecursively(FileSystem fs, Path path, PathFilter fileFilter,
-      boolean applyFilterToDirectories) throws IOException {
+      boolean applyFilterToDirectories)
+      throws IOException {
     return listFilesRecursivelyHelper(fs, Lists.newArrayList(), fs.getFileStatus(path), fileFilter,
         applyFilterToDirectories, false);
   }
@@ -114,8 +130,8 @@ public class FileListUtils {
       FileStatus fileStatus, PathFilter fileFilter, boolean applyFilterToDirectories, boolean includeEmptyDirectories)
       throws FileNotFoundException, IOException {
     if (fileStatus.isDirectory()) {
-      for (FileStatus status : fs.listStatus(fileStatus.getPath(),
-          applyFilterToDirectories ? fileFilter : NO_OP_PATH_FILTER)) {
+      for (FileStatus status : fs
+          .listStatus(fileStatus.getPath(), applyFilterToDirectories ? fileFilter : NO_OP_PATH_FILTER)) {
         if (status.isDirectory()) {
           // Number of files collected before diving into the directory
           int numFilesBefore = files.size();
@@ -144,11 +160,13 @@ public class FileListUtils {
   /**
    * Method to list out all files, or directory if no file exists, under a specified path.
    */
-  public static List<FileStatus> listMostNestedPathRecursively(FileSystem fs, Path path) throws IOException {
+  public static List<FileStatus> listMostNestedPathRecursively(FileSystem fs, Path path)
+      throws IOException {
     return listMostNestedPathRecursively(fs, path, NO_OP_PATH_FILTER);
   }
 
-  public static List<FileStatus> listMostNestedPathRecursively(FileSystem fs, Iterable<Path> paths) throws IOException {
+  public static List<FileStatus> listMostNestedPathRecursively(FileSystem fs, Iterable<Path> paths)
+      throws IOException {
     List<FileStatus> results = Lists.newArrayList();
     for (Path path : paths) {
       results.addAll(listMostNestedPathRecursively(fs, path));
@@ -162,12 +180,13 @@ public class FileListUtils {
    */
   public static List<FileStatus> listMostNestedPathRecursively(FileSystem fs, Path path, PathFilter fileFilter)
       throws IOException {
-    return listMostNestedPathRecursivelyHelper(fs, Lists.<FileStatus> newArrayList(), fs.getFileStatus(path),
+    return listMostNestedPathRecursivelyHelper(fs, Lists.<FileStatus>newArrayList(), fs.getFileStatus(path),
         fileFilter);
   }
 
   private static List<FileStatus> listMostNestedPathRecursivelyHelper(FileSystem fs, List<FileStatus> files,
-      FileStatus fileStatus, PathFilter fileFilter) throws IOException {
+      FileStatus fileStatus, PathFilter fileFilter)
+      throws IOException {
     if (fileStatus.isDirectory()) {
       FileStatus[] curFileStatus = fs.listStatus(fileStatus.getPath());
       if (ArrayUtils.isEmpty(curFileStatus)) {
@@ -189,7 +208,7 @@ public class FileListUtils {
    */
   public static List<FileStatus> listPathsRecursively(FileSystem fs, Path path, PathFilter fileFilter)
       throws IOException {
-    return listPathsRecursivelyHelper(fs, Lists.<FileStatus> newArrayList(), fs.getFileStatus(path), fileFilter);
+    return listPathsRecursivelyHelper(fs, Lists.<FileStatus>newArrayList(), fs.getFileStatus(path), fileFilter);
   }
 
   private static List<FileStatus> listPathsRecursivelyHelper(FileSystem fs, List<FileStatus> files,