You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/10 03:35:33 UTC
[1/2] beam git commit: BEAM-2575 ApexRunner doesn't emit watermarks for additional outputs
Repository: beam
Updated Branches:
refs/heads/master f467d6bbb -> 9f904dc00
BEAM-2575 ApexRunner doesn't emit watermarks for additional outputs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cb5061e7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cb5061e7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cb5061e7
Branch: refs/heads/master
Commit: cb5061e7149519cb18673f4c572757dce3cc7bd1
Parents: f467d6b
Author: Thomas Weise <th...@apache.org>
Authored: Sun Jul 9 11:57:43 2017 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Sun Jul 9 12:12:52 2017 -0700
----------------------------------------------------------------------
.../operators/ApexParDoOperator.java | 21 +++++++++++++-------
.../runners/apex/examples/WordCountTest.java | 8 ++++++--
2 files changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/cb5061e7/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 809ca2a..c3cbab2 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -359,10 +359,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
}
}
if (sideInputs.isEmpty()) {
- if (traceTuples) {
- LOG.debug("\nemitting watermark {}\n", mark);
- }
- output.emit(mark);
+ outputWatermark(mark);
return;
}
@@ -370,10 +367,20 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
Math.min(pushedBackWatermark.get(), currentInputWatermark);
if (potentialOutputWatermark > currentOutputWatermark) {
currentOutputWatermark = potentialOutputWatermark;
- if (traceTuples) {
- LOG.debug("\nemitting watermark {}\n", currentOutputWatermark);
+ outputWatermark(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark));
+ }
+ }
+
+ private void outputWatermark(ApexStreamTuple.WatermarkTuple<?> mark) {
+ if (traceTuples) {
+ LOG.debug("\nemitting {}\n", mark);
+ }
+ output.emit(mark);
+ if (!additionalOutputPortMapping.isEmpty()) {
+ for (DefaultOutputPort<ApexStreamTuple<?>> additionalOutput :
+ additionalOutputPortMapping.values()) {
+ additionalOutput.emit(mark);
}
- output.emit(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/cb5061e7/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index e76096e..ba75746 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -123,11 +123,15 @@ public class WordCountTest {
options.setInputFile(new File(inputFile).getAbsolutePath());
String outputFilePrefix = "target/wordcountresult.txt";
options.setOutput(outputFilePrefix);
- WordCountTest.main(TestPipeline.convertToArgs(options));
File outFile1 = new File(outputFilePrefix + "-00000-of-00002");
File outFile2 = new File(outputFilePrefix + "-00001-of-00002");
- Assert.assertTrue(outFile1.exists() && outFile2.exists());
+ Assert.assertTrue(!outFile1.exists() || outFile1.delete());
+ Assert.assertTrue(!outFile2.exists() || outFile2.delete());
+
+ WordCountTest.main(TestPipeline.convertToArgs(options));
+
+ Assert.assertTrue("result files exist", outFile1.exists() && outFile2.exists());
HashSet<String> results = new HashSet<>();
results.addAll(FileUtils.readLines(outFile1));
results.addAll(FileUtils.readLines(outFile2));
[2/2] beam git commit: This closes #3529: BEAM-2575 ApexRunner doesn't emit watermarks for additional outputs
Posted by ke...@apache.org.
This closes #3529: BEAM-2575 ApexRunner doesn't emit watermarks for additional outputs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9f904dc0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9f904dc0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9f904dc0
Branch: refs/heads/master
Commit: 9f904dc00552349b535752e751ce82a7e484b471
Parents: f467d6b cb5061e
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Jul 9 20:18:55 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Jul 9 20:18:55 2017 -0700
----------------------------------------------------------------------
.../operators/ApexParDoOperator.java | 21 +++++++++++++-------
.../runners/apex/examples/WordCountTest.java | 8 ++++++--
2 files changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------