You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/13 23:32:47 UTC
[1/5] beam git commit: ReduceFnRunner: Do not manage EOW hold or timer, set GC hold and timer always
Repository: beam
Updated Branches:
refs/heads/master 219b9caa1 -> a5254e730
ReduceFnRunner: Do not manage EOW hold or timer, set GC hold and timer always
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9c994ca4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9c994ca4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9c994ca4
Branch: refs/heads/master
Commit: 9c994ca44a161c13120bd44f1415ffec38c4b244
Parents: 296cba0
Author: Kenneth Knowles <ke...@apache.org>
Authored: Mon Oct 16 16:46:05 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Mon Nov 13 15:03:21 2017 -0800
----------------------------------------------------------------------
.../beam/runners/core/ReduceFnRunner.java | 72 ++----
.../apache/beam/runners/core/WatermarkHold.java | 221 +++----------------
.../beam/runners/core/ReduceFnRunnerTest.java | 191 ++++++++++++----
3 files changed, 197 insertions(+), 287 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9c994ca4/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 634a2d1..bc1d0db 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -45,7 +45,6 @@ import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
@@ -596,28 +595,12 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);
nonEmptyPanes.recordContent(renamedContext.state());
-
- // Make sure we've scheduled the end-of-window or garbage collection timer for this window.
- Instant timer = scheduleEndOfWindowOrGarbageCollectionTimer(directContext);
+ scheduleGarbageCollectionTimer(directContext);
// Hold back progress of the output watermark until we have processed the pane this
- // element will be included within. If the element is too late for that, place a hold at
- // the end-of-window or garbage collection time to allow empty panes to contribute elements
- // which won't be dropped due to lateness by a following computation (assuming the following
- // computation uses the same allowed lateness value...)
- @Nullable Instant hold = watermarkHold.addHolds(renamedContext);
-
- if (hold != null) {
- // Assert that holds have a proximate timer.
- boolean holdInWindow = !hold.isAfter(window.maxTimestamp());
- boolean timerInWindow = !timer.isAfter(window.maxTimestamp());
- checkState(
- holdInWindow == timerInWindow,
- "set a hold at %s, a timer at %s, which disagree as to whether they are in window %s",
- hold,
- timer,
- directContext.window());
- }
+ // element will be included within. If the element is behind the output watermark, the
+ // hold will be at GC time.
+ watermarkHold.addHolds(renamedContext);
// Execute the reduceFn, which will buffer the value as appropriate
reduceFn.processValue(renamedContext);
@@ -1070,48 +1053,33 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
}
/**
- * Make sure we'll eventually have a timer fire which will tell us to garbage collect
- * the window state. For efficiency we may need to do this in two steps rather
- * than one. Return the time at which the timer will fire.
+ * Schedule a timer to garbage collect the window.
+ *
+ * <p>The timer:
*
* <ul>
- * <li>If allowedLateness is zero then we'll garbage collect at the end of the window.
- * For simplicity we'll set our own timer for this situation even though an
- * {@link AfterWatermark} trigger may have also set an end-of-window timer.
- * ({@code setTimer} is idempotent.)
- * <li>If allowedLateness is non-zero then we could just always set a timer for the garbage
- * collection time. However if the windows are large (eg hourly) and the allowedLateness is small
- * (eg seconds) then we'll end up with nearly twice the number of timers in-flight. So we
- * instead set an end-of-window timer and then roll that forward to a garbage collection timer
- * when it fires. We use the input watermark to distinguish those cases.
+ * <li>...must be fired strictly after the expiration of the window.
+ * <li>...should be as close to the expiration as possible, to have a timely output of
+ * remaining buffered data, and GC.
* </ul>
*/
- private Instant scheduleEndOfWindowOrGarbageCollectionTimer(
- ReduceFn<?, ?, ?, W>.Context directContext) {
+ private void scheduleGarbageCollectionTimer(ReduceFn<?, ?, ?, W>.Context directContext) {
Instant inputWM = timerInternals.currentInputWatermarkTime();
- Instant endOfWindow = directContext.window().maxTimestamp();
- String which;
- Instant timer;
- if (endOfWindow.isBefore(inputWM)) {
- timer = LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy);
- which = "garbage collection";
- } else {
- timer = endOfWindow;
- which = "end-of-window";
- }
+ Instant gcTime =
+ LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy);
WindowTracing.trace(
- "ReduceFnRunner.scheduleEndOfWindowOrGarbageCollectionTimer: Scheduling {} timer at {} for "
+ "ReduceFnRunner.scheduleGarbageCollectionTimer: Scheduling at {} for "
+ "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
- which,
- timer,
+ gcTime,
key,
directContext.window(),
inputWM,
timerInternals.currentOutputWatermarkTime());
- checkState(!timer.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
- "Timer %s is beyond end-of-time", timer);
- directContext.timers().setTimer(timer, TimeDomain.EVENT_TIME);
- return timer;
+ checkState(
+ !gcTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+ "Timer %s is beyond end-of-time",
+ gcTime);
+ directContext.timers().setTimer(gcTime, TimeDomain.EVENT_TIME);
}
private void cancelEndOfWindowAndGarbageCollectionTimers(
http://git-wip-us.apache.org/repos/asf/beam/blob/9c994ca4/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 8859bbb..9890826 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -25,9 +25,9 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;
@@ -84,112 +84,22 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
/**
* Add a hold to prevent the output watermark progressing beyond the (possibly adjusted) timestamp
- * of the element in {@code context}. We allow the actual hold time to be shifted later by the
- * {@link TimestampCombiner}, but no further than the end of the window. The hold will
- * remain until cleared by {@link #extractAndRelease}. Return the timestamp at which the hold
- * was placed, or {@literal null} if no hold was placed.
+ * of the element in {@code context}.
*
- * <p>In the following we'll write {@code E} to represent an element's timestamp after passing
- * through the window strategy's output time function, {@code IWM} for the local input watermark,
- * {@code OWM} for the local output watermark, and {@code GCWM} for the garbage collection
- * watermark (which is at {@code IWM - getAllowedLateness}). Time progresses from left to right,
- * and we write {@code [ ... ]} to denote a bounded window with implied lower bound.
+ * <p>The target time for the aggregated output is shifted by the {@link WindowFn} and combined
+ * with a {@link TimestampCombiner} to determine where the output watermark is held.
*
- * <p>Note that the GCWM will be the same as the IWM if {@code getAllowedLateness}
- * is {@code ZERO}.
+ * <p>If the target time would be late, then we do not set this hold, but instead add the hold
+ * to allow a final output at GC time.
*
- * <p>Here are the cases we need to handle. They are conceptually considered in the
- * sequence written since if getAllowedLateness is ZERO the GCWM is the same as the IWM.
- * <ol>
- * <li>(Normal)
- * <pre>
- * |
- * [ | E ]
- * |
- * IWM
- * </pre>
- * This is, hopefully, the common and happy case. The element is locally on-time and can
- * definitely make it to an {@code ON_TIME} pane which we can still set an end-of-window timer
- * for. We place an element hold at E, which may contribute to the {@code ON_TIME} pane's
- * timestamp (depending on the output time function). Thus the OWM will not proceed past E
- * until the next pane fires.
- *
- * <li>(Discard - no target window)
- * <pre>
- * | |
- * [ E ] | |
- * | |
- * GCWM <-getAllowedLateness-> IWM
- * </pre>
- * The element is very locally late. The window has been garbage collected, thus there
- * is no target pane E could be assigned to. We discard E.
- *
- * <li>(Unobservably late)
- * <pre>
- * | |
- * [ | E | ]
- * | |
- * OWM IWM
- * </pre>
- * The element is locally late, however we can still treat this case as for 'Normal' above
- * since the IWM has not yet passed the end of the window and the element is ahead of the
- * OWM. In effect, we get to 'launder' the locally late element and consider it as locally
- * on-time because no downstream computation can observe the difference.
- *
- * <li>(Maybe late 1)
- * <pre>
- * | |
- * [ | E ] |
- * | |
- * OWM IWM
- * </pre>
- * The end-of-window timer may have already fired for this window, and thus an {@code ON_TIME}
- * pane may have already been emitted. However, if timer firings have been delayed then it
- * is possible the {@code ON_TIME} pane has not yet been emitted. We can't place an element
- * hold since we can't be sure if it will be cleared promptly. Thus this element *may* find
- * its way into an {@code ON_TIME} pane, but if so it will *not* contribute to that pane's
- * timestamp. We may however set a garbage collection hold if required.
- *
- * <li>(Maybe late 2)
- * <pre>
- * | |
- * [ E | | ]
- * | |
- * OWM IWM
- * </pre>
- * The end-of-window timer has not yet fired, so this element may still appear in an
- * {@code ON_TIME} pane. However the element is too late to contribute to the output
- * watermark hold, and thus won't contribute to the pane's timestamp. We can still place an
- * end-of-window hold.
- *
- * <li>(Maybe late 3)
- * <pre>
- * | |
- * [ E | ] |
- * | |
- * OWM IWM
- * </pre>
- * As for the (Maybe late 2) case, however we don't even know if the end-of-window timer
- * has already fired, or it is about to fire. We can place only the garbage collection hold,
- * if required.
- *
- * <li>(Definitely late)
- * <pre>
- * | |
- * [ E ] | |
- * | |
- * OWM IWM
- * </pre>
- * The element is definitely too late to make an {@code ON_TIME} pane. We are too late to
- * place an end-of-window hold. We can still place a garbage collection hold if required.
- *
- * </ol>
+ * <p>See https://s.apache.org/beam-lateness for the full design of how late data and watermarks
+ * interact.
*/
@Nullable
public Instant addHolds(ReduceFn<?, ?, ?, W>.ProcessValueContext context) {
Instant hold = addElementHold(context);
if (hold == null) {
- hold = addEndOfWindowOrGarbageCollectionHolds(context, false/*paneIsEmpty*/);
+ hold = addGarbageCollectionHold(context, false /*paneIsEmpty*/);
}
return hold;
}
@@ -268,94 +178,22 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
}
/**
- * Add an end-of-window hold or, if too late for that, a garbage collection hold (if required).
- * Return the {@link Instant} at which hold was added, or {@literal null} if no hold was added.
- */
- @Nullable
- private Instant addEndOfWindowOrGarbageCollectionHolds(
- ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
- Instant hold = addEndOfWindowHold(context, paneIsEmpty);
- if (hold == null) {
- hold = addGarbageCollectionHold(context, paneIsEmpty);
- }
- return hold;
- }
-
- /**
- * Attempt to add an 'end-of-window hold'. Return the {@link Instant} at which the hold was added
- * (ie the end of window time), or {@literal null} if no end of window hold is possible and we
- * should fallback to a garbage collection hold.
- *
- * <p>We only add the hold if we can be sure a timer will be set (by {@link ReduceFnRunner})
- * to clear it. In other words, the input watermark cannot be ahead of the end of window time.
- *
- * <p>An end-of-window hold is added in two situations:
- * <ol>
- * <li>An incoming element came in behind the output watermark (so we are too late for placing
- * the usual element hold), but it may still be possible to include the element in an
- * {@link Timing#ON_TIME} pane. We place the end of window hold to ensure that pane will
- * not be considered late by any downstream computation.
- * <li>We guarantee an {@link Timing#ON_TIME} pane will be emitted for all windows which saw at
- * least one element, even if that {@link Timing#ON_TIME} pane is empty. Thus when elements in
- * a pane are processed due to a fired trigger we must set both an end of window timer and an end
- * of window hold. Again, the hold ensures the {@link Timing#ON_TIME} pane will not be considered
- * late by any downstream computation.
- * </ol>
- */
- @Nullable
- private Instant addEndOfWindowHold(ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
- Instant outputWM = timerInternals.currentOutputWatermarkTime();
- Instant inputWM = timerInternals.currentInputWatermarkTime();
- Instant eowHold = context.window().maxTimestamp();
-
- if (eowHold.isBefore(inputWM)) {
- WindowTracing.trace(
- "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is too late for "
- + "end-of-window timer for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
- eowHold, context.key(), context.window(), inputWM, outputWM);
- return null;
- }
-
- checkState(outputWM == null || !eowHold.isBefore(outputWM),
- "End-of-window hold %s cannot be before output watermark %s",
- eowHold, outputWM);
- checkState(!eowHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
- "End-of-window hold %s is beyond end-of-time", eowHold);
- // If paneIsEmpty then this hold is just for empty ON_TIME panes, so we want to keep
- // the hold away from the combining function in elementHoldTag.
- // However if !paneIsEmpty then it could make sense to use the elementHoldTag here.
- // Alas, onMerge is forced to add an end of window or garbage collection hold without
- // knowing whether an element hold is already in place (stopping to check is too expensive).
- // This it would end up adding an element hold at the end of the window which could
- // upset the elementHoldTag combining function.
- context.state().access(EXTRA_HOLD_TAG).add(eowHold);
- WindowTracing.trace(
- "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is on time for "
- + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
- eowHold, context.key(), context.window(), inputWM, outputWM);
- return eowHold;
- }
-
- /**
* Attempt to add a 'garbage collection hold' if it is required. Return the {@link Instant} at
- * which the hold was added (ie the end of window time plus allowed lateness),
- * or {@literal null} if no hold was added.
- *
- * <p>We only add the hold if it is distinct from what would be added by
- * {@link #addEndOfWindowHold}. In other words, {@link WindowingStrategy#getAllowedLateness}
- * must be non-zero.
+ * which the hold was added (ie the end of window time plus allowed lateness), or {@literal null}
+ * if no hold was added.
*
* <p>A garbage collection hold is added in two situations:
+ *
* <ol>
- * <li>An incoming element came in behind the output watermark, and was too late for placing
- * the usual element hold or an end of window hold. Place the garbage collection hold so that
- * we can guarantee when the pane is finally triggered its output will not be dropped due to
- * excessive lateness by any downstream computation.
- * <li>The {@link WindowingStrategy#getClosingBehavior()} is
- * {@link ClosingBehavior#FIRE_ALWAYS}, and thus we guarantee a final pane will be emitted
- * for all windows which saw at least one element. Again, the garbage collection hold guarantees
- * that any empty final pane can be given a timestamp which will not be considered beyond
- * allowed lateness by any downstream computation.
+ * <li>An incoming element has a timestamp earlier than the output watermark, and was too late
+ * for placing the usual element hold. Place the garbage collection hold so that we can
+ * guarantee that when the pane is finally triggered its output will not be dropped due to
+ * excessive lateness by any downstream computation.
+ * <li>The {@link WindowingStrategy#getClosingBehavior()} is {@link
+ * ClosingBehavior#FIRE_ALWAYS}, and thus we guarantee a final pane will be emitted for all
+ * windows which saw at least one element. Again, the garbage collection hold guarantees
+ * that any empty final pane can be given a timestamp which will not be considered beyond
+ * allowed lateness by any downstream computation.
* </ol>
*
* <p>We use {@code paneIsEmpty} to distinguish cases 1 and 2.
@@ -367,12 +205,11 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
Instant inputWM = timerInternals.currentInputWatermarkTime();
Instant gcHold = LateDataUtils.garbageCollectionTime(context.window(), windowingStrategy);
- if (!windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) {
+ if (gcHold.isBefore(inputWM)) {
WindowTracing.trace(
- "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary "
- + "since no allowed lateness for key:{}; window:{}; inputWatermark:{}; "
- + "outputWatermark:{}",
- gcHold, context.key(), context.window(), inputWM, outputWM);
+ "{}.addGarbageCollectionHold: gc hold would be before the input watermark "
+ + "for key:{}; window: {}; inputWatermark: {}; outputWatermark: {}",
+ getClass().getSimpleName(), context.key(), context.window(), inputWM, outputWM);
return null;
}
@@ -432,7 +269,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
// the hold depends on the min of the element timestamps.
// At least one merged window must be non-empty for the merge to have been triggered.
StateMerging.clear(context.state(), EXTRA_HOLD_TAG);
- addEndOfWindowOrGarbageCollectionHolds(context, false /*paneIsEmpty*/);
+ addGarbageCollectionHold(context, false /*paneIsEmpty*/);
}
/**
@@ -497,7 +334,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
oldHold = extraHold;
}
if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) {
- // If no hold (eg because all elements came in behind the output watermark), or
+ // If no hold (eg because all elements came in before the output watermark), or
// the hold was for garbage collection, take the end of window as the result.
WindowTracing.debug(
"WatermarkHold.extractAndRelease.read: clipping from {} to end of window "
@@ -514,9 +351,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
@Nullable Instant newHold = null;
if (!isFinished) {
- // Only need to leave behind an end-of-window or garbage collection hold
- // if future elements will be processed.
- newHold = addEndOfWindowOrGarbageCollectionHolds(context, true /*paneIsEmpty*/);
+ newHold = addGarbageCollectionHold(context, true /*paneIsEmpty*/);
}
return new OldAndNewHolds(oldHold, newHold);
http://git-wip-us.apache.org/repos/asf/beam/blob/9c994ca4/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 2341502..982e934 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
@@ -84,7 +85,6 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
@@ -670,9 +670,9 @@ public class ReduceFnRunnerTest {
TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
options.setValue(expectedValue);
- when(mockSideInputReader.contains(Matchers.<PCollectionView<Integer>>any())).thenReturn(true);
- when(mockSideInputReader.get(
- Matchers.<PCollectionView<Integer>>any(), any(BoundedWindow.class)))
+ when(mockSideInputReader.contains(org.mockito.Matchers.any(PCollectionView.class)))
+ .thenReturn(true);
+ when(mockSideInputReader.get(any(PCollectionView.class), any(BoundedWindow.class)))
.then(
new Answer<Integer>() {
@Override
@@ -721,9 +721,13 @@ public class ReduceFnRunnerTest {
MetricsContainerImpl container = new MetricsContainerImpl("any");
MetricsEnvironment.setCurrentContainer(container);
// Test handling of late data. Specifically, ensure the watermark hold is correct.
+ Duration allowedLateness = Duration.millis(10);
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
- AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(10),
+ ReduceFnTester.nonCombining(
+ FixedWindows.of(Duration.millis(10)),
+ mockTriggerStateMachine,
+ AccumulationMode.ACCUMULATING_FIRED_PANES,
+ allowedLateness,
ClosingBehavior.FIRE_IF_NON_EMPTY);
// Input watermark -> null
@@ -731,22 +735,27 @@ public class ReduceFnRunnerTest {
assertEquals(null, tester.getOutputWatermark());
// All on time data, verify watermark hold.
+ IntervalWindow expectedWindow = new IntervalWindow(new Instant(0), new Instant(10));
injectElement(tester, 1);
injectElement(tester, 3);
assertEquals(new Instant(1), tester.getWatermarkHold());
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
injectElement(tester, 2);
List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
- assertThat(output, contains(
- isSingleWindowedValue(containsInAnyOrder(1, 2, 3),
- 1, // timestamp
- 0, // window start
- 10))); // window end
+ assertThat(
+ output,
+ contains(
+ isSingleWindowedValue(
+ containsInAnyOrder(1, 2, 3),
+ equalTo(new Instant(1)),
+ equalTo((BoundedWindow) expectedWindow))));
assertThat(output.get(0).getPane(),
equalTo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
- // Holding for the end-of-window transition.
- assertEquals(new Instant(9), tester.getWatermarkHold());
+ // There is no end-of-window hold, but the timer set by the trigger holds the watermark
+ assertThat(
+ tester.getWatermarkHold(), nullValue());
+
// Nothing dropped.
long droppedElements = container.getCounter(
MetricName.named(ReduceFnRunner.class,
@@ -763,9 +772,16 @@ public class ReduceFnRunnerTest {
tester.advanceInputWatermark(new Instant(4));
injectElement(tester, 2);
injectElement(tester, 3);
- assertEquals(new Instant(9), tester.getWatermarkHold());
+
+ // Late data has arrived behind the _output_ watermark. The ReduceFnRunner sets a GC hold
+ // since this data is not permitted to hold up the output watermark.
+ assertThat(
+ tester.getWatermarkHold(), equalTo(expectedWindow.maxTimestamp().plus(allowedLateness)));
+
+ // Now data just ahead of the output watermark arrives and sets an earlier "element" hold
injectElement(tester, 5);
assertEquals(new Instant(5), tester.getWatermarkHold());
+
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
injectElement(tester, 4);
output = tester.extractOutput();
@@ -780,17 +796,29 @@ public class ReduceFnRunnerTest {
assertThat(output.get(0).getPane(),
equalTo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)));
- // All late -- output at end of window timestamp.
+ // Since the element hold is cleared, there is no hold remaining
+ assertThat(tester.getWatermarkHold(), nullValue());
+
+ // All behind the output watermark -- hold is at GC time (if we imagine the
+ // trigger sets a timer for ON_TIME firing, that is actually when they'll be emitted)
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
tester.advanceInputWatermark(new Instant(8));
injectElement(tester, 6);
injectElement(tester, 5);
- assertEquals(new Instant(9), tester.getWatermarkHold());
+ assertThat(
+ tester.getWatermarkHold(), equalTo(expectedWindow.maxTimestamp().plus(allowedLateness)));
+
injectElement(tester, 4);
// Fire the ON_TIME pane
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
- tester.advanceInputWatermark(new Instant(10));
+
+ // To get an ON_TIME pane, we need the output watermark to be held back a little; this would
+ // be done by way of the timers set by the trigger, which are mocked here
+ tester.setAutoAdvanceOutputWatermark(false);
+
+ tester.advanceInputWatermark(expectedWindow.maxTimestamp().plus(1));
+ tester.fireTimer(expectedWindow, expectedWindow.maxTimestamp(), TimeDomain.EVENT_TIME);
// Output time is end of the window, because all the new data was late, but the pane
// is the ON_TIME pane.
@@ -806,6 +834,8 @@ public class ReduceFnRunnerTest {
assertThat(output.get(0).getPane(),
equalTo(PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0)));
+ tester.setAutoAdvanceOutputWatermark(true);
+
// This is "pending" at the time the watermark makes it way-late.
// Because we're about to expire the window, we output it.
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
@@ -845,26 +875,32 @@ public class ReduceFnRunnerTest {
tester.assertHasOnlyGlobalAndFinishedSetsFor();
}
+ /** Make sure that if data comes in too late to make it on time, the hold is the GC time. */
@Test
public void dontSetHoldIfTooLateForEndOfWindowTimer() throws Exception {
- // Make sure holds are only set if they are accompanied by an end-of-window timer.
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
- AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(10),
+ ReduceFnTester.nonCombining(
+ FixedWindows.of(Duration.millis(10)),
+ mockTriggerStateMachine,
+ AccumulationMode.ACCUMULATING_FIRED_PANES,
+ Duration.millis(10),
ClosingBehavior.FIRE_ALWAYS);
tester.setAutoAdvanceOutputWatermark(false);
- // Case: Unobservably late
+ // Case: Unobservably "late" relative to input watermark, but on time for output watermark
tester.advanceInputWatermark(new Instant(15));
tester.advanceOutputWatermark(new Instant(11));
+
+ IntervalWindow expectedWindow = new IntervalWindow(new Instant(10), new Instant(20));
injectElement(tester, 14);
// Hold was applied, waiting for end-of-window timer.
assertEquals(new Instant(14), tester.getWatermarkHold());
- assertEquals(new Instant(19), tester.getNextTimer(TimeDomain.EVENT_TIME));
- // Trigger the end-of-window timer.
+ // Trigger the end-of-window timer, fire a timer as though the mock trigger set it
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
tester.advanceInputWatermark(new Instant(20));
+ tester.fireTimer(expectedWindow, expectedWindow.maxTimestamp(), TimeDomain.EVENT_TIME);
+
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
// Hold has been replaced with garbage collection hold. Waiting for garbage collection.
assertEquals(new Instant(29), tester.getWatermarkHold());
@@ -887,10 +923,15 @@ public class ReduceFnRunnerTest {
@Test
public void testPaneInfoAllStates() throws Exception {
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
- AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
+ ReduceFnTester.nonCombining(
+ FixedWindows.of(Duration.millis(10)),
+ mockTriggerStateMachine,
+ AccumulationMode.DISCARDING_FIRED_PANES,
+ Duration.millis(100),
ClosingBehavior.FIRE_IF_NON_EMPTY);
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+
tester.advanceInputWatermark(new Instant(0));
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
injectElement(tester, 1);
@@ -903,7 +944,9 @@ public class ReduceFnRunnerTest {
WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1))));
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
+ tester.setAutoAdvanceOutputWatermark(false);
tester.advanceInputWatermark(new Instant(15));
+
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
injectElement(tester, 3);
assertThat(tester.extractOutput(), contains(
@@ -911,6 +954,7 @@ public class ReduceFnRunnerTest {
PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0))));
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+ tester.setAutoAdvanceOutputWatermark(true);
injectElement(tester, 4);
assertThat(tester.extractOutput(), contains(
WindowMatchers.valueWithPaneInfo(
@@ -1032,6 +1076,54 @@ public class ReduceFnRunnerTest {
WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10)));
}
+ /**
+ * If the trigger does not care about the watermark, the ReduceFnRunner should still emit an
+ * element for the ON_TIME pane.
+ */
+ @Test
+ public void testNoWatermarkTriggerNoHold() throws Exception {
+ Duration allowedLateness = Duration.standardDays(1);
+ ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+ ReduceFnTester.nonCombining(
+ WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
+ .withTrigger(
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(Duration.standardSeconds(5))))
+ .withAllowedLateness(allowedLateness));
+
+ // First, an element comes in on time in [0, 10) but ReduceFnRunner should
+ // not set a hold or timer for 9. That is the trigger's job.
+ IntervalWindow expectedWindow = new IntervalWindow(new Instant(0), new Instant(10));
+ tester.advanceInputWatermark(new Instant(0));
+ tester.advanceProcessingTime(new Instant(0));
+
+ tester.injectElements(TimestampedValue.of(1, new Instant(1)));
+
+ // Since some data arrived, the element hold will be the end of the window.
+ assertThat(tester.getWatermarkHold(), equalTo(expectedWindow.maxTimestamp()));
+
+ tester.advanceProcessingTime(new Instant(6000));
+
+ // Sanity check; we aren't trying to verify output in this test
+ assertThat(tester.getOutputSize(), equalTo(1));
+
+ // Since we did not request empty final panes, no hold
+ assertThat(tester.getWatermarkHold(), nullValue());
+
+ // So when the input watermark advances, the output advances with it (automated by tester)
+ tester.advanceInputWatermark(
+ new Instant(expectedWindow.maxTimestamp().plus(Duration.standardHours(1))));
+
+ // Now late data arrives
+ tester.injectElements(TimestampedValue.of(3, new Instant(3)));
+
+ // The ReduceFnRunner should set a GC hold since the element was too late and its timestamp
+ // will be ignored for the purposes of the watermark hold
+ assertThat(
+ tester.getWatermarkHold(), equalTo(expectedWindow.maxTimestamp().plus(allowedLateness)));
+ }
+
@Test
public void testPaneInfoAllStatesAfterWatermarkAccumulating() throws Exception {
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
@@ -1216,32 +1308,39 @@ public class ReduceFnRunnerTest {
*/
@Test
public void testMergingWithCloseTrigger() throws Exception {
+ Duration allowedLateness = Duration.millis(50);
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)),
+ ReduceFnTester.nonCombining(
+ Sessions.withGapDuration(Duration.millis(10)),
mockTriggerStateMachine,
- AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
- ClosingBehavior.FIRE_IF_NON_EMPTY);
+ AccumulationMode.DISCARDING_FIRED_PANES,
+ allowedLateness,
+ ClosingBehavior.FIRE_IF_NON_EMPTY);
// Create a new merged session window.
- tester.injectElements(TimestampedValue.of(1, new Instant(1)),
- TimestampedValue.of(2, new Instant(2)));
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(12));
+ tester.injectElements(
+ TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(2)));
// Force the trigger to be closed for the merged window.
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
triggerShouldFinish(mockTriggerStateMachine);
+
+ // Fire an end-of-window timer as though the trigger set it
tester.advanceInputWatermark(new Instant(13));
+ tester.fireTimer(mergedWindow, mergedWindow.maxTimestamp(), TimeDomain.EVENT_TIME);
// Trigger is now closed.
- assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(12))));
+ assertTrue(tester.isMarkedFinished(mergedWindow));
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
// Revisit the same session window.
- tester.injectElements(TimestampedValue.of(1, new Instant(1)),
- TimestampedValue.of(2, new Instant(2)));
+ tester.injectElements(
+ TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(2)));
// Trigger is still closed.
- assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(12))));
+ assertTrue(tester.isMarkedFinished(mergedWindow));
}
/**
@@ -1250,11 +1349,16 @@ public class ReduceFnRunnerTest {
*/
@Test
public void testMergingWithReusedWindow() throws Exception {
+ Duration allowedLateness = Duration.millis(50);
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)),
+ ReduceFnTester.nonCombining(
+ Sessions.withGapDuration(Duration.millis(10)),
mockTriggerStateMachine,
- AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
- ClosingBehavior.FIRE_IF_NON_EMPTY);
+ AccumulationMode.DISCARDING_FIRED_PANES,
+ allowedLateness,
+ ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(11));
// One element in one session window.
tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21.
@@ -1263,6 +1367,7 @@ public class ReduceFnRunnerTest {
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
triggerShouldFinish(mockTriggerStateMachine);
tester.advanceInputWatermark(new Instant(15));
+ tester.fireTimer(mergedWindow, mergedWindow.maxTimestamp(), TimeDomain.EVENT_TIME);
// Another element in the same session window.
// Should be discarded with 'window closed'.
@@ -1276,11 +1381,13 @@ public class ReduceFnRunnerTest {
List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
assertThat(output.size(), equalTo(1));
- assertThat(output.get(0),
- isSingleWindowedValue(containsInAnyOrder(1),
- 1, // timestamp
- 1, // window start
- 11)); // window end
+ assertThat(
+ output.get(0),
+ isSingleWindowedValue(
+ containsInAnyOrder(1),
+ equalTo(new Instant(1)), // timestamp
+ equalTo((BoundedWindow) mergedWindow)));
+
assertThat(
output.get(0).getPane(),
equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
[4/5] beam git commit: Tidy a troublesome TestStreamTest
Posted by ke...@apache.org.
Tidy a troublesome TestStreamTest
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/555ba40d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/555ba40d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/555ba40d
Branch: refs/heads/master
Commit: 555ba40d5934694476b5337b88276625252d684e
Parents: a593e49
Author: Kenneth Knowles <ke...@apache.org>
Authored: Fri Oct 27 10:51:38 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Mon Nov 13 15:03:21 2017 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/testing/TestStreamTest.java | 25 +++++++++++---------
1 file changed, 14 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/555ba40d/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index bef6aa0..2f147dc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -240,21 +240,24 @@ public class TestStreamTest implements Serializable {
@Category({NeedsRunner.class, UsesTestStream.class})
public void testElementsAtAlmostPositiveInfinity() {
Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
- TestStream<String> stream = TestStream.create(StringUtf8Coder.of())
- .addElements(TimestampedValue.of("foo", endOfGlobalWindow),
- TimestampedValue.of("bar", endOfGlobalWindow))
- .advanceWatermarkToInfinity();
+ TestStream<String> stream =
+ TestStream.create(StringUtf8Coder.of())
+ .addElements(
+ TimestampedValue.of("foo", endOfGlobalWindow),
+ TimestampedValue.of("bar", endOfGlobalWindow))
+ .advanceWatermarkToInfinity();
FixedWindows windows = FixedWindows.of(Duration.standardHours(6));
- PCollection<String> windowedValues = p.apply(stream)
- .apply(Window.<String>into(windows))
- .apply(WithKeys.<Integer, String>of(1))
- .apply(GroupByKey.<Integer, String>create())
- .apply(Values.<Iterable<String>>create())
- .apply(Flatten.<String>iterables());
+ PCollection<String> windowedValues =
+ p.apply(stream)
+ .apply(Window.<String>into(windows))
+ .apply(WithKeys.<Integer, String>of(1))
+ .apply(GroupByKey.<Integer, String>create())
+ .apply(Values.<Iterable<String>>create())
+ .apply(Flatten.<String>iterables());
PAssert.that(windowedValues)
- .inWindow(windows.assignWindow(GlobalWindow.INSTANCE.maxTimestamp()))
+ .inWindow(windows.assignWindow(endOfGlobalWindow))
.containsInAnyOrder("foo", "bar");
p.run();
}
[2/5] beam git commit: Truncate the very last fixed window if it goes
beyond representable time
Posted by ke...@apache.org.
Truncate the very last fixed window if it goes beyond representable time
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/296cba00
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/296cba00
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/296cba00
Branch: refs/heads/master
Commit: 296cba009a5c979223fb61bd411816169eaad515
Parents: 555ba40
Author: Kenneth Knowles <ke...@apache.org>
Authored: Fri Oct 27 10:51:45 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Mon Nov 13 15:03:21 2017 -0800
----------------------------------------------------------------------
.../beam/runners/core/LateDataUtilsTest.java | 2 +-
.../sdk/transforms/windowing/FixedWindows.java | 24 +++++++++++++++++---
.../transforms/windowing/FixedWindowsTest.java | 12 ++++++++++
3 files changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/296cba00/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
index f0f315d..cef865c 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
@@ -64,7 +64,7 @@ public class LateDataUtilsTest {
IntervalWindow window = windowFn.assignWindow(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
assertThat(
window.maxTimestamp(),
- Matchers.<ReadableInstant>greaterThan(GlobalWindow.INSTANCE.maxTimestamp()));
+ equalTo(GlobalWindow.INSTANCE.maxTimestamp()));
assertThat(
LateDataUtils.garbageCollectionTime(window, strategy),
equalTo(GlobalWindow.INSTANCE.maxTimestamp()));
http://git-wip-us.apache.org/repos/asf/beam/blob/296cba00/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
index 8b16916..6c9376c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
@@ -76,9 +76,27 @@ public class FixedWindows extends PartitioningWindowFn<Object, IntervalWindow> {
@Override
public IntervalWindow assignWindow(Instant timestamp) {
- long start = timestamp.getMillis()
- - timestamp.plus(size).minus(offset).getMillis() % size.getMillis();
- return new IntervalWindow(new Instant(start), size);
+ Instant start =
+ new Instant(
+ timestamp.getMillis()
+ - timestamp.plus(size).minus(offset).getMillis() % size.getMillis());
+
+
+ // The global window is inclusive of max timestamp, while interval window excludes its
+ // upper bound
+ Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp().plus(1);
+
+ // The end of the window is either start + size if that is within the allowable range, otherwise
+ // the end of the global window. Truncating the window drives many other
+ // areas of this system in the appropriate way automatically.
+ //
+ // Though it is curious that the very last representable fixed window is shorter than the rest,
+ // when we are processing data in the year 294247, we'll probably have technology that can
+ // account for this.
+ Instant end =
+ start.isAfter(endOfGlobalWindow.minus(size)) ? endOfGlobalWindow : start.plus(size);
+
+ return new IntervalWindow(start, end);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/296cba00/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
index 80a534c..8dc02f9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
@@ -107,6 +107,18 @@ public class FixedWindowsTest {
assertThat(mapping.maximumLookback(), equalTo(Duration.ZERO));
}
+ /** Tests that the last hour of the universe in fact ends at the end of time. */
+ @Test
+ public void testEndOfTime() {
+ Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
+ FixedWindows windowFn = FixedWindows.of(Duration.standardHours(1));
+
+ IntervalWindow truncatedWindow =
+ windowFn.assignWindow(endOfGlobalWindow.minus(1));
+
+ assertThat(truncatedWindow.maxTimestamp(), equalTo(endOfGlobalWindow));
+ }
+
@Test
public void testDefaultWindowMappingFnGlobalWindow() {
PartitioningWindowFn<?, ?> windowFn = FixedWindows.of(Duration.standardMinutes(20L));
[5/5] beam git commit: This closes #3988: [BEAM-3052] ReduceFnRunner:
Do not manage EOW hold or timer, set GC hold and timer always
Posted by ke...@apache.org.
This closes #3988: [BEAM-3052] ReduceFnRunner: Do not manage EOW hold or timer, set GC hold and timer always
ReduceFnRunner: Do not manage EOW hold or timer, set GC hold and timer always
Truncate the very last fixed window if it goes beyond representable time
Tidy a troublesome TestStreamTest
Fix expectations in CombineTest
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a5254e73
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a5254e73
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a5254e73
Branch: refs/heads/master
Commit: a5254e730065d8b15899b094ee54011797fffaa1
Parents: 219b9ca 9c994ca
Author: Kenneth Knowles <ke...@apache.org>
Authored: Mon Nov 13 15:03:37 2017 -0800
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Mon Nov 13 15:03:37 2017 -0800
----------------------------------------------------------------------
.../beam/runners/core/ReduceFnRunner.java | 72 ++----
.../apache/beam/runners/core/WatermarkHold.java | 221 +++----------------
.../beam/runners/core/LateDataUtilsTest.java | 2 +-
.../beam/runners/core/ReduceFnRunnerTest.java | 191 ++++++++++++----
.../sdk/transforms/windowing/FixedWindows.java | 24 +-
.../apache/beam/sdk/testing/TestStreamTest.java | 25 ++-
.../apache/beam/sdk/transforms/CombineTest.java | 2 +-
.../transforms/windowing/FixedWindowsTest.java | 12 +
8 files changed, 246 insertions(+), 303 deletions(-)
----------------------------------------------------------------------
[3/5] beam git commit: Fix expectations in CombineTest
Posted by ke...@apache.org.
Fix expectations in CombineTest
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a593e490
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a593e490
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a593e490
Branch: refs/heads/master
Commit: a593e4908477e897f71a0847b6502fc5eef486ad
Parents: 219b9ca
Author: Kenneth Knowles <ke...@apache.org>
Authored: Mon Oct 16 16:45:46 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Mon Nov 13 15:03:21 2017 -0800
----------------------------------------------------------------------
.../src/test/java/org/apache/beam/sdk/transforms/CombineTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a593e490/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 52fedc6..2b5ab5a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -451,7 +451,7 @@ public class CombineTest implements Serializable {
.apply(Window.<Integer>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.accumulatingFiredPanes()
- .withAllowedLateness(new Duration(0)))
+ .withAllowedLateness(new Duration(0), ClosingBehavior.FIRE_ALWAYS))
.apply(Sum.integersGlobally())
.apply(ParDo.of(new FormatPaneInfo()));