You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:06:41 UTC
[29/50] [abbrv] beam git commit: BEAM-2575 ApexRunner doesn't emit watermarks for additional outputs
BEAM-2575 ApexRunner doesn't emit watermarks for additional outputs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e014db6b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e014db6b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e014db6b
Branch: refs/heads/DSL_SQL
Commit: e014db6b7af00b49467389854c63ef693819ec1f
Parents: eee0c9c
Author: Thomas Weise <th...@apache.org>
Authored: Sun Jul 9 11:57:43 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 20:01:01 2017 -0700
----------------------------------------------------------------------
.../operators/ApexParDoOperator.java | 21 +++++++++++++-------
.../runners/apex/examples/WordCountTest.java | 8 ++++++--
2 files changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e014db6b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 809ca2a..c3cbab2 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -359,10 +359,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
}
}
if (sideInputs.isEmpty()) {
- if (traceTuples) {
- LOG.debug("\nemitting watermark {}\n", mark);
- }
- output.emit(mark);
+ outputWatermark(mark);
return;
}
@@ -370,10 +367,20 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
Math.min(pushedBackWatermark.get(), currentInputWatermark);
if (potentialOutputWatermark > currentOutputWatermark) {
currentOutputWatermark = potentialOutputWatermark;
- if (traceTuples) {
- LOG.debug("\nemitting watermark {}\n", currentOutputWatermark);
+ outputWatermark(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark));
+ }
+ }
+
+ private void outputWatermark(ApexStreamTuple.WatermarkTuple<?> mark) {
+ if (traceTuples) {
+ LOG.debug("\nemitting {}\n", mark);
+ }
+ output.emit(mark);
+ if (!additionalOutputPortMapping.isEmpty()) {
+ for (DefaultOutputPort<ApexStreamTuple<?>> additionalOutput :
+ additionalOutputPortMapping.values()) {
+ additionalOutput.emit(mark);
}
- output.emit(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e014db6b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index e76096e..ba75746 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -123,11 +123,15 @@ public class WordCountTest {
options.setInputFile(new File(inputFile).getAbsolutePath());
String outputFilePrefix = "target/wordcountresult.txt";
options.setOutput(outputFilePrefix);
- WordCountTest.main(TestPipeline.convertToArgs(options));
File outFile1 = new File(outputFilePrefix + "-00000-of-00002");
File outFile2 = new File(outputFilePrefix + "-00001-of-00002");
- Assert.assertTrue(outFile1.exists() && outFile2.exists());
+ Assert.assertTrue(!outFile1.exists() || outFile1.delete());
+ Assert.assertTrue(!outFile2.exists() || outFile2.delete());
+
+ WordCountTest.main(TestPipeline.convertToArgs(options));
+
+ Assert.assertTrue("result files exist", outFile1.exists() && outFile2.exists());
HashSet<String> results = new HashSet<>();
results.addAll(FileUtils.readLines(outFile1));
results.addAll(FileUtils.readLines(outFile2));