You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/11/11 23:30:09 UTC
[1/2] incubator-beam git commit: [BEAM-954] FileBasedSink: remove
unused code of TemporaryFileRetention.
Repository: incubator-beam
Updated Branches:
refs/heads/master 821923334 -> 6814a99c2
[BEAM-954] FileBasedSink: remove unused code of TemporaryFileRetention.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2a151127
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2a151127
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2a151127
Branch: refs/heads/master
Commit: 2a151127f04733e6a1f87914901ae6b88c329935
Parents: 8219233
Author: Pei He <pe...@google.com>
Authored: Wed Nov 9 20:12:24 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Nov 11 15:30:03 2016 -0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/FileBasedSink.java | 49 ++------------
.../apache/beam/sdk/io/FileBasedSinkTest.java | 67 ++++----------------
2 files changed, 21 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a151127/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index e6c37de..2d058ae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -279,12 +279,6 @@ public abstract class FileBasedSink<T> extends Sink<T> {
* <p>Subclass implementations can change the file naming template by supplying a value for
* {@link FileBasedSink#fileNamingTemplate}.
*
- * <h2>Temporary Bundle File Handling:</h2>
- *
- * <p>{@link FileBasedSink.FileBasedWriteOperation#temporaryFileRetention} controls the behavior
- * for managing temporary files. By default, temporary files will be removed. Subclasses can
- * provide a different value to the constructor.
- *
* <p>Note that in the case of permanent failure of a bundle's write, no clean up of temporary
* files will occur.
*
@@ -294,23 +288,10 @@ public abstract class FileBasedSink<T> extends Sink<T> {
*/
public abstract static class FileBasedWriteOperation<T> extends WriteOperation<T, FileResult> {
/**
- * Options for handling of temporary output files.
- */
- public enum TemporaryFileRetention {
- KEEP,
- REMOVE
- }
-
- /**
* The Sink that this WriteOperation will write to.
*/
protected final FileBasedSink<T> sink;
- /**
- * Option to keep or remove temporary output files.
- */
- protected final TemporaryFileRetention temporaryFileRetention;
-
/** Directory for temporary output files. */
protected final String tempDirectory;
@@ -350,27 +331,14 @@ public abstract class FileBasedSink<T> extends Sink<T> {
}
/**
- * Construct a FileBasedWriteOperation.
- *
- * @param sink the FileBasedSink that will be used to configure this write operation.
- * @param tempDirectory the base directory to be used for temporary output files.
- */
- public FileBasedWriteOperation(FileBasedSink<T> sink, String tempDirectory) {
- this(sink, tempDirectory, TemporaryFileRetention.REMOVE);
- }
-
- /**
* Create a new FileBasedWriteOperation.
*
* @param sink the FileBasedSink that will be used to configure this write operation.
* @param tempDirectory the base directory to be used for temporary output files.
- * @param temporaryFileRetention defines how temporary files are handled.
*/
- public FileBasedWriteOperation(FileBasedSink<T> sink, String tempDirectory,
- TemporaryFileRetention temporaryFileRetention) {
+ public FileBasedWriteOperation(FileBasedSink<T> sink, String tempDirectory) {
this.sink = sink;
this.tempDirectory = tempDirectory;
- this.temporaryFileRetention = temporaryFileRetention;
}
/**
@@ -415,15 +383,12 @@ public abstract class FileBasedSink<T> extends Sink<T> {
}
copyToOutputFiles(files, options);
- // Optionally remove temporary files.
- if (temporaryFileRetention == TemporaryFileRetention.REMOVE) {
- // We remove the entire temporary directory, rather than specifically removing the files
- // from writerResults, because writerResults includes only successfully completed bundles,
- // and we'd like to clean up the failed ones too.
- // Note that due to GCS eventual consistency, matching files in the temp directory is also
- // currently non-perfect and may fail to delete some files.
- removeTemporaryFiles(files, options);
- }
+ // We remove the entire temporary directory, rather than specifically removing the files
+ // from writerResults, because writerResults includes only successfully completed bundles,
+ // and we'd like to clean up the failed ones too.
+ // Note that due to GCS eventual consistency, matching files in the temp directory is also
+ // currently non-perfect and may fail to delete some files.
+ removeTemporaryFiles(files, options);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a151127/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index c1400da..4ab3843 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -46,7 +46,6 @@ import java.util.zip.GZIPInputStream;
import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
-import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation.TemporaryFileRetention;
import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
import org.apache.beam.sdk.io.FileBasedSink.FileResult;
import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
@@ -157,20 +156,9 @@ public class FileBasedSinkTest {
* Finalize copies temporary files to output files and removes any temporary files.
*/
@Test
- public void testFinalizeWithNoRetention() throws Exception {
+ public void testFinalize() throws Exception {
List<File> files = generateTemporaryFilesForFinalize(3);
- boolean retainTemporaryFiles = false;
- runFinalize(buildWriteOperationForFinalize(retainTemporaryFiles), files, retainTemporaryFiles);
- }
-
- /**
- * Finalize retains temporary files when requested.
- */
- @Test
- public void testFinalizeWithRetention() throws Exception {
- List<File> files = generateTemporaryFilesForFinalize(3);
- boolean retainTemporaryFiles = true;
- runFinalize(buildWriteOperationForFinalize(retainTemporaryFiles), files, retainTemporaryFiles);
+ runFinalize(buildWriteOperation(), files);
}
/**
@@ -179,9 +167,9 @@ public class FileBasedSinkTest {
@Test
public void testFinalizeMultipleCalls() throws Exception {
List<File> files = generateTemporaryFilesForFinalize(3);
- SimpleSink.SimpleWriteOperation writeOp = buildWriteOperationForFinalize(false);
- runFinalize(writeOp, files, false);
- runFinalize(writeOp, files, false);
+ SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
+ runFinalize(writeOp, files);
+ runFinalize(writeOp, files);
}
/**
@@ -190,24 +178,14 @@ public class FileBasedSinkTest {
@Test
public void testFinalizeWithIntermediateState() throws Exception {
List<File> files = generateTemporaryFilesForFinalize(3);
- SimpleSink.SimpleWriteOperation writeOp = buildWriteOperationForFinalize(false);
- runFinalize(writeOp, files, false);
+ SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
+ runFinalize(writeOp, files);
// create a temporary file
tmpFolder.newFolder(tempDirectory);
tmpFolder.newFile(tempDirectory + "/1");
- runFinalize(writeOp, files, false);
- }
-
- /**
- * Build a SimpleWriteOperation with default values and the specified retention policy.
- */
- private SimpleSink.SimpleWriteOperation buildWriteOperationForFinalize(
- boolean retainTemporaryFiles) throws Exception {
- TemporaryFileRetention retentionPolicy =
- retainTemporaryFiles ? TemporaryFileRetention.KEEP : TemporaryFileRetention.REMOVE;
- return buildWriteOperation(retentionPolicy);
+ runFinalize(writeOp, files);
}
/**
@@ -230,8 +208,8 @@ public class FileBasedSinkTest {
/**
* Finalize and verify that files are copied and temporary files are optionally removed.
*/
- private void runFinalize(SimpleSink.SimpleWriteOperation writeOp, List<File> temporaryFiles,
- boolean retainTemporaryFiles) throws Exception {
+ private void runFinalize(SimpleSink.SimpleWriteOperation writeOp, List<File> temporaryFiles)
+ throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
int numFiles = temporaryFiles.size();
@@ -250,12 +228,10 @@ public class FileBasedSinkTest {
for (int i = 0; i < numFiles; i++) {
assertTrue(outputFiles.get(i).exists());
- assertEquals(retainTemporaryFiles, temporaryFiles.get(i).exists());
+ assertFalse(temporaryFiles.get(i).exists());
}
- if (!retainTemporaryFiles) {
- assertFalse(new File(writeOp.tempDirectory).exists());
- }
+ assertFalse(new File(writeOp.tempDirectory).exists());
}
/**
@@ -555,11 +531,6 @@ public class FileBasedSinkTest {
}
private static final class SimpleWriteOperation extends FileBasedWriteOperation<String> {
- public SimpleWriteOperation(
- SimpleSink sink, String tempOutputFilename, TemporaryFileRetention retentionPolicy) {
- super(sink, tempOutputFilename, retentionPolicy);
- }
-
public SimpleWriteOperation(SimpleSink sink, String tempOutputFilename) {
super(sink, tempOutputFilename);
}
@@ -618,15 +589,6 @@ public class FileBasedSinkTest {
}
/**
- * Build a SimpleWriteOperation with default options and the given file retention policy.
- */
- private SimpleSink.SimpleWriteOperation buildWriteOperation(
- TemporaryFileRetention fileRetention) {
- SimpleSink sink = buildSink();
- return new SimpleSink.SimpleWriteOperation(sink, getBaseTempDirectory(), fileRetention);
- }
-
- /**
* Build a SimpleWriteOperation with default options and the given base temporary filename.
*/
private SimpleSink.SimpleWriteOperation buildWriteOperation(String baseTemporaryFilename) {
@@ -639,15 +601,14 @@ public class FileBasedSinkTest {
*/
private SimpleSink.SimpleWriteOperation buildWriteOperation() {
SimpleSink sink = buildSink();
- return new SimpleSink.SimpleWriteOperation(
- sink, getBaseTempDirectory(), TemporaryFileRetention.REMOVE);
+ return new SimpleSink.SimpleWriteOperation(sink, getBaseTempDirectory());
}
/**
* Build a writer with the default options for its parent write operation and sink.
*/
private SimpleSink.SimpleWriter buildWriter() {
- SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation(TemporaryFileRetention.REMOVE);
+ SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
return new SimpleSink.SimpleWriter(writeOp);
}
}
[2/2] incubator-beam git commit: Closes #1331
Posted by dh...@apache.org.
Closes #1331
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6814a99c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6814a99c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6814a99c
Branch: refs/heads/master
Commit: 6814a99c22264e3c45864d7deb237108b1bd27d2
Parents: 8219233 2a15112
Author: Dan Halperin <dh...@google.com>
Authored: Fri Nov 11 15:30:04 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Nov 11 15:30:04 2016 -0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/FileBasedSink.java | 49 ++------------
.../apache/beam/sdk/io/FileBasedSinkTest.java | 67 ++++----------------
2 files changed, 21 insertions(+), 95 deletions(-)
----------------------------------------------------------------------