You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/03/25 21:48:42 UTC

[1/3] incubator-beam git commit: Basic non-null checks

Repository: incubator-beam
Updated Branches:
  refs/heads/master 00f608f05 -> 49d82baf1


Basic non-null checks


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1c89a1b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1c89a1b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1c89a1b3

Branch: refs/heads/master
Commit: 1c89a1b3ac0ff296003ae443e6b4763f501b8ada
Parents: 00f608f
Author: Mark Shields <ma...@google.com>
Authored: Wed Mar 2 20:45:59 2016 -0800
Committer: Mark Shields <ma...@google.com>
Committed: Fri Mar 25 12:28:48 2016 -0700

----------------------------------------------------------------------
 .../cloud/dataflow/sdk/util/ReduceFnRunner.java | 51 +++++++++++++++++-
 .../cloud/dataflow/sdk/util/TriggerRunner.java  |  2 +
 .../cloud/dataflow/sdk/util/WatermarkHold.java  | 55 +++++++++++++++-----
 3 files changed, 92 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c89a1b3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index 2e2d1f6..f1d4582 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -499,6 +499,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
           directContext.timestamp(),
           directContext.timers(),
           directContext.state());
+
+      // At this point, if triggerRunner.shouldFire before the processValue then
+      // triggerRunner.shouldFire after the processValue. In other words adding values
+      // cannot take a trigger state from firing to non-firing.
+      // (We don't actually assert this since it is too slow.)
     }
 
     return windows;
@@ -568,6 +573,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
           && timer.getTimestamp().equals(window.maxTimestamp());
       if (isEndOfWindow) {
+        // If the window strategy trigger includes a watermark trigger then at this point
+        // there should be no data holds, either because we'd already cleared them on an
+        // earlier onTrigger, or because we just cleared them on the above emitIfAppropriate.
+        // We could assert this but it is very expensive.
+
         // Since we are processing an on-time firing we should schedule the garbage collection
         // timer. (If getAllowedLateness is zero then the timer event will be considered a
         // cleanup event and handled by the above).
@@ -715,8 +725,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
       boolean isFinished)
           throws Exception {
+    Instant inputWM = timerInternals.currentInputWatermarkTime();
+    Preconditions.checkNotNull(inputWM);
+
     // Prefetch necessary states
-    ReadableState<Instant> outputTimestampFuture =
+    ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture =
         watermarkHold.extractAndRelease(renamedContext, isFinished).readLater();
     ReadableState<PaneInfo> paneFuture =
         paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
@@ -729,7 +742,41 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     // Calculate the pane info.
     final PaneInfo pane = paneFuture.read();
     // Extract the window hold, and as a side effect clear it.
-    final Instant outputTimestamp = outputTimestampFuture.read();
+
+    WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read();
+    final Instant outputTimestamp = pair.oldHold;
+    @Nullable Instant newHold = pair.newHold;
+
+    if (newHold != null && inputWM != null) {
+      // We can't be finished yet.
+      Preconditions.checkState(
+        !isFinished, "new hold at %s but finished %s", newHold, directContext.window());
+      // The hold cannot be behind the input watermark.
+      Preconditions.checkState(
+        !newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM);
+      if (newHold.isAfter(directContext.window().maxTimestamp())) {
+        // The hold must be for garbage collection, which can't have happened yet.
+        Preconditions.checkState(
+          newHold.isEqual(
+            directContext.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness())),
+          "new hold %s should be at garbage collection for window %s plus %s",
+          newHold,
+          directContext.window(),
+          windowingStrategy.getAllowedLateness());
+      } else {
+        // The hold must be for the end-of-window, which can't have happened yet.
+        Preconditions.checkState(
+          newHold.isEqual(directContext.window().maxTimestamp()),
+          "new hold %s should be at end of window %s",
+          newHold,
+          directContext.window());
+        Preconditions.checkState(
+          !isEndOfWindow,
+          "new hold at %s for %s but this is the watermark trigger",
+          newHold,
+          directContext.window());
+      }
+    }
 
     // Only emit a pane if it has data or empty panes are observable.
     if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c89a1b3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
index dcfd035..8fc4981 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
@@ -172,6 +172,8 @@ public class TriggerRunner<W extends BoundedWindow> {
   }
 
   public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
+    // shouldFire should be false.
+    // However it is too expensive to assert.
     FinishedTriggersBitSet finishedSet =
         readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
     Trigger<W>.TriggerContext context = contextFactory.base(window, timers,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c89a1b3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
index d537ddb..31e36c5 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
@@ -228,6 +228,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
 
     Instant outputWM = timerInternals.currentOutputWatermarkTime();
     Instant inputWM = timerInternals.currentInputWatermarkTime();
+    Preconditions.checkNotNull(inputWM);
 
     // Only add the hold if we can be sure:
     // - the backend will be able to respect it
@@ -287,6 +288,8 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
     // by the end of window (ie the end of window is at or ahead of the input watermark).
     Instant outputWM = timerInternals.currentOutputWatermarkTime();
     Instant inputWM = timerInternals.currentInputWatermarkTime();
+    Preconditions.checkNotNull(inputWM);
+
     String which;
     boolean tooLate;
     Instant eowHold = context.window().maxTimestamp();
@@ -329,6 +332,8 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
       Instant gcHold = context.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness());
       Instant outputWM = timerInternals.currentOutputWatermarkTime();
       Instant inputWM = timerInternals.currentInputWatermarkTime();
+      Preconditions.checkNotNull(inputWM);
+
       WindowTracing.trace(
           "WatermarkHold.addGarbageCollectionHold: garbage collection at {} hold for "
           + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
@@ -369,6 +374,19 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
   }
 
   /**
+   * Result of {@link #extractAndRelease}.
+   */
+  public static class OldAndNewHolds {
+    public final Instant oldHold;
+    @Nullable public final Instant newHold;
+
+    public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) {
+      this.oldHold = oldHold;
+      this.newHold = newHold;
+    }
+  }
+
+  /**
    * Return (a future for) the earliest hold for {@code context}. Clear all the holds after
    * reading, but add/restore an end-of-window or garbage collection hold if required.
    *
@@ -377,7 +395,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
    * elements in the current pane. If there is no such value the timestamp is the end
    * of the window.
    */
-  public ReadableState<Instant> extractAndRelease(
+  public ReadableState<OldAndNewHolds> extractAndRelease(
       final ReduceFn<?, ?, ?, W>.Context context, final boolean isFinished) {
     WindowTracing.debug(
         "extractAndRelease: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
@@ -385,38 +403,38 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
         timerInternals.currentOutputWatermarkTime());
     final WatermarkHoldState<W> elementHoldState = context.state().access(elementHoldTag);
     final WatermarkHoldState<BoundedWindow> extraHoldState = context.state().access(EXTRA_HOLD_TAG);
-    return new ReadableState<Instant>() {
+    return new ReadableState<OldAndNewHolds>() {
       @Override
-      public ReadableState<Instant> readLater() {
+      public ReadableState<OldAndNewHolds> readLater() {
         elementHoldState.readLater();
         extraHoldState.readLater();
         return this;
       }
 
       @Override
-      public Instant read() {
+      public OldAndNewHolds read() {
         // Read both the element and extra holds.
         Instant elementHold = elementHoldState.read();
         Instant extraHold = extraHoldState.read();
-        Instant hold;
+        Instant oldHold;
         // Find the minimum, accounting for null.
         if (elementHold == null) {
-          hold = extraHold;
+          oldHold = extraHold;
         } else if (extraHold == null) {
-          hold = elementHold;
+          oldHold = elementHold;
         } else if (elementHold.isBefore(extraHold)) {
-          hold = elementHold;
+          oldHold = elementHold;
         } else {
-          hold = extraHold;
+          oldHold = extraHold;
         }
-        if (hold == null || hold.isAfter(context.window().maxTimestamp())) {
+        if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) {
           // If no hold (eg because all elements came in behind the output watermark), or
           // the hold was for garbage collection, take the end of window as the result.
           WindowTracing.debug(
               "WatermarkHold.extractAndRelease.read: clipping from {} to end of window "
               + "for key:{}; window:{}",
-              hold, context.key(), context.window());
-          hold = context.window().maxTimestamp();
+              oldHold, context.key(), context.window());
+          oldHold = context.window().maxTimestamp();
         }
         WindowTracing.debug("WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}",
             context.key(), context.window());
@@ -425,13 +443,14 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
         elementHoldState.clear();
         extraHoldState.clear();
 
+        @Nullable Instant newHold = null;
         if (!isFinished) {
           // Only need to leave behind an end-of-window or garbage collection hold
           // if future elements will be processed.
-          addEndOfWindowOrGarbageCollectionHolds(context);
+          newHold = addEndOfWindowOrGarbageCollectionHolds(context);
         }
 
-        return hold;
+        return new OldAndNewHolds(oldHold, newHold);
       }
     };
   }
@@ -447,4 +466,12 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
     context.state().access(elementHoldTag).clear();
     context.state().access(EXTRA_HOLD_TAG).clear();
   }
+
+  /**
+   * Return the current data hold, or null if none. Does not clear. For debugging only.
+   */
+  @Nullable
+  public Instant getDataCurrent(ReduceFn<?, ?, ?, W>.Context context) {
+    return context.state().access(elementHoldTag).read();
+  }
 }


[3/3] incubator-beam git commit: This closes #30

Posted by ke...@apache.org.
This closes #30


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/49d82baf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/49d82baf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/49d82baf

Branch: refs/heads/master
Commit: 49d82baf1b2dcb81db086059d03de89336c3a65b
Parents: 00f608f 34b3301
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Mar 25 13:37:38 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Mar 25 13:37:38 2016 -0700

----------------------------------------------------------------------
 .../inprocess/InMemoryWatermarkManager.java     |  3 +-
 .../inprocess/InProcessTimerInternals.java      |  1 -
 .../sdk/util/LateDataDroppingDoFnRunner.java    |  3 +-
 .../dataflow/sdk/util/PaneInfoTracker.java      |  2 +-
 .../sdk/util/ReduceFnContextFactory.java        |  1 -
 .../cloud/dataflow/sdk/util/ReduceFnRunner.java | 65 +++++++++++++++++---
 .../cloud/dataflow/sdk/util/TimerInternals.java |  4 +-
 .../google/cloud/dataflow/sdk/util/Timers.java  |  3 +-
 .../sdk/util/TriggerContextFactory.java         |  1 -
 .../cloud/dataflow/sdk/util/TriggerRunner.java  |  2 +
 .../cloud/dataflow/sdk/util/WatermarkHold.java  | 58 ++++++++++++-----
 .../dataflow/sdk/io/CountingInputTest.java      |  1 +
 .../cloud/dataflow/sdk/util/ReduceFnTester.java |  8 +--
 .../cloud/dataflow/sdk/util/TriggerTester.java  | 10 ++-
 14 files changed, 114 insertions(+), 48 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-beam git commit: Input watermarks can never be null.

Posted by ke...@apache.org.
Input watermarks can never be null.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/34b33014
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/34b33014
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/34b33014

Branch: refs/heads/master
Commit: 34b3301413a16be6697c5d20896f0f85ddc65cf1
Parents: 1c89a1b
Author: Mark Shields <ma...@google.com>
Authored: Fri Mar 4 15:16:33 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Mar 25 13:37:18 2016 -0700

----------------------------------------------------------------------
 .../inprocess/InMemoryWatermarkManager.java       |  3 ++-
 .../inprocess/InProcessTimerInternals.java        |  1 -
 .../sdk/util/LateDataDroppingDoFnRunner.java      |  3 +--
 .../cloud/dataflow/sdk/util/PaneInfoTracker.java  |  2 +-
 .../dataflow/sdk/util/ReduceFnContextFactory.java |  1 -
 .../cloud/dataflow/sdk/util/ReduceFnRunner.java   | 18 +++++++++---------
 .../cloud/dataflow/sdk/util/TimerInternals.java   |  4 ++--
 .../google/cloud/dataflow/sdk/util/Timers.java    |  3 +--
 .../dataflow/sdk/util/TriggerContextFactory.java  |  1 -
 .../cloud/dataflow/sdk/util/WatermarkHold.java    |  9 +++------
 .../cloud/dataflow/sdk/io/CountingInputTest.java  |  1 +
 .../cloud/dataflow/sdk/util/ReduceFnTester.java   |  8 +++-----
 .../cloud/dataflow/sdk/util/TriggerTester.java    | 10 ++++------
 13 files changed, 27 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
index a9a62a6..c4d67db 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
@@ -28,6 +28,7 @@ import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -994,7 +995,7 @@ public class InMemoryWatermarkManager {
      * Returns the input watermark of the {@link AppliedPTransform}.
      */
     public Instant getInputWatermark() {
-      return inputWatermark.get();
+      return Preconditions.checkNotNull(inputWatermark.get());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java
index 06ba7b8..1d075c5 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java
@@ -70,7 +70,6 @@ public class InProcessTimerInternals implements TimerInternals {
   }
 
   @Override
-  @Nullable
   public Instant currentInputWatermarkTime() {
     return watermarks.getInputWatermark();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java
index 31927ab..3dfa064 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java
@@ -138,8 +138,7 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWin
     /** Is {@code window} expired w.r.t. the garbage collection watermark? */
     private boolean canDropDueToExpiredWindow(BoundedWindow window) {
       Instant inputWM = timerInternals.currentInputWatermarkTime();
-      return inputWM != null
-          && window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM);
+      return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
index a7818a3..9fa36b0 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
@@ -103,7 +103,7 @@ public class PaneInfoTracker {
     boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY;
 
     // True is the input watermark hasn't passed the window's max timestamp.
-    boolean isEarlyForInput = inputWM == null || !inputWM.isAfter(windowMaxTimestamp);
+    boolean isEarlyForInput = !inputWM.isAfter(windowMaxTimestamp);
 
     Timing timing;
     if (isLateForOutput || !onlyEarlyPanesSoFar) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java
index bdbaf10..7649d52 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java
@@ -146,7 +146,6 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
     }
 
     @Override
-    @Nullable
     public Instant currentEventTime() {
       return timerInternals.currentInputWatermarkTime();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index f1d4582..560d8ec 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -537,6 +537,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
           "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window);
     }
 
+    // If this is an end-of-window timer, then we need to set a GC timer.
+    boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
+      && timer.getTimestamp().equals(window.maxTimestamp());
+
     // If this is a garbage collection timer then we should trigger and garbage collect the window.
     Instant cleanupTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
     boolean isGarbageCollection =
@@ -553,7 +557,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
         // We need to call onTrigger to emit the final pane if required.
         // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
         // and the watermark has passed the end of the window.
-        onTrigger(directContext, renamedContext, true/* isFinished */);
+        onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow);
       }
 
       // Cleanup flavor B: Clear all the remaining state for this window since we'll never
@@ -569,9 +573,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
         emitIfAppropriate(directContext, renamedContext);
       }
 
-      // If this is an end-of-window timer then, we need to set a GC timer
-      boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
-          && timer.getTimestamp().equals(window.maxTimestamp());
       if (isEndOfWindow) {
         // If the window strategy trigger includes a watermark trigger then at this point
         // there should be no data holds, either because we'd already cleared them on an
@@ -676,7 +677,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     // Run onTrigger to produce the actual pane contents.
     // As a side effect it will clear all element holds, but not necessarily any
     // end-of-window or garbage collection holds.
-    onTrigger(directContext, renamedContext, isFinished);
+    onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/);
 
     // Now that we've triggered, the pane is empty.
     nonEmptyPanes.clearPane(renamedContext.state());
@@ -723,10 +724,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
   private void onTrigger(
       final ReduceFn<K, InputT, OutputT, W>.Context directContext,
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
-      boolean isFinished)
+      boolean isFinished, boolean isEndOfWindow)
           throws Exception {
     Instant inputWM = timerInternals.currentInputWatermarkTime();
-    Preconditions.checkNotNull(inputWM);
 
     // Prefetch necessary states
     ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture =
@@ -747,7 +747,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     final Instant outputTimestamp = pair.oldHold;
     @Nullable Instant newHold = pair.newHold;
 
-    if (newHold != null && inputWM != null) {
+    if (newHold != null) {
       // We can't be finished yet.
       Preconditions.checkState(
         !isFinished, "new hold at %s but finished %s", newHold, directContext.window());
@@ -825,7 +825,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     Instant endOfWindow = directContext.window().maxTimestamp();
     Instant fireTime;
     String which;
-    if (inputWM != null && endOfWindow.isBefore(inputWM)) {
+    if (endOfWindow.isBefore(inputWM)) {
       fireTime = endOfWindow.plus(windowingStrategy.getAllowedLateness());
       which = "garbage collection";
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java
index c823ed3..b26e6e8 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java
@@ -79,10 +79,11 @@ public interface TimerInternals {
 
   /**
    * Return the current, local input watermark timestamp for this computation
-   * in the {@link TimeDomain#EVENT_TIME} time domain. Return {@code null} if unknown.
+   * in the {@link TimeDomain#EVENT_TIME} time domain.
    *
    * <p>This value:
    * <ol>
+   * <li>Is never {@literal null}, but may be {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
    * <li>Is monotonically increasing.
    * <li>May differ between workers due to network and other delays.
    * <li>Will never be ahead of the global input watermark for this computation. But it
@@ -95,7 +96,6 @@ public interface TimerInternals {
    * it is possible for an element to be considered locally on-time even though it is
    * globally late.
    */
-  @Nullable
   Instant currentInputWatermarkTime();
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java
index 7d4b4f2..2ddf524 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java
@@ -54,7 +54,6 @@ public interface Timers {
   @Nullable
   public abstract Instant currentSynchronizedProcessingTime();
 
-  /** Returns the current event time or {@code null} if unknown. */
-  @Nullable
+  /** Returns the current event time. */
   public abstract Instant currentEventTime();
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java
index 64ff402..50e8b32 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java
@@ -209,7 +209,6 @@ public class TriggerContextFactory<W extends BoundedWindow> {
     }
 
     @Override
-    @Nullable
     public Instant currentEventTime() {
       return timers.currentEventTime();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
index 31e36c5..7f814c4 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java
@@ -228,7 +228,6 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
 
     Instant outputWM = timerInternals.currentOutputWatermarkTime();
     Instant inputWM = timerInternals.currentInputWatermarkTime();
-    Preconditions.checkNotNull(inputWM);
 
     // Only add the hold if we can be sure:
     // - the backend will be able to respect it
@@ -242,7 +241,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
     if (outputWM != null && elementHold.isBefore(outputWM)) {
       which = "too late to effect output watermark";
       tooLate = true;
-    } else if (inputWM != null && context.window().maxTimestamp().isBefore(inputWM)) {
+    } else if (context.window().maxTimestamp().isBefore(inputWM)) {
       which = "too late for end-of-window timer";
       tooLate = true;
     } else {
@@ -288,12 +287,11 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
     // by the end of window (ie the end of window is at or ahead of the input watermark).
     Instant outputWM = timerInternals.currentOutputWatermarkTime();
     Instant inputWM = timerInternals.currentInputWatermarkTime();
-    Preconditions.checkNotNull(inputWM);
 
     String which;
     boolean tooLate;
     Instant eowHold = context.window().maxTimestamp();
-    if (inputWM != null && eowHold.isBefore(inputWM)) {
+    if (eowHold.isBefore(inputWM)) {
       which = "too late for end-of-window timer";
       tooLate = true;
     } else {
@@ -332,13 +330,12 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
       Instant gcHold = context.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness());
       Instant outputWM = timerInternals.currentOutputWatermarkTime();
       Instant inputWM = timerInternals.currentInputWatermarkTime();
-      Preconditions.checkNotNull(inputWM);
 
       WindowTracing.trace(
           "WatermarkHold.addGarbageCollectionHold: garbage collection at {} hold for "
           + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
           gcHold, context.key(), context.window(), inputWM, outputWM);
-      Preconditions.checkState(inputWM == null || !gcHold.isBefore(inputWM),
+      Preconditions.checkState(!gcHold.isBefore(inputWM),
           "Garbage collection hold %s cannot be before input watermark %s", gcHold, inputWM);
       context.state().access(EXTRA_HOLD_TAG).add(gcHold);
       return gcHold;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
index 1daadc7..cc60953 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
@@ -1,3 +1,4 @@
+
 /*
  * Copyright (C) 2016 Google Inc.
  *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
index d4620a7..4aeaa0c 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java
@@ -599,7 +599,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
     /** Current input watermark. */
     @Nullable
-    private Instant inputWatermarkTime = null;
+    private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
 
     /** Current output watermark. */
     @Nullable
@@ -666,9 +666,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     }
 
     @Override
-    @Nullable
     public Instant currentInputWatermarkTime() {
-      return inputWatermarkTime;
+      return Preconditions.checkNotNull(inputWatermarkTime);
     }
 
     @Override
@@ -692,7 +691,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
         ReduceFnRunner<?, ?, ?, ?> runner, Instant newInputWatermark) throws Exception {
       Preconditions.checkNotNull(newInputWatermark);
       Preconditions.checkState(
-          inputWatermarkTime == null || !newInputWatermark.isBefore(inputWatermarkTime),
+          !newInputWatermark.isBefore(inputWatermarkTime),
           "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime,
           newInputWatermark);
       WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}",
@@ -713,7 +712,6 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
     public void advanceOutputWatermark(Instant newOutputWatermark) {
       Preconditions.checkNotNull(newOutputWatermark);
-      Preconditions.checkNotNull(inputWatermarkTime);
       if (newOutputWatermark.isAfter(inputWatermarkTime)) {
         WindowTracing.trace(
             "TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}",

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34b33014/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
index 0c71830..f291438 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java
@@ -41,6 +41,7 @@ import com.google.cloud.dataflow.sdk.util.state.StateTag;
 import com.google.cloud.dataflow.sdk.util.state.WatermarkHoldState;
 import com.google.cloud.dataflow.sdk.values.TimestampedValue;
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -428,7 +429,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
 
     /** Current input watermark. */
     @Nullable
-    private Instant inputWatermarkTime = null;
+    private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
 
     /** Current output watermark. */
     @Nullable
@@ -471,9 +472,8 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
     }
 
     @Override
-    @Nullable
     public Instant currentInputWatermarkTime() {
-      return inputWatermarkTime;
+      return Preconditions.checkNotNull(inputWatermarkTime);
     }
 
     @Override
@@ -495,7 +495,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
 
     public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
       checkNotNull(newInputWatermark);
-      checkState(inputWatermarkTime == null || !newInputWatermark.isBefore(inputWatermarkTime),
+      checkState(!newInputWatermark.isBefore(inputWatermarkTime),
           "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime,
           newInputWatermark);
       WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}",
@@ -513,7 +513,6 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
 
     private void advanceOutputWatermark(Instant newOutputWatermark) throws Exception {
       checkNotNull(newOutputWatermark);
-      checkNotNull(inputWatermarkTime);
       if (newOutputWatermark.isAfter(inputWatermarkTime)) {
         WindowTracing.trace(
             "TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}",
@@ -577,7 +576,6 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
     }
 
     @Override
-    @Nullable
     public Instant currentEventTime() {
       return timerInternals.currentInputWatermarkTime();
     }