You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ib...@apache.org on 2017/08/09 18:04:22 UTC

incubator-gobblin git commit: [GOBBLIN-183] Gobblin data management copy empty directories

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 3ab02b3f2 -> 42677dc8c


[GOBBLIN-183] Gobblin data management copy empty directories

Closes #2036 from zxcware/empty


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/42677dc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/42677dc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/42677dc8

Branch: refs/heads/master
Commit: 42677dc8ccd32262006f7d77c4d40c7735ed71e5
Parents: 3ab02b3
Author: zhchen <zh...@linkedin.com>
Authored: Wed Aug 9 11:04:03 2017 -0700
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Wed Aug 9 11:04:03 2017 -0700

----------------------------------------------------------------------
 .../data/management/copy/CopyConfiguration.java |  5 ++
 .../data/management/copy/CopyableFile.java      |  2 +-
 .../copy/RecursiveCopyableDataset.java          |  9 +++-
 .../management/copy/RecursivePathFinder.java    |  9 +++-
 .../FileAwareInputStreamExtractor.java          |  4 ++
 .../writer/FileAwareInputStreamDataWriter.java  |  5 ++
 .../org/apache/gobblin/util/FileListUtils.java  | 50 +++++++++++++++++---
 .../gobblin/util/io/EmptyInputStream.java       | 36 ++++++++++++++
 .../apache/gobblin/util/FileListUtilsTest.java  | 47 ++++++++++++++++++
 9 files changed, 156 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
index b8f0365..82cca49 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
@@ -51,6 +51,11 @@ public class CopyConfiguration {
   public static final String DESTINATION_GROUP_KEY = COPY_PREFIX + ".dataset.destination.group";
   public static final String PRIORITIZATION_PREFIX = COPY_PREFIX + ".prioritization";
 
+  /**
+   * Include empty directories in the source for copy
+   */
+  public static final String INCLUDE_EMPTY_DIRECTORIES = COPY_PREFIX + ".includeEmptyDirectories";
+
   public static final String PRIORITIZER_ALIAS_KEY = PRIORITIZATION_PREFIX + ".prioritizerAlias";
   public static final String MAX_COPY_PREFIX = PRIORITIZATION_PREFIX + ".maxCopy";
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index f2cb933..9c729e3 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -203,7 +203,7 @@ public class CopyableFile extends CopyEntity implements File {
             this.configuration.getTargetFs(), this.destination);
       }
       if (this.checksum == null) {
-        FileChecksum checksumTmp = this.originFs.getFileChecksum(this.origin.getPath());
+        FileChecksum checksumTmp = this.origin.isDirectory() ? null : this.originFs.getFileChecksum(this.origin.getPath());
         this.checksum = checksumTmp == null ? new byte[0] : checksumTmp.getBytes();
       }
       if (this.fileSet == null) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
index e24752a..34428f1 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
@@ -65,7 +65,12 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
   private final CopyableFileFilter copyableFileFilter;
   private final boolean update;
   private final boolean delete;
+
+  // Include empty directories in the source for copy
+  private final boolean includeEmptyDirectories;
+  // Delete empty directories in the destination
   private final boolean deleteEmptyDirectories;
+
   private final Properties properties;
 
   public RecursiveCopyableDataset(final FileSystem fs, Path rootPath, Properties properties, Path glob) {
@@ -80,6 +85,8 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
     this.update = Boolean.parseBoolean(properties.getProperty(UPDATE_KEY));
     this.delete = Boolean.parseBoolean(properties.getProperty(DELETE_KEY));
     this.deleteEmptyDirectories = Boolean.parseBoolean(properties.getProperty(DELETE_EMPTY_DIRECTORIES_KEY));
+    this.includeEmptyDirectories =
+        Boolean.parseBoolean(properties.getProperty(CopyConfiguration.INCLUDE_EMPTY_DIRECTORIES));
     this.properties = properties;
   }
 
@@ -150,7 +157,7 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
   @VisibleForTesting
   protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException {
     try {
-      return FileListUtils.listFilesRecursively(fs, path, fileFilter);
+      return FileListUtils.listFilesToCopyAtPath(fs, path, fileFilter, includeEmptyDirectories);
     } catch (FileNotFoundException fnfe) {
       return Lists.newArrayList();
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursivePathFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursivePathFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursivePathFinder.java
index ec608b2..b749996 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursivePathFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursivePathFinder.java
@@ -46,22 +46,27 @@ public class RecursivePathFinder {
   private final Path rootPath;
   private final FileSystem fs;
   private final PathFilter pathFilter;
+  private final boolean includeEmptyDirectories;
 
   public RecursivePathFinder(final FileSystem fs, Path rootPath, Properties properties) {
     this.rootPath = PathUtils.getPathWithoutSchemeAndAuthority(rootPath);
     this.fs = fs;
 
     this.pathFilter = DatasetUtils.instantiatePathFilter(properties);
+    this.includeEmptyDirectories =
+        Boolean.parseBoolean(properties.getProperty(CopyConfiguration.INCLUDE_EMPTY_DIRECTORIES));
   }
 
-  public Set<FileStatus> getPaths(boolean skipHiddenPaths) throws IOException {
+  public Set<FileStatus> getPaths(boolean skipHiddenPaths)
+      throws IOException {
 
     if (!this.fs.exists(this.rootPath)) {
       return Sets.newHashSet();
     }
     PathFilter actualFilter =
         skipHiddenPaths ? new AndPathFilter(new HiddenFilter(), this.pathFilter) : this.pathFilter;
-    List<FileStatus> files = FileListUtils.listFilesRecursively(this.fs, this.rootPath, actualFilter);
+    List<FileStatus> files =
+        FileListUtils.listFilesToCopyAtPath(this.fs, this.rootPath, actualFilter, includeEmptyDirectories);
 
     return Sets.newHashSet(files);
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
index 5e53344..406c828 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
@@ -23,6 +23,7 @@ import org.apache.gobblin.data.management.copy.FileAwareInputStream;
 import org.apache.gobblin.source.extractor.DataRecordException;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.io.EmptyInputStream;
 import org.apache.gobblin.util.io.MeteredInputStream;
 
 import java.io.IOException;
@@ -80,6 +81,9 @@ public class FileAwareInputStreamExtractor implements Extractor<String, FileAwar
           this.state == null ? HadoopUtils.newConfiguration() : HadoopUtils.getConfFromState(this.state);
       FileSystem fsFromFile = this.file.getOrigin().getPath().getFileSystem(conf);
       this.recordRead = true;
+      if (this.file.getFileStatus().isDirectory()) {
+        return new FileAwareInputStream(this.file, EmptyInputStream.instance);
+      }
       return new FileAwareInputStream(this.file,
           MeteredInputStream.builder().in(fsFromFile.open(this.file.getFileStatus().getPath())).build());
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 3ce204d..905ac02 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -206,6 +206,11 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
       log.info(String.format("Recovering persisted file %s to %s.", persistedFile.get().getPath(), writeAt));
       this.fs.rename(persistedFile.get().getPath(), writeAt);
     } else {
+      // Copy empty directories
+      if (copyableFile.getFileStatus().isDirectory()) {
+        this.fs.mkdirs(writeAt);
+        return;
+      }
 
       OutputStream os =
           this.fs.create(writeAt, true, this.fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
index 9472db6..02920c2 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
@@ -69,12 +69,35 @@ public class FileListUtils {
   }
 
   /**
+   * Lists the files to copy under the given root path, optionally including empty directories
+   * @param fs the file system of the path
+   * @param path root path to copy
+   * @param fileFilter a filter consulted only for the root path itself, when that root is not a directory
+   * @param includeEmptyDirectories whether empty directories (the root included) are added to the returned list
+   * @return the collected statuses; only the root's status when nothing else was found and empty directories are included
+   */
+  public static List<FileStatus> listFilesToCopyAtPath(FileSystem fs, Path path, PathFilter fileFilter,
+      boolean includeEmptyDirectories)
+      throws IOException {
+    List<FileStatus> files = Lists.newArrayList();
+    FileStatus rootFile = fs.getFileStatus(path);
+
+    listFilesRecursivelyHelper(fs, files, rootFile, fileFilter, false, includeEmptyDirectories);
+
+    // Nothing was collected, so the root itself is the empty directory to copy
+    if (files.size() == 0 && rootFile.isDirectory() && includeEmptyDirectories) {
+      files.add(rootFile);
+    }
+    return files;
+  }
+
+  /**
    * Helper method to list out all files under a specified path. The specified {@link PathFilter} is treated as a file
    * filter, that is it is only applied to file {@link Path}s.
    */
   public static List<FileStatus> listFilesRecursively(FileSystem fs, Path path, PathFilter fileFilter)
       throws IOException {
-    return listFilesRecursivelyHelper(fs, Lists.<FileStatus> newArrayList(), fs.getFileStatus(path), fileFilter, false);
+    return listFilesRecursively(fs, path, fileFilter, false);
   }
 
   /**
@@ -83,20 +106,33 @@ public class FileListUtils {
    */
   public static List<FileStatus> listFilesRecursively(FileSystem fs, Path path, PathFilter fileFilter,
       boolean applyFilterToDirectories) throws IOException {
-    return listFilesRecursivelyHelper(fs, Lists.<FileStatus> newArrayList(), fs.getFileStatus(path), fileFilter,
-        applyFilterToDirectories);
+    return listFilesRecursivelyHelper(fs, Lists.newArrayList(), fs.getFileStatus(path), fileFilter,
+        applyFilterToDirectories, false);
   }
 
   private static List<FileStatus> listFilesRecursivelyHelper(FileSystem fs, List<FileStatus> files,
-      FileStatus fileStatus, PathFilter fileFilter, boolean applyFilterToDirectories)
+      FileStatus fileStatus, PathFilter fileFilter, boolean applyFilterToDirectories, boolean includeEmptyDirectories)
       throws FileNotFoundException, IOException {
     if (fileStatus.isDirectory()) {
       for (FileStatus status : fs.listStatus(fileStatus.getPath(),
           applyFilterToDirectories ? fileFilter : NO_OP_PATH_FILTER)) {
-        if (fileStatus.isDirectory()) {
-          listFilesRecursivelyHelper(fs, files, status, fileFilter, applyFilterToDirectories);
+        if (status.isDirectory()) {
+          // Number of files collected before diving into the directory
+          int numFilesBefore = files.size();
+
+          listFilesRecursivelyHelper(fs, files, status, fileFilter, applyFilterToDirectories, includeEmptyDirectories);
+
+          // Number of files collected after diving into the directory
+          int numFilesAfter = files.size();
+          if (numFilesAfter == numFilesBefore && includeEmptyDirectories) {
+            /*
+             * This is effectively an empty directory, which needs explicit copying. Had there been any data file
+             * in the directory, the directory would have been created as a side effect of copying the data file
+             */
+            files.add(status);
+          }
         } else {
-          files.add(fileStatus);
+          files.add(status);
         }
       }
     } else if (fileFilter.accept(fileStatus.getPath())) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-utility/src/main/java/org/apache/gobblin/util/io/EmptyInputStream.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/io/EmptyInputStream.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/io/EmptyInputStream.java
new file mode 100644
index 0000000..3516866
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/io/EmptyInputStream.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+
+/**
+ * An {@link InputStream} with empty content: every read reports end of stream
+ */
+public class EmptyInputStream extends InputStream {
+  public static final InputStream instance = new EmptyInputStream();
+
+  private EmptyInputStream() {}
+  @Override
+  public int read()
+      throws IOException {
+    return -1; // end of stream; 0 would be a data byte, making the stream endless
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java
index 415ae54..388e311 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java
@@ -146,4 +146,51 @@ public class FileListUtilsTest {
     }
   }
 
+  public void testListFilesToCopyAtPath() throws IOException {
+    FileSystem localFs = FileSystem.getLocal(new Configuration());
+    Path baseDir = new Path(FILE_UTILS_TEST_DIR, "fileListTestDir4");
+    try {
+      if (localFs.exists(baseDir)) {
+        localFs.delete(baseDir, true);
+      }
+      localFs.mkdirs(baseDir);
+
+      // Empty root directory: the root itself is the only entry listed
+      List<FileStatus> testFiles = FileListUtils.listFilesToCopyAtPath(localFs, baseDir, FileListUtils.NO_OP_PATH_FILTER, true);
+      Assert.assertEquals(testFiles.size(), 1);
+      Assert.assertEquals(testFiles.get(0).getPath().getName(), baseDir.getName());
+
+      // With an empty sub directory: the sub directory is listed instead of the root
+      Path subDir = new Path(baseDir, "subDir");
+      localFs.mkdirs(subDir);
+      testFiles = FileListUtils.listFilesToCopyAtPath(localFs, baseDir, FileListUtils.NO_OP_PATH_FILTER, true);
+      Assert.assertEquals(testFiles.size(), 1);
+      Assert.assertEquals(testFiles.get(0).getPath().getName(), subDir.getName());
+
+      // Disable include empty directories: nothing is listed
+      testFiles = FileListUtils.listFilesToCopyAtPath(localFs, baseDir, FileListUtils.NO_OP_PATH_FILTER, false);
+      Assert.assertEquals(testFiles.size(), 0);
+
+      // With file subDir/test1: the sub directory is no longer empty, so only the file is listed
+      Path test1Path = new Path(subDir, TEST_FILE_NAME1);
+      localFs.create(test1Path);
+      testFiles = FileListUtils.listFilesToCopyAtPath(localFs, baseDir, FileListUtils.NO_OP_PATH_FILTER, true);
+      Assert.assertEquals(testFiles.size(), 1);
+      Assert.assertEquals(testFiles.get(0).getPath().getName(), test1Path.getName());
+
+      // With file subDir/test2 as well: both files are listed
+      Path test2Path = new Path(subDir, TEST_FILE_NAME2);
+      localFs.create(test2Path);
+      testFiles = FileListUtils.listFilesToCopyAtPath(localFs, baseDir, FileListUtils.NO_OP_PATH_FILTER, true);
+      Assert.assertEquals(testFiles.size(), 2);
+      Set<String> fileNames = Sets.newHashSet();
+      for (FileStatus testFileStatus : testFiles) {
+        fileNames.add(testFileStatus.getPath().getName());
+      }
+      Assert.assertTrue(fileNames.contains(TEST_FILE_NAME1) && fileNames.contains(TEST_FILE_NAME2));
+    } finally {
+      localFs.delete(baseDir, true);
+    }
+  }
+
 }