You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/09 16:47:35 UTC
[1/2] incubator-beam git commit: FileBasedSink: Detect bad shard name templates
Repository: incubator-beam
Updated Branches:
refs/heads/master 61d8cf2c4 -> 39da22c76
FileBasedSink: Detect bad shard name templates
This is particularly relevant when TextIO.Write.withoutSharding() is
used (see BEAM-159).
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4d036bc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4d036bc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4d036bc8
Branch: refs/heads/master
Commit: 4d036bc8a4b8c255d09467d09fe4ed7eae9dd035
Parents: 61d8cf2
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jun 8 14:59:56 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jun 9 09:47:24 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/FileBasedSink.java | 16 +++++++++-----
.../apache/beam/sdk/io/FileBasedSinkTest.java | 23 ++++++++++++++++++++
2 files changed, 33 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d036bc8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 9048380..521f54b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Ordering;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +44,7 @@ import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
/**
@@ -333,13 +334,10 @@ public abstract class FileBasedSink<T> extends Sink<T> {
protected final List<String> copyToOutputFiles(List<String> filenames, PipelineOptions options)
throws IOException {
int numFiles = filenames.size();
- List<String> srcFilenames = new ArrayList<>();
+ // Sort files for idempotence.
+ List<String> srcFilenames = Ordering.natural().sortedCopy(filenames);
List<String> destFilenames = generateDestinationFilenames(numFiles);
- // Sort files for copying.
- srcFilenames.addAll(filenames);
- Collections.sort(srcFilenames);
-
if (numFiles > 0) {
LOG.debug("Copying {} files.", numFiles);
FileOperations fileOperations =
@@ -366,6 +364,12 @@ public abstract class FileBasedSink<T> extends Sink<T> {
destFilenames.add(IOChannelUtils.constructName(
baseOutputFilename, fileNamingTemplate, suffix, i, numFiles));
}
+
+ int numDistinctShards = new HashSet<String>(destFilenames).size();
+ Preconditions.checkState(numDistinctShards == numFiles,
+ "Shard name template '%s' only generated %s distinct file names for %s files.",
+ fileNamingTemplate, numDistinctShards, numFiles);
+
return destFilenames;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d036bc8/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 0e434fc..d3454da 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation.TemporaryFileRetention;
@@ -374,6 +375,28 @@ public class FileBasedSinkTest {
}
/**
+ * Reject non-distinct output filenames.
+ */
+ @Test
+ public void testCollidingOutputFilenames() {
+ SimpleSink sink = new SimpleSink("output", "test", "-NN");
+ SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink);
+
+ // A single shard doesn't need to include the shard number.
+ assertEquals(Arrays.asList("output-01.test"),
+ writeOp.generateDestinationFilenames(1));
+
+ // More than one shard does.
+ try {
+ writeOp.generateDestinationFilenames(3);
+ fail("Should have failed.");
+ } catch (IllegalStateException exn) {
+ assertEquals("Shard name template '-NN' only generated 1 distinct file names for 3 files.",
+ exn.getMessage());
+ }
+ }
+
+ /**
* Output filenames are generated correctly when an extension is not supplied.
*/
@Test
[2/2] incubator-beam git commit: Closes #436
Posted by dh...@apache.org.
Closes #436
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/39da22c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/39da22c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/39da22c7
Branch: refs/heads/master
Commit: 39da22c7636bd93c6f97ccaa5cd918c6de5dcdae
Parents: 61d8cf2 4d036bc
Author: Dan Halperin <dh...@google.com>
Authored: Thu Jun 9 09:47:25 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jun 9 09:47:25 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/FileBasedSink.java | 16 +++++++++-----
.../apache/beam/sdk/io/FileBasedSinkTest.java | 23 ++++++++++++++++++++
2 files changed, 33 insertions(+), 6 deletions(-)
----------------------------------------------------------------------