You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/11/10 18:56:06 UTC
[2/2] incubator-beam git commit: [BEAM-896] FileBasedSink removes temp directory
[BEAM-896] FileBasedSink removes temp directory
Also fixes RAT plugin in pom.xml to ignore .idea
(found while running integration tests locally).
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/07a3c2c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/07a3c2c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/07a3c2c0
Branch: refs/heads/master
Commit: 07a3c2c06c80598faa9d33507ad6fbaa8b70e8c2
Parents: 11eaed1
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Nov 3 15:29:49 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Nov 10 10:56:00 2016 -0800
----------------------------------------------------------------------
pom.xml | 1 +
.../org/apache/beam/sdk/io/FileBasedSink.java | 134 ++++++++++++-------
.../beam/sdk/util/FileIOChannelFactory.java | 3 +-
.../apache/beam/sdk/io/FileBasedSinkTest.java | 28 ++--
.../org/apache/beam/sdk/io/XmlSinkTest.java | 4 +-
5 files changed, 104 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07a3c2c0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 749ca9c..22897e3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -987,6 +987,7 @@
<exclude>**/hs_err_pid*.log</exclude>
<exclude>.github/**/*</exclude>
<exclude>**/*.iml</exclude>
+ <exclude>**/.idea/**/*</exclude>
<exclude>**/package-list</exclude>
<exclude>**/user.avsc</exclude>
<exclude>**/test/resources/**/*.txt</exclude>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07a3c2c0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 101ff61..e6c37de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -38,6 +38,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
@@ -79,6 +80,8 @@ import org.slf4j.LoggerFactory;
* @param <T> the type of values written to the sink.
*/
public abstract class FileBasedSink<T> extends Sink<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
+
/**
* Directly supported file output compression types.
*/
@@ -262,11 +265,11 @@ public abstract class FileBasedSink<T> extends Sink<T> {
* FileBasedSinkWriter.
*
* <h2>Temporary and Output File Naming:</h2> During the write, bundles are written to temporary
- * files using the baseTemporaryFilename that can be provided via the constructor of
+ * files using the tempDirectory that can be provided via the constructor of
* FileBasedWriteOperation. These temporary files will be named
- * {@code {baseTemporaryFilename}-temp-{bundleId}}, where bundleId is the unique id of the bundle.
- * For example, if baseTemporaryFilename is "gs://my-bucket/my_temp_output", the output for a
- * bundle with bundle id 15723 will be "gs://my-bucket/my_temp_output-temp-15723".
+ * {@code {tempDirectory}/{bundleId}}, where bundleId is the unique id of the bundle.
+ * For example, if tempDirectory is "gs://my-bucket/my_temp_output", the output for a
+ * bundle with bundle id 15723 will be "gs://my-bucket/my_temp_output/15723".
*
* <p>Final output files are written to baseOutputFilename with the format
* {@code {baseOutputFilename}-0000i-of-0000n.{extension}} where n is the total number of bundles
@@ -290,8 +293,6 @@ public abstract class FileBasedSink<T> extends Sink<T> {
* @param <T> the type of values written to the sink.
*/
public abstract static class FileBasedWriteOperation<T> extends WriteOperation<T, FileResult> {
- private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriteOperation.class);
-
/**
* Options for handling of temporary output files.
*/
@@ -310,27 +311,21 @@ public abstract class FileBasedSink<T> extends Sink<T> {
*/
protected final TemporaryFileRetention temporaryFileRetention;
- /**
- * Base filename used for temporary output files. Default is the baseOutputFilename.
- */
- protected final String baseTemporaryFilename;
+ /** Directory for temporary output files. */
+ protected final String tempDirectory;
- /**
- * Build a temporary filename using the temporary filename separator with the given prefix and
- * suffix.
- */
- protected static final String buildTemporaryFilename(String prefix, String suffix) {
- try {
- IOChannelFactory factory = IOChannelUtils.getFactory(prefix);
- return factory.resolve(prefix, suffix);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ /** Constructs a temporary file path given the temporary directory and a filename. */
+ protected static String buildTemporaryFilename(String tempDirectory, String filename)
+ throws IOException {
+ return IOChannelUtils.getFactory(tempDirectory).resolve(tempDirectory, filename);
}
/**
- * Construct a FileBasedWriteOperation using the same base filename for both temporary and
- * output files.
+ * Constructs a FileBasedWriteOperation using the default strategy for generating a temporary
+ * directory from the base output filename.
+ *
+ * <p>Default is a uniquely named sibling of baseOutputFilename, e.g. if baseOutputFilename is
+ * /path/to/foo, the temporary directory will be /path/to/temp-beam-foo-$date.
*
* @param sink the FileBasedSink that will be used to configure this write operation.
*/
@@ -358,23 +353,23 @@ public abstract class FileBasedSink<T> extends Sink<T> {
* Construct a FileBasedWriteOperation.
*
* @param sink the FileBasedSink that will be used to configure this write operation.
- * @param baseTemporaryFilename the base filename to be used for temporary output files.
+ * @param tempDirectory the base directory to be used for temporary output files.
*/
- public FileBasedWriteOperation(FileBasedSink<T> sink, String baseTemporaryFilename) {
- this(sink, baseTemporaryFilename, TemporaryFileRetention.REMOVE);
+ public FileBasedWriteOperation(FileBasedSink<T> sink, String tempDirectory) {
+ this(sink, tempDirectory, TemporaryFileRetention.REMOVE);
}
/**
* Create a new FileBasedWriteOperation.
*
* @param sink the FileBasedSink that will be used to configure this write operation.
- * @param baseTemporaryFilename the base filename to be used for temporary output files.
+ * @param tempDirectory the base directory to be used for temporary output files.
* @param temporaryFileRetention defines how temporary files are handled.
*/
- public FileBasedWriteOperation(FileBasedSink<T> sink, String baseTemporaryFilename,
+ public FileBasedWriteOperation(FileBasedSink<T> sink, String tempDirectory,
TemporaryFileRetention temporaryFileRetention) {
this.sink = sink;
- this.baseTemporaryFilename = baseTemporaryFilename;
+ this.tempDirectory = tempDirectory;
this.temporaryFileRetention = temporaryFileRetention;
}
@@ -422,7 +417,12 @@ public abstract class FileBasedSink<T> extends Sink<T> {
// Optionally remove temporary files.
if (temporaryFileRetention == TemporaryFileRetention.REMOVE) {
- removeTemporaryFiles(options);
+ // We remove the entire temporary directory, rather than specifically removing the files
+ // from writerResults, because writerResults includes only successfully completed bundles,
+ // and we'd like to clean up the failed ones too.
+ // Note that, due to GCS eventual consistency, matching files in the temp directory is also
+ // currently imperfect, so some files may fail to be deleted.
+ removeTemporaryFiles(files, options);
}
}
@@ -483,21 +483,18 @@ public abstract class FileBasedSink<T> extends Sink<T> {
}
/**
- * Removes temporary output files. Uses the temporary filename to find files to remove.
+ * Removes temporary output files. Uses the temporary directory to find files to remove.
*
* <p>Can be called from subclasses that override {@link FileBasedWriteOperation#finalize}.
* <b>Note:</b>If finalize is overridden and does <b>not</b> rename or otherwise finalize
* temporary files, this method will remove them.
*/
- protected final void removeTemporaryFiles(PipelineOptions options) throws IOException {
- String pattern = buildTemporaryFilename(baseTemporaryFilename, "*");
- LOG.debug("Finding temporary bundle output files matching {}.", pattern);
- FileOperations fileOperations = FileOperationsFactory.getFileOperations(pattern, options);
- IOChannelFactory factory = IOChannelUtils.getFactory(pattern);
- Collection<String> matches = factory.match(pattern);
- LOG.debug("{} temporary files matched {}", matches.size(), pattern);
- LOG.debug("Removing {} files.", matches.size());
- fileOperations.remove(matches);
+ protected final void removeTemporaryFiles(List<String> knownFiles, PipelineOptions options)
+ throws IOException {
+ LOG.debug("Removing temporary bundle output files in {}.", tempDirectory);
+ FileOperations fileOperations =
+ FileOperationsFactory.getFileOperations(tempDirectory, options);
+ fileOperations.removeDirectoryAndFiles(tempDirectory, knownFiles);
}
/**
@@ -542,9 +539,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
private String id;
/**
- * The filename of the output bundle. Equal to the
- * {@link FileBasedSink.FileBasedWriteOperation#TEMPORARY_FILENAME_SEPARATOR} and id appended to
- * the baseName.
+ * The filename of the output bundle - $tempDirectory/$id.
*/
private String filename;
@@ -597,7 +592,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
public final void open(String uId) throws Exception {
this.id = uId;
filename = FileBasedWriteOperation.buildTemporaryFilename(
- getWriteOperation().baseTemporaryFilename, uId);
+ getWriteOperation().tempDirectory, uId);
LOG.debug("Opening {}.", filename);
final WritableByteChannelFactory factory =
getWriteOperation().getSink().writableByteChannelFactory;
@@ -682,7 +677,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
if (factory instanceof GcsIOChannelFactory) {
return new GcsOperations(options);
} else if (factory instanceof FileIOChannelFactory) {
- return new LocalFileOperations();
+ return new LocalFileOperations(factory);
} else {
throw new IOException("Unrecognized file system.");
}
@@ -706,9 +701,16 @@ public abstract class FileBasedSink<T> extends Sink<T> {
void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException;
/**
- * Remove a collection of files.
+ * Removes a directory and the files in it (but not subdirectories).
+ *
+ * <p>Additionally, to partially mitigate the effects of filesystems with eventually-consistent
+ * directory matching APIs, takes a list of files that are known to exist - i.e. removes the
+ * union of the known files and files that the filesystem says exist in the directory.
+ *
+ * <p>Assumes that, if directory listing had been strongly consistent, it would have matched
+ * all of knownFiles - i.e. on a strongly consistent filesystem, knownFiles can be ignored.
*/
- void remove(Collection<String> filenames) throws IOException;
+ void removeDirectoryAndFiles(String directory, List<String> knownFiles) throws IOException;
}
/**
@@ -717,7 +719,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
private static class GcsOperations implements FileOperations {
private final GcsUtil gcsUtil;
- public GcsOperations(PipelineOptions options) {
+ GcsOperations(PipelineOptions options) {
gcsUtil = new GcsUtilFactory().create(options);
}
@@ -727,8 +729,21 @@ public abstract class FileBasedSink<T> extends Sink<T> {
}
@Override
- public void remove(Collection<String> filenames) throws IOException {
- gcsUtil.remove(filenames);
+ public void removeDirectoryAndFiles(String directory, List<String> knownFiles)
+ throws IOException {
+ IOChannelFactory factory = IOChannelUtils.getFactory(directory);
+ Collection<String> matches = factory.match(directory + "/*");
+ Set<String> allMatches = new HashSet<>(matches);
+ allMatches.addAll(knownFiles);
+ LOG.debug(
+ "Removing {} temporary files found under {} ({} matched glob, {} additional known files)",
+ allMatches.size(),
+ directory,
+ matches.size(),
+ allMatches.size() - matches.size());
+ gcsUtil.remove(allMatches);
+ // No need to remove the directory itself: GCS doesn't have directories, so if the directory
+ // is empty, then it already doesn't exist.
}
}
@@ -738,6 +753,12 @@ public abstract class FileBasedSink<T> extends Sink<T> {
private static class LocalFileOperations implements FileOperations {
private static final Logger LOG = LoggerFactory.getLogger(LocalFileOperations.class);
+ private final IOChannelFactory factory;
+
+ LocalFileOperations(IOChannelFactory factory) {
+ this.factory = factory;
+ }
+
@Override
public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
checkArgument(
@@ -769,11 +790,20 @@ public abstract class FileBasedSink<T> extends Sink<T> {
}
@Override
- public void remove(Collection<String> filenames) throws IOException {
- for (String filename : filenames) {
+ public void removeDirectoryAndFiles(String directory, List<String> knownFiles)
+ throws IOException {
+ if (!new File(directory).exists()) {
+ LOG.debug("Directory {} already doesn't exist", directory);
+ return;
+ }
+ Collection<String> matches = factory.match(new File(directory, "*").getAbsolutePath());
+ LOG.debug("Removing {} temporary files found under {}", matches.size(), directory);
+ for (String filename : matches) {
LOG.debug("Removing file {}", filename);
removeOne(filename);
}
+ LOG.debug("Removing directory {}", directory);
+ removeOne(directory);
}
private void removeOne(String filename) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07a3c2c0/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
index 2d2c0c6..b920efb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
@@ -73,7 +73,8 @@ public class FileIOChannelFactory implements IOChannelFactory {
File parent = file.getAbsoluteFile().getParentFile();
if (!parent.exists()) {
- throw new IOException("Unable to find parent directory of " + spec);
+ throw new FileNotFoundException(
+ "Parent directory " + parent + " of " + spec + " does not exist");
}
// Method getAbsolutePath() on Windows platform may return something like
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07a3c2c0/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 8301afc..c1400da 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -40,6 +40,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.zip.GZIPInputStream;
@@ -67,7 +68,7 @@ public class FileBasedSinkTest {
public TemporaryFolder tmpFolder = new TemporaryFolder();
private String baseOutputFilename = "output";
- private String baseTemporaryFilename = "temp";
+ private String tempDirectory = "temp";
private String appendToTempFolder(String filename) {
return Paths.get(tmpFolder.getRoot().getPath(), filename).toString();
@@ -77,8 +78,8 @@ public class FileBasedSinkTest {
return appendToTempFolder(baseOutputFilename);
}
- private String getBaseTempFilename() {
- return appendToTempFolder(baseTemporaryFilename);
+ private String getBaseTempDirectory() {
+ return appendToTempFolder(tempDirectory);
}
/**
@@ -89,7 +90,7 @@ public class FileBasedSinkTest {
public void testWriter() throws Exception {
String testUid = "testId";
String expectedFilename =
- getBaseTempFilename() + "/" + testUid;
+ getBaseTempDirectory() + "/" + testUid;
SimpleSink.SimpleWriter writer = buildWriter();
List<String> values = Arrays.asList("sympathetic vulture", "boresome hummingbird");
@@ -141,7 +142,7 @@ public class FileBasedSinkTest {
*/
@Test
public void testRemoveWithTempFilename() throws Exception {
- testRemoveTemporaryFiles(3, baseTemporaryFilename);
+ testRemoveTemporaryFiles(3, tempDirectory);
}
/**
@@ -193,7 +194,8 @@ public class FileBasedSinkTest {
runFinalize(writeOp, files, false);
// create a temporary file
- tmpFolder.newFile(baseTemporaryFilename + "/1");
+ tmpFolder.newFolder(tempDirectory);
+ tmpFolder.newFile(tempDirectory + "/1");
runFinalize(writeOp, files, false);
}
@@ -215,7 +217,7 @@ public class FileBasedSinkTest {
List<File> temporaryFiles = new ArrayList<>();
for (int i = 0; i < numFiles; i++) {
String temporaryFilename =
- FileBasedWriteOperation.buildTemporaryFilename(baseTemporaryFilename, "" + i);
+ FileBasedWriteOperation.buildTemporaryFilename(tempDirectory, "" + i);
File tmpFile = new File(tmpFolder.getRoot(), temporaryFilename);
tmpFile.getParentFile().mkdirs();
assertTrue(tmpFile.createNewFile());
@@ -250,6 +252,10 @@ public class FileBasedSinkTest {
assertTrue(outputFiles.get(i).exists());
assertEquals(retainTemporaryFiles, temporaryFiles.get(i).exists());
}
+
+ if (!retainTemporaryFiles) {
+ assertFalse(new File(writeOp.tempDirectory).exists());
+ }
}
/**
@@ -273,7 +279,7 @@ public class FileBasedSinkTest {
outputFiles.add(outputFile);
}
- writeOp.removeTemporaryFiles(options);
+ writeOp.removeTemporaryFiles(Collections.<String>emptyList(), options);
for (int i = 0; i < numFiles; i++) {
assertFalse(temporaryFiles.get(i).exists());
@@ -505,7 +511,7 @@ public class FileBasedSinkTest {
final FileBasedWriter<String> writer =
writeOp.createWriter(null);
final String expectedFilename =
- writeOp.baseTemporaryFilename + "/" + testUid;
+ writeOp.tempDirectory + "/" + testUid;
final List<String> expected = new ArrayList<>();
expected.add("header");
@@ -617,7 +623,7 @@ public class FileBasedSinkTest {
private SimpleSink.SimpleWriteOperation buildWriteOperation(
TemporaryFileRetention fileRetention) {
SimpleSink sink = buildSink();
- return new SimpleSink.SimpleWriteOperation(sink, getBaseTempFilename(), fileRetention);
+ return new SimpleSink.SimpleWriteOperation(sink, getBaseTempDirectory(), fileRetention);
}
/**
@@ -634,7 +640,7 @@ public class FileBasedSinkTest {
private SimpleSink.SimpleWriteOperation buildWriteOperation() {
SimpleSink sink = buildSink();
return new SimpleSink.SimpleWriteOperation(
- sink, getBaseTempFilename(), TemporaryFileRetention.REMOVE);
+ sink, getBaseTempDirectory(), TemporaryFileRetention.REMOVE);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07a3c2c0/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
index 653a9d0..400b04a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
@@ -146,7 +146,7 @@ public class XmlSinkTest {
assertEquals(testRootElement, writeOp.getSink().rootElementName);
assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().extension);
Path outputPath = new File(testFilePrefix).toPath();
- Path tempPath = new File(writeOp.baseTemporaryFilename).toPath();
+ Path tempPath = new File(writeOp.tempDirectory).toPath();
assertEquals(outputPath.getParent(), tempPath.getParent());
assertThat(
tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName()));
@@ -163,7 +163,7 @@ public class XmlSinkTest {
.createWriteOperation(options);
XmlWriter<Bird> writer = writeOp.createWriter(options);
Path outputPath = new File(testFilePrefix).toPath();
- Path tempPath = new File(writer.getWriteOperation().baseTemporaryFilename).toPath();
+ Path tempPath = new File(writer.getWriteOperation().tempDirectory).toPath();
assertEquals(outputPath.getParent(), tempPath.getParent());
assertThat(
tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName()));