You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2017/01/03 21:39:51 UTC
[1/2] beam git commit: BEAM-1140 delay stream termination after final
mark to reduce test flakiness (stop gap measure).
Repository: beam
Updated Branches:
refs/heads/master 6c1e46976 -> 1b15d6de0
BEAM-1140 delay stream termination after final mark to reduce test flakiness (stop gap measure).
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/37a18f76
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/37a18f76
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/37a18f76
Branch: refs/heads/master
Commit: 37a18f761dc8e2c0251bd68499be2e7c1bd6e800
Parents: 70ff6bf
Author: Thomas Weise <th...@apache.org>
Authored: Mon Jan 2 21:58:57 2017 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Tue Jan 3 12:46:30 2017 -0800
----------------------------------------------------------------------
.../operators/ApexReadUnboundedInputOperator.java | 17 ++++++++++++++---
1 file changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/37a18f76/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
index 6fc2f0c..ac28c2a 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
@@ -35,6 +35,7 @@ import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple.DataTuple;
import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
import org.apache.beam.runners.apex.translation.utils.ValuesSource;
import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
@@ -86,9 +87,7 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT
public void beginWindow(long windowId) {
if (!available && (isBoundedSource || source instanceof ValuesSource)) {
// if it's a Create and the input was consumed, emit final watermark
- emitWatermarkIfNecessary(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis());
- // terminate the stream (allows tests to finish faster)
- BaseOperator.shutdown();
+ emitWatermarkIfNecessary(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
} else {
emitWatermarkIfNecessary(reader.getWatermark().getMillis());
}
@@ -106,6 +105,18 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT
@Override
public void endWindow() {
+ if (outputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+ // terminate the stream
+ if (traceTuples) {
+ LOG.debug("terminating input after final watermark");
+ }
+ try {
+ // see BEAM-1140 for why the delay after mark was emitted
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
+ BaseOperator.shutdown();
+ }
}
@Override
[2/2] beam git commit: This closes #1727
Posted by th...@apache.org.
This closes #1727
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1b15d6de
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1b15d6de
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1b15d6de
Branch: refs/heads/master
Commit: 1b15d6de0c09dbffe462285a2368f066c6d4186d
Parents: 6c1e469 37a18f7
Author: Thomas Weise <th...@apache.org>
Authored: Tue Jan 3 13:39:26 2017 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Tue Jan 3 13:39:26 2017 -0800
----------------------------------------------------------------------
.../operators/ApexReadUnboundedInputOperator.java | 17 ++++++++++++++---
1 file changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------