You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@beam.apache.org by dh...@apache.org on 2016/11/10 18:56:06 UTC

[2/2] incubator-beam git commit: [BEAM-896] FileBasedSink removes temp directory

[BEAM-896] FileBasedSink removes temp directory

Also fixes the Apache RAT plugin configuration in pom.xml to exclude .idea
directories (the problem was found while running integration tests locally).


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/07a3c2c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/07a3c2c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/07a3c2c0

Branch: refs/heads/master
Commit: 07a3c2c06c80598faa9d33507ad6fbaa8b70e8c2
Parents: 11eaed1
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Nov 3 15:29:49 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Nov 10 10:56:00 2016 -0800

----------------------------------------------------------------------
 pom.xml                                         |   1 +
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 134 ++++++++++++-------
 .../beam/sdk/util/FileIOChannelFactory.java     |   3 +-
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |  28 ++--
 .../org/apache/beam/sdk/io/XmlSinkTest.java     |   4 +-
 5 files changed, 104 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07a3c2c0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 749ca9c..22897e3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -987,6 +987,7 @@
               <exclude>**/hs_err_pid*.log</exclude>
               <exclude>.github/**/*</exclude>
               <exclude>**/*.iml</exclude>
+              <exclude>**/.idea/**/*</exclude>
               <exclude>**/package-list</exclude>
               <exclude>**/user.avsc</exclude>
               <exclude>**/test/resources/**/*.txt</exclude>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07a3c2c0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 101ff61..e6c37de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -38,6 +38,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.zip.GZIPOutputStream;
 
 import javax.annotation.Nullable;
@@ -79,6 +80,8 @@ import org.slf4j.LoggerFactory;
  * @param <T> the type of values written to the sink.
  */
 public abstract class FileBasedSink<T> extends Sink<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
+
   /**
    * Directly supported file output compression types.
    */
@@ -262,11 +265,11 @@ public abstract class FileBasedSink<T> extends Sink<T> {
    * FileBasedSinkWriter.
    *
    * <h2>Temporary and Output File Naming:</h2> During the write, bundles are written to temporary
-   * files using the baseTemporaryFilename that can be provided via the constructor of
+   * files using the tempDirectory that can be provided via the constructor of
    * FileBasedWriteOperation. These temporary files will be named
-   * {@code {baseTemporaryFilename}-temp-{bundleId}}, where bundleId is the unique id of the bundle.
-   * For example, if baseTemporaryFilename is "gs://my-bucket/my_temp_output", the output for a
-   * bundle with bundle id 15723 will be "gs://my-bucket/my_temp_output-temp-15723".
+   * {@code {tempDirectory}/{bundleId}}, where bundleId is the unique id of the bundle.
+   * For example, if tempDirectory is "gs://my-bucket/my_temp_output", the output for a
+   * bundle with bundle id 15723 will be "gs://my-bucket/my_temp_output/15723".
    *
    * <p>Final output files are written to baseOutputFilename with the format
    * {@code {baseOutputFilename}-0000i-of-0000n.{extension}} where n is the total number of bundles
@@ -290,8 +293,6 @@ public abstract class FileBasedSink<T> extends Sink<T> {
    * @param <T> the type of values written to the sink.
    */
   public abstract static class FileBasedWriteOperation<T> extends WriteOperation<T, FileResult> {
-    private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriteOperation.class);
-
     /**
      * Options for handling of temporary output files.
      */
@@ -310,27 +311,21 @@ public abstract class FileBasedSink<T> extends Sink<T> {
      */
     protected final TemporaryFileRetention temporaryFileRetention;
 
-    /**
-     * Base filename used for temporary output files. Default is the baseOutputFilename.
-     */
-    protected final String baseTemporaryFilename;
+    /** Directory for temporary output files. */
+    protected final String tempDirectory;
 
-    /**
-     * Build a temporary filename using the temporary filename separator with the given prefix and
-     * suffix.
-     */
-    protected static final String buildTemporaryFilename(String prefix, String suffix) {
-      try {
-        IOChannelFactory factory = IOChannelUtils.getFactory(prefix);
-        return factory.resolve(prefix, suffix);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
+    /** Constructs a temporary file path given the temporary directory and a filename. */
+    protected static String buildTemporaryFilename(String tempDirectory, String filename)
+        throws IOException {
+      return IOChannelUtils.getFactory(tempDirectory).resolve(tempDirectory, filename);
     }
 
     /**
-     * Construct a FileBasedWriteOperation using the same base filename for both temporary and
-     * output files.
+     * Constructs a FileBasedWriteOperation using the default strategy for generating a temporary
+     * directory from the base output filename.
+     *
+     * <p>Default is a uniquely named sibling of baseOutputFilename, e.g. if baseOutputFilename is
+     * /path/to/foo, the temporary directory will be /path/to/temp-beam-foo-$date.
      *
      * @param sink the FileBasedSink that will be used to configure this write operation.
      */
@@ -358,23 +353,23 @@ public abstract class FileBasedSink<T> extends Sink<T> {
      * Construct a FileBasedWriteOperation.
      *
      * @param sink the FileBasedSink that will be used to configure this write operation.
-     * @param baseTemporaryFilename the base filename to be used for temporary output files.
+     * @param tempDirectory the base directory to be used for temporary output files.
      */
-    public FileBasedWriteOperation(FileBasedSink<T> sink, String baseTemporaryFilename) {
-      this(sink, baseTemporaryFilename, TemporaryFileRetention.REMOVE);
+    public FileBasedWriteOperation(FileBasedSink<T> sink, String tempDirectory) {
+      this(sink, tempDirectory, TemporaryFileRetention.REMOVE);
     }
 
     /**
      * Create a new FileBasedWriteOperation.
      *
      * @param sink the FileBasedSink that will be used to configure this write operation.
-     * @param baseTemporaryFilename the base filename to be used for temporary output files.
+     * @param tempDirectory the base directory to be used for temporary output files.
      * @param temporaryFileRetention defines how temporary files are handled.
      */
-    public FileBasedWriteOperation(FileBasedSink<T> sink, String baseTemporaryFilename,
+    public FileBasedWriteOperation(FileBasedSink<T> sink, String tempDirectory,
         TemporaryFileRetention temporaryFileRetention) {
       this.sink = sink;
-      this.baseTemporaryFilename = baseTemporaryFilename;
+      this.tempDirectory = tempDirectory;
       this.temporaryFileRetention = temporaryFileRetention;
     }
 
@@ -422,7 +417,12 @@ public abstract class FileBasedSink<T> extends Sink<T> {
 
       // Optionally remove temporary files.
       if (temporaryFileRetention == TemporaryFileRetention.REMOVE) {
-        removeTemporaryFiles(options);
+        // We remove the entire temporary directory, rather than specifically removing the files
+        // from writerResults, because writerResults includes only successfully completed bundles,
+        // and we'd like to clean up the failed ones too.
+        // Note that due to GCS eventual consistency, matching files in the temp directory is also
+        // currently non-perfect and may fail to delete some files.
+        removeTemporaryFiles(files, options);
       }
     }
 
@@ -483,21 +483,18 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     }
 
     /**
-     * Removes temporary output files. Uses the temporary filename to find files to remove.
+     * Removes temporary output files. Uses the temporary directory to find files to remove.
      *
      * <p>Can be called from subclasses that override {@link FileBasedWriteOperation#finalize}.
      * <b>Note:</b>If finalize is overridden and does <b>not</b> rename or otherwise finalize
      * temporary files, this method will remove them.
      */
-    protected final void removeTemporaryFiles(PipelineOptions options) throws IOException {
-      String pattern = buildTemporaryFilename(baseTemporaryFilename, "*");
-      LOG.debug("Finding temporary bundle output files matching {}.", pattern);
-      FileOperations fileOperations = FileOperationsFactory.getFileOperations(pattern, options);
-      IOChannelFactory factory = IOChannelUtils.getFactory(pattern);
-      Collection<String> matches = factory.match(pattern);
-      LOG.debug("{} temporary files matched {}", matches.size(), pattern);
-      LOG.debug("Removing {} files.", matches.size());
-      fileOperations.remove(matches);
+    protected final void removeTemporaryFiles(List<String> knownFiles, PipelineOptions options)
+        throws IOException {
+      LOG.debug("Removing temporary bundle output files in {}.", tempDirectory);
+      FileOperations fileOperations =
+          FileOperationsFactory.getFileOperations(tempDirectory, options);
+      fileOperations.removeDirectoryAndFiles(tempDirectory, knownFiles);
     }
 
     /**
@@ -542,9 +539,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     private String id;
 
     /**
-     * The filename of the output bundle. Equal to the
-     * {@link FileBasedSink.FileBasedWriteOperation#TEMPORARY_FILENAME_SEPARATOR} and id appended to
-     * the baseName.
+     * The filename of the output bundle - $tempDirectory/$id.
      */
     private String filename;
 
@@ -597,7 +592,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     public final void open(String uId) throws Exception {
       this.id = uId;
       filename = FileBasedWriteOperation.buildTemporaryFilename(
-          getWriteOperation().baseTemporaryFilename, uId);
+          getWriteOperation().tempDirectory, uId);
       LOG.debug("Opening {}.", filename);
       final WritableByteChannelFactory factory =
           getWriteOperation().getSink().writableByteChannelFactory;
@@ -682,7 +677,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
       if (factory instanceof GcsIOChannelFactory) {
         return new GcsOperations(options);
       } else if (factory instanceof FileIOChannelFactory) {
-        return new LocalFileOperations();
+        return new LocalFileOperations(factory);
       } else {
         throw new IOException("Unrecognized file system.");
       }
@@ -706,9 +701,16 @@ public abstract class FileBasedSink<T> extends Sink<T> {
      void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException;
 
     /**
-     * Remove a collection of files.
+     * Removes a directory and the files in it (but not subdirectories).
+     *
+     * <p>Additionally, to partially mitigate the effects of filesystems with eventually-consistent
+     * directory matching APIs, takes a list of files that are known to exist - i.e. removes the
+     * union of the known files and files that the filesystem says exist in the directory.
+     *
+     * <p>Assumes that, if directory listing had been strongly consistent, it would have matched
+     * all of knownFiles - i.e. on a strongly consistent filesystem, knownFiles can be ignored.
      */
-    void remove(Collection<String> filenames) throws IOException;
+    void removeDirectoryAndFiles(String directory, List<String> knownFiles) throws IOException;
   }
 
   /**
@@ -717,7 +719,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
   private static class GcsOperations implements FileOperations {
     private final GcsUtil gcsUtil;
 
-    public GcsOperations(PipelineOptions options) {
+    GcsOperations(PipelineOptions options) {
       gcsUtil = new GcsUtilFactory().create(options);
     }
 
@@ -727,8 +729,21 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     }
 
     @Override
-    public void remove(Collection<String> filenames) throws IOException {
-      gcsUtil.remove(filenames);
+    public void removeDirectoryAndFiles(String directory, List<String> knownFiles)
+        throws IOException {
+      IOChannelFactory factory = IOChannelUtils.getFactory(directory);
+      Collection<String> matches = factory.match(directory + "/*");
+      Set<String> allMatches = new HashSet<>(matches);
+      allMatches.addAll(knownFiles);
+      LOG.debug(
+          "Removing {} temporary files found under {} ({} matched glob, {} additional known files)",
+          allMatches.size(),
+          directory,
+          matches.size(),
+          allMatches.size() - matches.size());
+      gcsUtil.remove(allMatches);
+      // No need to remove the directory itself: GCS doesn't have directories, so if the directory
+      // is empty, then it already doesn't exist.
     }
   }
 
@@ -738,6 +753,12 @@ public abstract class FileBasedSink<T> extends Sink<T> {
   private static class LocalFileOperations implements FileOperations {
     private static final Logger LOG = LoggerFactory.getLogger(LocalFileOperations.class);
 
+    private final IOChannelFactory factory;
+
+    LocalFileOperations(IOChannelFactory factory) {
+      this.factory = factory;
+    }
+
     @Override
     public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
       checkArgument(
@@ -769,11 +790,20 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     }
 
     @Override
-    public void remove(Collection<String> filenames) throws IOException {
-      for (String filename : filenames) {
+    public void removeDirectoryAndFiles(String directory, List<String> knownFiles)
+        throws IOException {
+      if (!new File(directory).exists()) {
+        LOG.debug("Directory {} already doesn't exist", directory);
+        return;
+      }
+      Collection<String> matches = factory.match(new File(directory, "*").getAbsolutePath());
+      LOG.debug("Removing {} temporary files found under {}", matches.size(), directory);
+      for (String filename : matches) {
         LOG.debug("Removing file {}", filename);
         removeOne(filename);
       }
+      LOG.debug("Removing directory {}", directory);
+      removeOne(directory);
     }
 
     private void removeOne(String filename) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07a3c2c0/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
index 2d2c0c6..b920efb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
@@ -73,7 +73,8 @@ public class FileIOChannelFactory implements IOChannelFactory {
 
     File parent = file.getAbsoluteFile().getParentFile();
     if (!parent.exists()) {
-      throw new IOException("Unable to find parent directory of " + spec);
+      throw new FileNotFoundException(
+          "Parent directory " + parent + " of " + spec + " does not exist");
     }
 
     // Method getAbsolutePath() on Windows platform may return something like

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07a3c2c0/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 8301afc..c1400da 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -40,6 +40,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.zip.GZIPInputStream;
 
@@ -67,7 +68,7 @@ public class FileBasedSinkTest {
   public TemporaryFolder tmpFolder = new TemporaryFolder();
 
   private String baseOutputFilename = "output";
-  private String baseTemporaryFilename = "temp";
+  private String tempDirectory = "temp";
 
   private String appendToTempFolder(String filename) {
     return Paths.get(tmpFolder.getRoot().getPath(), filename).toString();
@@ -77,8 +78,8 @@ public class FileBasedSinkTest {
     return appendToTempFolder(baseOutputFilename);
   }
 
-  private String getBaseTempFilename() {
-    return appendToTempFolder(baseTemporaryFilename);
+  private String getBaseTempDirectory() {
+    return appendToTempFolder(tempDirectory);
   }
 
   /**
@@ -89,7 +90,7 @@ public class FileBasedSinkTest {
   public void testWriter() throws Exception {
     String testUid = "testId";
     String expectedFilename =
-        getBaseTempFilename() + "/" + testUid;
+        getBaseTempDirectory() + "/" + testUid;
     SimpleSink.SimpleWriter writer = buildWriter();
 
     List<String> values = Arrays.asList("sympathetic vulture", "boresome hummingbird");
@@ -141,7 +142,7 @@ public class FileBasedSinkTest {
    */
   @Test
   public void testRemoveWithTempFilename() throws Exception {
-    testRemoveTemporaryFiles(3, baseTemporaryFilename);
+    testRemoveTemporaryFiles(3, tempDirectory);
   }
 
   /**
@@ -193,7 +194,8 @@ public class FileBasedSinkTest {
     runFinalize(writeOp, files, false);
 
     // create a temporary file
-    tmpFolder.newFile(baseTemporaryFilename + "/1");
+    tmpFolder.newFolder(tempDirectory);
+    tmpFolder.newFile(tempDirectory + "/1");
 
     runFinalize(writeOp, files, false);
   }
@@ -215,7 +217,7 @@ public class FileBasedSinkTest {
     List<File> temporaryFiles = new ArrayList<>();
     for (int i = 0; i < numFiles; i++) {
       String temporaryFilename =
-          FileBasedWriteOperation.buildTemporaryFilename(baseTemporaryFilename, "" + i);
+          FileBasedWriteOperation.buildTemporaryFilename(tempDirectory, "" + i);
       File tmpFile = new File(tmpFolder.getRoot(), temporaryFilename);
       tmpFile.getParentFile().mkdirs();
       assertTrue(tmpFile.createNewFile());
@@ -250,6 +252,10 @@ public class FileBasedSinkTest {
       assertTrue(outputFiles.get(i).exists());
       assertEquals(retainTemporaryFiles, temporaryFiles.get(i).exists());
     }
+
+    if (!retainTemporaryFiles) {
+      assertFalse(new File(writeOp.tempDirectory).exists());
+    }
   }
 
   /**
@@ -273,7 +279,7 @@ public class FileBasedSinkTest {
       outputFiles.add(outputFile);
     }
 
-    writeOp.removeTemporaryFiles(options);
+    writeOp.removeTemporaryFiles(Collections.<String>emptyList(), options);
 
     for (int i = 0; i < numFiles; i++) {
       assertFalse(temporaryFiles.get(i).exists());
@@ -505,7 +511,7 @@ public class FileBasedSinkTest {
     final FileBasedWriter<String> writer =
         writeOp.createWriter(null);
     final String expectedFilename =
-        writeOp.baseTemporaryFilename + "/" + testUid;
+        writeOp.tempDirectory + "/" + testUid;
 
     final List<String> expected = new ArrayList<>();
     expected.add("header");
@@ -617,7 +623,7 @@ public class FileBasedSinkTest {
   private SimpleSink.SimpleWriteOperation buildWriteOperation(
       TemporaryFileRetention fileRetention) {
     SimpleSink sink = buildSink();
-    return new SimpleSink.SimpleWriteOperation(sink, getBaseTempFilename(), fileRetention);
+    return new SimpleSink.SimpleWriteOperation(sink, getBaseTempDirectory(), fileRetention);
   }
 
   /**
@@ -634,7 +640,7 @@ public class FileBasedSinkTest {
   private SimpleSink.SimpleWriteOperation buildWriteOperation() {
     SimpleSink sink = buildSink();
     return new SimpleSink.SimpleWriteOperation(
-        sink, getBaseTempFilename(), TemporaryFileRetention.REMOVE);
+        sink, getBaseTempDirectory(), TemporaryFileRetention.REMOVE);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07a3c2c0/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
index 653a9d0..400b04a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
@@ -146,7 +146,7 @@ public class XmlSinkTest {
     assertEquals(testRootElement, writeOp.getSink().rootElementName);
     assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().extension);
     Path outputPath = new File(testFilePrefix).toPath();
-    Path tempPath = new File(writeOp.baseTemporaryFilename).toPath();
+    Path tempPath = new File(writeOp.tempDirectory).toPath();
     assertEquals(outputPath.getParent(), tempPath.getParent());
     assertThat(
         tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName()));
@@ -163,7 +163,7 @@ public class XmlSinkTest {
             .createWriteOperation(options);
     XmlWriter<Bird> writer = writeOp.createWriter(options);
     Path outputPath = new File(testFilePrefix).toPath();
-    Path tempPath = new File(writer.getWriteOperation().baseTemporaryFilename).toPath();
+    Path tempPath = new File(writer.getWriteOperation().tempDirectory).toPath();
     assertEquals(outputPath.getParent(), tempPath.getParent());
     assertThat(
         tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName()));