You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:06:41 UTC

[29/50] [abbrv] beam git commit: BEAM-2575 ApexRunner doesn't emit watermarks for additional outputs

BEAM-2575 ApexRunner doesn't emit watermarks for additional outputs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e014db6b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e014db6b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e014db6b

Branch: refs/heads/DSL_SQL
Commit: e014db6b7af00b49467389854c63ef693819ec1f
Parents: eee0c9c
Author: Thomas Weise <th...@apache.org>
Authored: Sun Jul 9 11:57:43 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 20:01:01 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexParDoOperator.java            | 21 +++++++++++++-------
 .../runners/apex/examples/WordCountTest.java    |  8 ++++++--
 2 files changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e014db6b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 809ca2a..c3cbab2 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -359,10 +359,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
       }
     }
     if (sideInputs.isEmpty()) {
-      if (traceTuples) {
-        LOG.debug("\nemitting watermark {}\n", mark);
-      }
-      output.emit(mark);
+      outputWatermark(mark);
       return;
     }
 
@@ -370,10 +367,20 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
         Math.min(pushedBackWatermark.get(), currentInputWatermark);
     if (potentialOutputWatermark > currentOutputWatermark) {
       currentOutputWatermark = potentialOutputWatermark;
-      if (traceTuples) {
-        LOG.debug("\nemitting watermark {}\n", currentOutputWatermark);
+      outputWatermark(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark));
+    }
+  }
+
+  private void outputWatermark(ApexStreamTuple.WatermarkTuple<?> mark) {
+    if (traceTuples) {
+      LOG.debug("\nemitting {}\n", mark);
+    }
+    output.emit(mark);
+    if (!additionalOutputPortMapping.isEmpty()) {
+      for (DefaultOutputPort<ApexStreamTuple<?>> additionalOutput :
+          additionalOutputPortMapping.values()) {
+        additionalOutput.emit(mark);
       }
-      output.emit(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e014db6b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index e76096e..ba75746 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -123,11 +123,15 @@ public class WordCountTest {
     options.setInputFile(new File(inputFile).getAbsolutePath());
     String outputFilePrefix = "target/wordcountresult.txt";
     options.setOutput(outputFilePrefix);
-    WordCountTest.main(TestPipeline.convertToArgs(options));
 
     File outFile1 = new File(outputFilePrefix + "-00000-of-00002");
     File outFile2 = new File(outputFilePrefix + "-00001-of-00002");
-    Assert.assertTrue(outFile1.exists() && outFile2.exists());
+    Assert.assertTrue(!outFile1.exists() || outFile1.delete());
+    Assert.assertTrue(!outFile2.exists() || outFile2.delete());
+
+    WordCountTest.main(TestPipeline.convertToArgs(options));
+
+    Assert.assertTrue("result files exist", outFile1.exists() && outFile2.exists());
     HashSet<String> results = new HashSet<>();
     results.addAll(FileUtils.readLines(outFile1));
     results.addAll(FileUtils.readLines(outFile2));