You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/11/11 23:30:09 UTC

[1/2] incubator-beam git commit: [BEAM-954] FileBasedSink: remove unused code of TemporaryFileRetention.

Repository: incubator-beam
Updated Branches:
  refs/heads/master 821923334 -> 6814a99c2


[BEAM-954] FileBasedSink: remove unused code of TemporaryFileRetention.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2a151127
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2a151127
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2a151127

Branch: refs/heads/master
Commit: 2a151127f04733e6a1f87914901ae6b88c329935
Parents: 8219233
Author: Pei He <pe...@google.com>
Authored: Wed Nov 9 20:12:24 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Nov 11 15:30:03 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 49 ++------------
 .../apache/beam/sdk/io/FileBasedSinkTest.java   | 67 ++++----------------
 2 files changed, 21 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a151127/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index e6c37de..2d058ae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -279,12 +279,6 @@ public abstract class FileBasedSink<T> extends Sink<T> {
    * <p>Subclass implementations can change the file naming template by supplying a value for
    * {@link FileBasedSink#fileNamingTemplate}.
    *
-   * <h2>Temporary Bundle File Handling:</h2>
-   *
-   * <p>{@link FileBasedSink.FileBasedWriteOperation#temporaryFileRetention} controls the behavior
-   * for managing temporary files. By default, temporary files will be removed. Subclasses can
-   * provide a different value to the constructor.
-   *
    * <p>Note that in the case of permanent failure of a bundle's write, no clean up of temporary
    * files will occur.
    *
@@ -294,23 +288,10 @@ public abstract class FileBasedSink<T> extends Sink<T> {
    */
   public abstract static class FileBasedWriteOperation<T> extends WriteOperation<T, FileResult> {
     /**
-     * Options for handling of temporary output files.
-     */
-    public enum TemporaryFileRetention {
-      KEEP,
-      REMOVE
-    }
-
-    /**
      * The Sink that this WriteOperation will write to.
      */
     protected final FileBasedSink<T> sink;
 
-    /**
-     * Option to keep or remove temporary output files.
-     */
-    protected final TemporaryFileRetention temporaryFileRetention;
-
     /** Directory for temporary output files. */
     protected final String tempDirectory;
 
@@ -350,27 +331,14 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     }
 
     /**
-     * Construct a FileBasedWriteOperation.
-     *
-     * @param sink the FileBasedSink that will be used to configure this write operation.
-     * @param tempDirectory the base directory to be used for temporary output files.
-     */
-    public FileBasedWriteOperation(FileBasedSink<T> sink, String tempDirectory) {
-      this(sink, tempDirectory, TemporaryFileRetention.REMOVE);
-    }
-
-    /**
      * Create a new FileBasedWriteOperation.
      *
      * @param sink the FileBasedSink that will be used to configure this write operation.
      * @param tempDirectory the base directory to be used for temporary output files.
-     * @param temporaryFileRetention defines how temporary files are handled.
      */
-    public FileBasedWriteOperation(FileBasedSink<T> sink, String tempDirectory,
-        TemporaryFileRetention temporaryFileRetention) {
+    public FileBasedWriteOperation(FileBasedSink<T> sink, String tempDirectory) {
       this.sink = sink;
       this.tempDirectory = tempDirectory;
-      this.temporaryFileRetention = temporaryFileRetention;
     }
 
     /**
@@ -415,15 +383,12 @@ public abstract class FileBasedSink<T> extends Sink<T> {
       }
       copyToOutputFiles(files, options);
 
-      // Optionally remove temporary files.
-      if (temporaryFileRetention == TemporaryFileRetention.REMOVE) {
-        // We remove the entire temporary directory, rather than specifically removing the files
-        // from writerResults, because writerResults includes only successfully completed bundles,
-        // and we'd like to clean up the failed ones too.
-        // Note that due to GCS eventual consistency, matching files in the temp directory is also
-        // currently non-perfect and may fail to delete some files.
-        removeTemporaryFiles(files, options);
-      }
+      // We remove the entire temporary directory, rather than specifically removing the files
+      // from writerResults, because writerResults includes only successfully completed bundles,
+      // and we'd like to clean up the failed ones too.
+      // Note that due to GCS eventual consistency, matching files in the temp directory is also
+      // currently non-perfect and may fail to delete some files.
+      removeTemporaryFiles(files, options);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a151127/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index c1400da..4ab3843 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -46,7 +46,6 @@ import java.util.zip.GZIPInputStream;
 
 import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
 import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
-import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation.TemporaryFileRetention;
 import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
@@ -157,20 +156,9 @@ public class FileBasedSinkTest {
    * Finalize copies temporary files to output files and removes any temporary files.
    */
   @Test
-  public void testFinalizeWithNoRetention() throws Exception {
+  public void testFinalize() throws Exception {
     List<File> files = generateTemporaryFilesForFinalize(3);
-    boolean retainTemporaryFiles = false;
-    runFinalize(buildWriteOperationForFinalize(retainTemporaryFiles), files, retainTemporaryFiles);
-  }
-
-  /**
-   * Finalize retains temporary files when requested.
-   */
-  @Test
-  public void testFinalizeWithRetention() throws Exception {
-    List<File> files = generateTemporaryFilesForFinalize(3);
-    boolean retainTemporaryFiles = true;
-    runFinalize(buildWriteOperationForFinalize(retainTemporaryFiles), files, retainTemporaryFiles);
+    runFinalize(buildWriteOperation(), files);
   }
 
   /**
@@ -179,9 +167,9 @@ public class FileBasedSinkTest {
   @Test
   public void testFinalizeMultipleCalls() throws Exception {
     List<File> files = generateTemporaryFilesForFinalize(3);
-    SimpleSink.SimpleWriteOperation writeOp = buildWriteOperationForFinalize(false);
-    runFinalize(writeOp, files, false);
-    runFinalize(writeOp, files, false);
+    SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
+    runFinalize(writeOp, files);
+    runFinalize(writeOp, files);
   }
 
   /**
@@ -190,24 +178,14 @@ public class FileBasedSinkTest {
   @Test
   public void testFinalizeWithIntermediateState() throws Exception {
     List<File> files = generateTemporaryFilesForFinalize(3);
-    SimpleSink.SimpleWriteOperation writeOp = buildWriteOperationForFinalize(false);
-    runFinalize(writeOp, files, false);
+    SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
+    runFinalize(writeOp, files);
 
     // create a temporary file
     tmpFolder.newFolder(tempDirectory);
     tmpFolder.newFile(tempDirectory + "/1");
 
-    runFinalize(writeOp, files, false);
-  }
-
-  /**
-   * Build a SimpleWriteOperation with default values and the specified retention policy.
-   */
-  private SimpleSink.SimpleWriteOperation buildWriteOperationForFinalize(
-      boolean retainTemporaryFiles) throws Exception {
-    TemporaryFileRetention retentionPolicy =
-        retainTemporaryFiles ? TemporaryFileRetention.KEEP : TemporaryFileRetention.REMOVE;
-    return buildWriteOperation(retentionPolicy);
+    runFinalize(writeOp, files);
   }
 
   /**
@@ -230,8 +208,8 @@ public class FileBasedSinkTest {
   /**
    * Finalize and verify that files are copied and temporary files are optionally removed.
    */
-  private void runFinalize(SimpleSink.SimpleWriteOperation writeOp, List<File> temporaryFiles,
-      boolean retainTemporaryFiles) throws Exception {
+  private void runFinalize(SimpleSink.SimpleWriteOperation writeOp, List<File> temporaryFiles)
+      throws Exception {
     PipelineOptions options = PipelineOptionsFactory.create();
 
     int numFiles = temporaryFiles.size();
@@ -250,12 +228,10 @@ public class FileBasedSinkTest {
 
     for (int i = 0; i < numFiles; i++) {
       assertTrue(outputFiles.get(i).exists());
-      assertEquals(retainTemporaryFiles, temporaryFiles.get(i).exists());
+      assertFalse(temporaryFiles.get(i).exists());
     }
 
-    if (!retainTemporaryFiles) {
-      assertFalse(new File(writeOp.tempDirectory).exists());
-    }
+    assertFalse(new File(writeOp.tempDirectory).exists());
   }
 
   /**
@@ -555,11 +531,6 @@ public class FileBasedSinkTest {
     }
 
     private static final class SimpleWriteOperation extends FileBasedWriteOperation<String> {
-      public SimpleWriteOperation(
-          SimpleSink sink, String tempOutputFilename, TemporaryFileRetention retentionPolicy) {
-        super(sink, tempOutputFilename, retentionPolicy);
-      }
-
       public SimpleWriteOperation(SimpleSink sink, String tempOutputFilename) {
         super(sink, tempOutputFilename);
       }
@@ -618,15 +589,6 @@ public class FileBasedSinkTest {
   }
 
   /**
-   * Build a SimpleWriteOperation with default options and the given file retention policy.
-   */
-  private SimpleSink.SimpleWriteOperation buildWriteOperation(
-      TemporaryFileRetention fileRetention) {
-    SimpleSink sink = buildSink();
-    return new SimpleSink.SimpleWriteOperation(sink, getBaseTempDirectory(), fileRetention);
-  }
-
-  /**
    * Build a SimpleWriteOperation with default options and the given base temporary filename.
    */
   private SimpleSink.SimpleWriteOperation buildWriteOperation(String baseTemporaryFilename) {
@@ -639,15 +601,14 @@ public class FileBasedSinkTest {
    */
   private SimpleSink.SimpleWriteOperation buildWriteOperation() {
     SimpleSink sink = buildSink();
-    return new SimpleSink.SimpleWriteOperation(
-        sink, getBaseTempDirectory(), TemporaryFileRetention.REMOVE);
+    return new SimpleSink.SimpleWriteOperation(sink, getBaseTempDirectory());
   }
 
   /**
    * Build a writer with the default options for its parent write operation and sink.
    */
   private SimpleSink.SimpleWriter buildWriter() {
-    SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation(TemporaryFileRetention.REMOVE);
+    SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
     return new SimpleSink.SimpleWriter(writeOp);
   }
 }


[2/2] incubator-beam git commit: Closes #1331

Posted by dh...@apache.org.
Closes #1331


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6814a99c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6814a99c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6814a99c

Branch: refs/heads/master
Commit: 6814a99c22264e3c45864d7deb237108b1bd27d2
Parents: 8219233 2a15112
Author: Dan Halperin <dh...@google.com>
Authored: Fri Nov 11 15:30:04 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Nov 11 15:30:04 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 49 ++------------
 .../apache/beam/sdk/io/FileBasedSinkTest.java   | 67 ++++----------------
 2 files changed, 21 insertions(+), 95 deletions(-)
----------------------------------------------------------------------