You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/07 18:05:07 UTC
[1/2] incubator-beam git commit: [BEAM-292] Write: always produce at least 1 WriteT
Repository: incubator-beam
Updated Branches:
refs/heads/master aaab15766 -> b9845a700
[BEAM-292] Write: always produce at least 1 WriteT
Write has a degenerate case wherein, if the written PCollection contains no elements,
the finalize step receives no write results to finalize. This often prevents
correct operation; for example, a FileBasedSink produces no files instead of one
empty file.
Detect and handle this case in Write by opening and immediately closing a Writer
that writes no records, producing a single (empty) WriteT.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f97c38d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f97c38d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f97c38d1
Branch: refs/heads/master
Commit: f97c38d1c5d85e8ad0d94229c47e9e619be93632
Parents: aaab157
Author: Dan Halperin <dh...@google.com>
Authored: Mon Jun 6 21:46:37 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jun 7 11:04:39 2016 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/beam/sdk/io/Write.java | 9 +++++++++
.../test/java/org/apache/beam/sdk/io/TextIOTest.java | 12 ++++++++++--
2 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f97c38d1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index 66fef84..c7b3f5c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.values.PDone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.UUID;
/**
@@ -230,6 +231,14 @@ public class Write {
LOG.info("Finalizing write operation {}", writeOperation);
Iterable<WriteT> results = c.sideInput(resultsView);
LOG.debug("Side input initialized to finalize write operation {}", writeOperation);
+ if (!results.iterator().hasNext()) {
+ LOG.info("No write results, creating a single empty output.");
+ Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
+ writer.open(UUID.randomUUID().toString());
+ WriteT emptyWrite = writer.close();
+ results = Collections.singleton(emptyWrite);
+ LOG.debug("Done creating a single empty output.");
+ }
writeOperation.finalize(results, c.getPipelineOptions());
LOG.debug("Done finalizing write operation {}", writeOperation);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f97c38d1/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 73aeda9..724a113 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -202,15 +202,17 @@ public class TextIOTest {
}
if (numShards == 1) {
write = write.withoutSharding();
- } else {
+ } else if (numShards > 0) {
write = write.withNumShards(numShards).withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX);
}
+ int numOutputShards = (numShards > 0) ? numShards : 1;
input.apply(write);
p.run();
- assertOutputFiles(elems, coder, numShards, tmpFolder, outputName, write.getShardNameTemplate());
+ assertOutputFiles(elems, coder, numOutputShards, tmpFolder, outputName,
+ write.getShardNameTemplate());
}
public static <T> void assertOutputFiles(
@@ -261,6 +263,12 @@ public class TextIOTest {
@Test
@Category(NeedsRunner.class)
+ public void testWriteEmptyStringsNoSharding() throws Exception {
+ runTestWrite(NO_LINES_ARRAY, StringUtf8Coder.of(), 0);
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
public void testWriteEmptyStrings() throws Exception {
runTestWrite(NO_LINES_ARRAY, StringUtf8Coder.of());
}
[2/2] incubator-beam git commit: Closes #424
Posted by dh...@apache.org.
Closes #424
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b9845a70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b9845a70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b9845a70
Branch: refs/heads/master
Commit: b9845a700a262b954a1dbb67bd979e9927141a90
Parents: aaab157 f97c38d
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jun 7 11:04:40 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jun 7 11:04:40 2016 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/beam/sdk/io/Write.java | 9 +++++++++
.../test/java/org/apache/beam/sdk/io/TextIOTest.java | 12 ++++++++++--
2 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------