You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2017/01/03 21:39:51 UTC

[1/2] beam git commit: BEAM-1140 delay stream termination after final mark to reduce test flakiness (stop gap measure).

Repository: beam
Updated Branches:
  refs/heads/master 6c1e46976 -> 1b15d6de0


BEAM-1140 delay stream termination after final mark to reduce test flakiness (stop gap measure).


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/37a18f76
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/37a18f76
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/37a18f76

Branch: refs/heads/master
Commit: 37a18f761dc8e2c0251bd68499be2e7c1bd6e800
Parents: 70ff6bf
Author: Thomas Weise <th...@apache.org>
Authored: Mon Jan 2 21:58:57 2017 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Tue Jan 3 12:46:30 2017 -0800

----------------------------------------------------------------------
 .../operators/ApexReadUnboundedInputOperator.java  | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/37a18f76/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
index 6fc2f0c..ac28c2a 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
@@ -35,6 +35,7 @@ import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple.DataTuple;
 import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ValuesSource;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -86,9 +87,7 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT
   public void beginWindow(long windowId) {
     if (!available && (isBoundedSource || source instanceof ValuesSource)) {
       // if it's a Create and the input was consumed, emit final watermark
-      emitWatermarkIfNecessary(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis());
-      // terminate the stream (allows tests to finish faster)
-      BaseOperator.shutdown();
+      emitWatermarkIfNecessary(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
     } else {
       emitWatermarkIfNecessary(reader.getWatermark().getMillis());
     }
@@ -106,6 +105,18 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT
 
   @Override
   public void endWindow() {
+    if (outputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+      // terminate the stream
+      if (traceTuples) {
+        LOG.debug("terminating input after final watermark");
+      }
+      try {
+        // see BEAM-1140 for why the delay after mark was emitted
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+      }
+      BaseOperator.shutdown();
+    }
   }
 
   @Override


[2/2] beam git commit: This closes #1727

Posted by th...@apache.org.
This closes #1727


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1b15d6de
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1b15d6de
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1b15d6de

Branch: refs/heads/master
Commit: 1b15d6de0c09dbffe462285a2368f066c6d4186d
Parents: 6c1e469 37a18f7
Author: Thomas Weise <th...@apache.org>
Authored: Tue Jan 3 13:39:26 2017 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Tue Jan 3 13:39:26 2017 -0800

----------------------------------------------------------------------
 .../operators/ApexReadUnboundedInputOperator.java  | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------