You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink on the original page and is not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/03/25 21:48:42 UTC
[1/3] incubator-beam git commit: Basic non-null checks
Repository: incubator-beam
Updated Branches:
refs/heads/master 00f608f05 -> 49d82baf1
Basic non-null checks
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1c89a1b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1c89a1b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1c89a1b3
Branch: refs/heads/master
Commit: 1c89a1b3ac0ff296003ae443e6b4763f501b8ada
Parents: 00f608f
Author: Mark Shields <ma...@google.com>
Authored: Wed Mar 2 20:45:59 2016 -0800
Committer: Mark Shields <ma...@google.com>
Committed: Fri Mar 25 12:28:48 2016 -0700
----------------------------------------------------------------------
.../cloud/dataflow/sdk/util/ReduceFnRunner.java | 51 +++++++++++++++++-
.../cloud/dataflow/sdk/util/TriggerRunner.java | 2 +
.../cloud/dataflow/sdk/util/WatermarkHold.java | 55 +++++++++++++++-----
3 files changed, 92 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c89a1b3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index 2e2d1f6..f1d4582 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -499,6 +499,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
directContext.timestamp(),
directContext.timers(),
directContext.state());
+
+ // At this point, if triggerRunner.shouldFire before the processValue then
+ // triggerRunner.shouldFire after the processValue. In other words adding values
+ // cannot take a trigger state from firing to non-firing.
+ // (We don't actually assert this since it is too slow.)
}
return windows;
@@ -568,6 +573,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
&& timer.getTimestamp().equals(window.maxTimestamp());
if (isEndOfWindow) {
+ // If the window strategy trigger includes a watermark trigger then at this point
+ // there should be no data holds, either because we'd already cleared them on an
+ // earlier onTrigger, or because we just cleared them on the above emitIfAppropriate.
+ // We could assert this but it is very expensive.
+
// Since we are processing an on-time firing we should schedule the garbage collection
// timer. (If getAllowedLateness is zero then the timer event will be considered a
// cleanup event and handled by the above).
@@ -715,8 +725,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
boolean isFinished)
throws Exception {
+ Instant inputWM = timerInternals.currentInputWatermarkTime();
+ Preconditions.checkNotNull(inputWM);
+
// Prefetch necessary states
- ReadableState<Instant> outputTimestampFuture =
+ ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture =
watermarkHold.extractAndRelease(renamedContext, isFinished).readLater();
ReadableState<PaneInfo> paneFuture =
paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
@@ -729,7 +742,41 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
// Calculate the pane info.
final PaneInfo pane = paneFuture.read();
// Extract the window hold, and as a side effect clear it.
- final Instant outputTimestamp = outputTimestampFuture.read();
+
+ WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read();
+ final Instant outputTimestamp = pair.oldHold;
+ @Nullable Instant newHold = pair.newHold;
+
+ if (newHold != null && inputWM != null) {
+ // We can't be finished yet.
+ Preconditions.checkState(
+ !isFinished, "new hold at %s but finished %s", newHold, directContext.window());
+ // The hold cannot be behind the input watermark.
+ Preconditions.checkState(
+ !newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM);
+ if (newHold.isAfter(directContext.window().maxTimestamp())) {
+ // The hold must be for garbage collection, which can't have happened yet.
+ Preconditions.checkState(
+ newHold.isEqual(
+ directContext.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness())),
+ "new hold %s should be at garbage collection for window %s plus %s",
+ newHold,
+ directContext.window(),
+ windowingStrategy.getAllowedLateness());
+ } else {
+ // The hold must be for the end-of-window, which can't have happened yet.
+ Preconditions.checkState(
+ newHold.isEqual(directContext.window().maxTimestamp()),
+ "new hold %s should be at end of window %s",
+ newHold,
+ directContext.window());
+ Preconditions.checkState(
+ !isEndOfWindow,
+ "new hold at %s for %s but this is the watermark trigger",
+ newHold,
+ directContext.window());
+ }
+ }
// Only emit a pane if it has data or empty panes are observable.
if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c89a1b3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
index dcfd035..8fc4981 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
@@ -172,6 +172,8 @@ public class TriggerRunner<W extends BoundedWindow> {
}
public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
+ // shouldFire should be false.
+ // However it is too expensive to assert.
FinishedTriggersBitSet finishedSet =
readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
Trigger<W>.TriggerContext context = contextFactory.base(window, timers,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c89a1b3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
index d537ddb..31e36c5 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
@@ -228,6 +228,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
Instant outputWM = timerInternals.currentOutputWatermarkTime();
Instant inputWM = timerInternals.currentInputWatermarkTime();
+ Preconditions.checkNotNull(inputWM);
// Only add the hold if we can be sure:
// - the backend will be able to respect it
@@ -287,6 +288,8 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
// by the end of window (ie the end of window is at or ahead of the input watermark).
Instant outputWM = timerInternals.currentOutputWatermarkTime();
Instant inputWM = timerInternals.currentInputWatermarkTime();
+ Preconditions.checkNotNull(inputWM);
+
String which;
boolean tooLate;
Instant eowHold = context.window().maxTimestamp();
@@ -329,6 +332,8 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
Instant gcHold = context.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness());
Instant outputWM = timerInternals.currentOutputWatermarkTime();
Instant inputWM = timerInternals.currentInputWatermarkTime();
+ Preconditions.checkNotNull(inputWM);
+
WindowTracing.trace(
"WatermarkHold.addGarbageCollectionHold: garbage collection at {} hold for "
+ "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
@@ -369,6 +374,19 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
}
/**
+ * Result of {@link #extractAndRelease}.
+ */
+ public static class OldAndNewHolds {
+ public final Instant oldHold;
+ @Nullable public final Instant newHold;
+
+ public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) {
+ this.oldHold = oldHold;
+ this.newHold = newHold;
+ }
+ }
+
+ /**
* Return (a future for) the earliest hold for {@code context}. Clear all the holds after
* reading, but add/restore an end-of-window or garbage collection hold if required.
*
@@ -377,7 +395,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
* elements in the current pane. If there is no such value the timestamp is the end
* of the window.
*/
- public ReadableState<Instant> extractAndRelease(
+ public ReadableState<OldAndNewHolds> extractAndRelease(
final ReduceFn<?, ?, ?, W>.Context context, final boolean isFinished) {
WindowTracing.debug(
"extractAndRelease: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
@@ -385,38 +403,38 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
timerInternals.currentOutputWatermarkTime());
final WatermarkHoldState<W> elementHoldState = context.state().access(elementHoldTag);
final WatermarkHoldState<BoundedWindow> extraHoldState = context.state().access(EXTRA_HOLD_TAG);
- return new ReadableState<Instant>() {
+ return new ReadableState<OldAndNewHolds>() {
@Override
- public ReadableState<Instant> readLater() {
+ public ReadableState<OldAndNewHolds> readLater() {
elementHoldState.readLater();
extraHoldState.readLater();
return this;
}
@Override
- public Instant read() {
+ public OldAndNewHolds read() {
// Read both the element and extra holds.
Instant elementHold = elementHoldState.read();
Instant extraHold = extraHoldState.read();
- Instant hold;
+ Instant oldHold;
// Find the minimum, accounting for null.
if (elementHold == null) {
- hold = extraHold;
+ oldHold = extraHold;
} else if (extraHold == null) {
- hold = elementHold;
+ oldHold = elementHold;
} else if (elementHold.isBefore(extraHold)) {
- hold = elementHold;
+ oldHold = elementHold;
} else {
- hold = extraHold;
+ oldHold = extraHold;
}
- if (hold == null || hold.isAfter(context.window().maxTimestamp())) {
+ if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) {
// If no hold (eg because all elements came in behind the output watermark), or
// the hold was for garbage collection, take the end of window as the result.
WindowTracing.debug(
"WatermarkHold.extractAndRelease.read: clipping from {} to end of window "
+ "for key:{}; window:{}",
- hold, context.key(), context.window());
- hold = context.window().maxTimestamp();
+ oldHold, context.key(), context.window());
+ oldHold = context.window().maxTimestamp();
}
WindowTracing.debug("WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}",
context.key(), context.window());
@@ -425,13 +443,14 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
elementHoldState.clear();
extraHoldState.clear();
+ @Nullable Instant newHold = null;
if (!isFinished) {
// Only need to leave behind an end-of-window or garbage collection hold
// if future elements will be processed.
- addEndOfWindowOrGarbageCollectionHolds(context);
+ newHold = addEndOfWindowOrGarbageCollectionHolds(context);
}
- return hold;
+ return new OldAndNewHolds(oldHold, newHold);
}
};
}
@@ -447,4 +466,12 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
context.state().access(elementHoldTag).clear();
context.state().access(EXTRA_HOLD_TAG).clear();
}
+
+ /**
+ * Return the current data hold, or null if none. Does not clear. For debugging only.
+ */
+ @Nullable
+ public Instant getDataCurrent(ReduceFn<?, ?, ?, W>.Context context) {
+ return context.state().access(elementHoldTag).read();
+ }
}
[3/3] incubator-beam git commit: This closes #30
Posted by ke...@apache.org.
This closes #30
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/49d82baf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/49d82baf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/49d82baf
Branch: refs/heads/master
Commit: 49d82baf1b2dcb81db086059d03de89336c3a65b
Parents: 00f608f 34b3301
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Mar 25 13:37:38 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Mar 25 13:37:38 2016 -0700
----------------------------------------------------------------------
.../inprocess/InMemoryWatermarkManager.java | 3 +-
.../inprocess/InProcessTimerInternals.java | 1 -
.../sdk/util/LateDataDroppingDoFnRunner.java | 3 +-
.../dataflow/sdk/util/PaneInfoTracker.java | 2 +-
.../sdk/util/ReduceFnContextFactory.java | 1 -
.../cloud/dataflow/sdk/util/ReduceFnRunner.java | 65 +++++++++++++++++---
.../cloud/dataflow/sdk/util/TimerInternals.java | 4 +-
.../google/cloud/dataflow/sdk/util/Timers.java | 3 +-
.../sdk/util/TriggerContextFactory.java | 1 -
.../cloud/dataflow/sdk/util/TriggerRunner.java | 2 +
.../cloud/dataflow/sdk/util/WatermarkHold.java | 58 ++++++++++++-----
.../dataflow/sdk/io/CountingInputTest.java | 1 +
.../cloud/dataflow/sdk/util/ReduceFnTester.java | 8 +--
.../cloud/dataflow/sdk/util/TriggerTester.java | 10 ++-
14 files changed, 114 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: Input watermarks can never be null.
Posted by ke...@apache.org.
Input watermarks can never be null.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/34b33014
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/34b33014
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/34b33014
Branch: refs/heads/master
Commit: 34b3301413a16be6697c5d20896f0f85ddc65cf1
Parents: 1c89a1b
Author: Mark Shields <ma...@google.com>
Authored: Fri Mar 4 15:16:33 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Mar 25 13:37:18 2016 -0700
----------------------------------------------------------------------
.../inprocess/InMemoryWatermarkManager.java | 3 ++-
.../inprocess/InProcessTimerInternals.java | 1 -
.../sdk/util/LateDataDroppingDoFnRunner.java | 3 +--
.../cloud/dataflow/sdk/util/PaneInfoTracker.java | 2 +-
.../dataflow/sdk/util/ReduceFnContextFactory.java | 1 -
.../cloud/dataflow/sdk/util/ReduceFnRunner.java | 18 +++++++++---------
.../cloud/dataflow/sdk/util/TimerInternals.java | 4 ++--
.../google/cloud/dataflow/sdk/util/Timers.java | 3 +--
.../dataflow/sdk/util/TriggerContextFactory.java | 1 -
.../cloud/dataflow/sdk/util/WatermarkHold.java | 9 +++------
.../cloud/dataflow/sdk/io/CountingInputTest.java | 1 +
.../cloud/dataflow/sdk/util/ReduceFnTester.java | 8 +++-----
.../cloud/dataflow/sdk/util/TriggerTester.java | 10 ++++------
13 files changed, 27 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
index a9a62a6..c4d67db 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
@@ -28,6 +28,7 @@ import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -994,7 +995,7 @@ public class InMemoryWatermarkManager {
* Returns the input watermark of the {@link AppliedPTransform}.
*/
public Instant getInputWatermark() {
- return inputWatermark.get();
+ return Preconditions.checkNotNull(inputWatermark.get());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java
index 06ba7b8..1d075c5 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java
@@ -70,7 +70,6 @@ public class InProcessTimerInternals implements TimerInternals {
}
@Override
- @Nullable
public Instant currentInputWatermarkTime() {
return watermarks.getInputWatermark();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java
index 31927ab..3dfa064 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java
@@ -138,8 +138,7 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWin
/** Is {@code window} expired w.r.t. the garbage collection watermark? */
private boolean canDropDueToExpiredWindow(BoundedWindow window) {
Instant inputWM = timerInternals.currentInputWatermarkTime();
- return inputWM != null
- && window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM);
+ return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
index a7818a3..9fa36b0 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
@@ -103,7 +103,7 @@ public class PaneInfoTracker {
boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY;
// True is the input watermark hasn't passed the window's max timestamp.
- boolean isEarlyForInput = inputWM == null || !inputWM.isAfter(windowMaxTimestamp);
+ boolean isEarlyForInput = !inputWM.isAfter(windowMaxTimestamp);
Timing timing;
if (isLateForOutput || !onlyEarlyPanesSoFar) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java
index bdbaf10..7649d52 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java
@@ -146,7 +146,6 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
}
@Override
- @Nullable
public Instant currentEventTime() {
return timerInternals.currentInputWatermarkTime();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index f1d4582..560d8ec 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -537,6 +537,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
"ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window);
}
+ // If this is an end-of-window timer then, we need to set a GC timer
+ boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
+ && timer.getTimestamp().equals(window.maxTimestamp());
+
// If this is a garbage collection timer then we should trigger and garbage collect the window.
Instant cleanupTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
boolean isGarbageCollection =
@@ -553,7 +557,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
// We need to call onTrigger to emit the final pane if required.
// The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
// and the watermark has passed the end of the window.
- onTrigger(directContext, renamedContext, true/* isFinished */);
+ onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow);
}
// Cleanup flavor B: Clear all the remaining state for this window since we'll never
@@ -569,9 +573,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
emitIfAppropriate(directContext, renamedContext);
}
- // If this is an end-of-window timer then, we need to set a GC timer
- boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
- && timer.getTimestamp().equals(window.maxTimestamp());
if (isEndOfWindow) {
// If the window strategy trigger includes a watermark trigger then at this point
// there should be no data holds, either because we'd already cleared them on an
@@ -676,7 +677,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
// Run onTrigger to produce the actual pane contents.
// As a side effect it will clear all element holds, but not necessarily any
// end-of-window or garbage collection holds.
- onTrigger(directContext, renamedContext, isFinished);
+ onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/);
// Now that we've triggered, the pane is empty.
nonEmptyPanes.clearPane(renamedContext.state());
@@ -723,10 +724,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
private void onTrigger(
final ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
- boolean isFinished)
+ boolean isFinished, boolean isEndOfWindow)
throws Exception {
Instant inputWM = timerInternals.currentInputWatermarkTime();
- Preconditions.checkNotNull(inputWM);
// Prefetch necessary states
ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture =
@@ -747,7 +747,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
final Instant outputTimestamp = pair.oldHold;
@Nullable Instant newHold = pair.newHold;
- if (newHold != null && inputWM != null) {
+ if (newHold != null) {
// We can't be finished yet.
Preconditions.checkState(
!isFinished, "new hold at %s but finished %s", newHold, directContext.window());
@@ -825,7 +825,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
Instant endOfWindow = directContext.window().maxTimestamp();
Instant fireTime;
String which;
- if (inputWM != null && endOfWindow.isBefore(inputWM)) {
+ if (endOfWindow.isBefore(inputWM)) {
fireTime = endOfWindow.plus(windowingStrategy.getAllowedLateness());
which = "garbage collection";
} else {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java
index c823ed3..b26e6e8 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java
@@ -79,10 +79,11 @@ public interface TimerInternals {
/**
* Return the current, local input watermark timestamp for this computation
- * in the {@link TimeDomain#EVENT_TIME} time domain. Return {@code null} if unknown.
+ * in the {@link TimeDomain#EVENT_TIME} time domain.
*
* <p>This value:
* <ol>
+ * <li>Is never {@literal null}, but may be {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
* <li>Is monotonically increasing.
* <li>May differ between workers due to network and other delays.
* <li>Will never be ahead of the global input watermark for this computation. But it
@@ -95,7 +96,6 @@ public interface TimerInternals {
* it is possible for an element to be considered locally on-time even though it is
* globally late.
*/
- @Nullable
Instant currentInputWatermarkTime();
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java
index 7d4b4f2..2ddf524 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java
@@ -54,7 +54,6 @@ public interface Timers {
@Nullable
public abstract Instant currentSynchronizedProcessingTime();
- /** Returns the current event time or {@code null} if unknown. */
- @Nullable
+ /** Returns the current event time. */
public abstract Instant currentEventTime();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java
index 64ff402..50e8b32 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java
@@ -209,7 +209,6 @@ public class TriggerContextFactory<W extends BoundedWindow> {
}
@Override
- @Nullable
public Instant currentEventTime() {
return timers.currentEventTime();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
index 31e36c5..7f814c4 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
@@ -228,7 +228,6 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
Instant outputWM = timerInternals.currentOutputWatermarkTime();
Instant inputWM = timerInternals.currentInputWatermarkTime();
- Preconditions.checkNotNull(inputWM);
// Only add the hold if we can be sure:
// - the backend will be able to respect it
@@ -242,7 +241,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
if (outputWM != null && elementHold.isBefore(outputWM)) {
which = "too late to effect output watermark";
tooLate = true;
- } else if (inputWM != null && context.window().maxTimestamp().isBefore(inputWM)) {
+ } else if (context.window().maxTimestamp().isBefore(inputWM)) {
which = "too late for end-of-window timer";
tooLate = true;
} else {
@@ -288,12 +287,11 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
// by the end of window (ie the end of window is at or ahead of the input watermark).
Instant outputWM = timerInternals.currentOutputWatermarkTime();
Instant inputWM = timerInternals.currentInputWatermarkTime();
- Preconditions.checkNotNull(inputWM);
String which;
boolean tooLate;
Instant eowHold = context.window().maxTimestamp();
- if (inputWM != null && eowHold.isBefore(inputWM)) {
+ if (eowHold.isBefore(inputWM)) {
which = "too late for end-of-window timer";
tooLate = true;
} else {
@@ -332,13 +330,12 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
Instant gcHold = context.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness());
Instant outputWM = timerInternals.currentOutputWatermarkTime();
Instant inputWM = timerInternals.currentInputWatermarkTime();
- Preconditions.checkNotNull(inputWM);
WindowTracing.trace(
"WatermarkHold.addGarbageCollectionHold: garbage collection at {} hold for "
+ "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
gcHold, context.key(), context.window(), inputWM, outputWM);
- Preconditions.checkState(inputWM == null || !gcHold.isBefore(inputWM),
+ Preconditions.checkState(!gcHold.isBefore(inputWM),
"Garbage collection hold %s cannot be before input watermark %s", gcHold, inputWM);
context.state().access(EXTRA_HOLD_TAG).add(gcHold);
return gcHold;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
index 1daadc7..cc60953 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
@@ -1,3 +1,4 @@
+
/*
* Copyright (C) 2016 Google Inc.
*
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
index d4620a7..4aeaa0c 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
@@ -599,7 +599,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
/** Current input watermark. */
@Nullable
- private Instant inputWatermarkTime = null;
+ private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
/** Current output watermark. */
@Nullable
@@ -666,9 +666,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
}
@Override
- @Nullable
public Instant currentInputWatermarkTime() {
- return inputWatermarkTime;
+ return Preconditions.checkNotNull(inputWatermarkTime);
}
@Override
@@ -692,7 +691,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
ReduceFnRunner<?, ?, ?, ?> runner, Instant newInputWatermark) throws Exception {
Preconditions.checkNotNull(newInputWatermark);
Preconditions.checkState(
- inputWatermarkTime == null || !newInputWatermark.isBefore(inputWatermarkTime),
+ !newInputWatermark.isBefore(inputWatermarkTime),
"Cannot move input watermark time backwards from %s to %s", inputWatermarkTime,
newInputWatermark);
WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}",
@@ -713,7 +712,6 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
public void advanceOutputWatermark(Instant newOutputWatermark) {
Preconditions.checkNotNull(newOutputWatermark);
- Preconditions.checkNotNull(inputWatermarkTime);
if (newOutputWatermark.isAfter(inputWatermarkTime)) {
WindowTracing.trace(
"TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}",
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
index 0c71830..f291438 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
@@ -41,6 +41,7 @@ import com.google.cloud.dataflow.sdk.util.state.StateTag;
import com.google.cloud.dataflow.sdk.util.state.WatermarkHoldState;
import com.google.cloud.dataflow.sdk.values.TimestampedValue;
import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -428,7 +429,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
/** Current input watermark. */
@Nullable
- private Instant inputWatermarkTime = null;
+ private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
/** Current output watermark. */
@Nullable
@@ -471,9 +472,8 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
}
@Override
- @Nullable
public Instant currentInputWatermarkTime() {
- return inputWatermarkTime;
+ return Preconditions.checkNotNull(inputWatermarkTime);
}
@Override
@@ -495,7 +495,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
checkNotNull(newInputWatermark);
- checkState(inputWatermarkTime == null || !newInputWatermark.isBefore(inputWatermarkTime),
+ checkState(!newInputWatermark.isBefore(inputWatermarkTime),
"Cannot move input watermark time backwards from %s to %s", inputWatermarkTime,
newInputWatermark);
WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}",
@@ -513,7 +513,6 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
private void advanceOutputWatermark(Instant newOutputWatermark) throws Exception {
checkNotNull(newOutputWatermark);
- checkNotNull(inputWatermarkTime);
if (newOutputWatermark.isAfter(inputWatermarkTime)) {
WindowTracing.trace(
"TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}",
@@ -577,7 +576,6 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
}
@Override
- @Nullable
public Instant currentEventTime() {
return timerInternals.currentInputWatermarkTime();
}