You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/06/29 06:04:18 UTC

[2/3] beam git commit: WindowingStrategy: add OnTimeBehavior to control whether to emit empty ON_TIME pane.

WindowingStrategy: add OnTimeBehavior to control whether to emit empty ON_TIME pane.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/38dd12df
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/38dd12df
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/38dd12df

Branch: refs/heads/master
Commit: 38dd12df6dee2ada31ad9c52f8d9dc99225f1bc2
Parents: b1ece01
Author: Pei He <pe...@apache.org>
Authored: Tue Jun 20 16:09:26 2017 -0700
Committer: Pei He <pe...@apache.org>
Committed: Thu Jun 29 14:01:54 2017 +0800

----------------------------------------------------------------------
 .../WindowingStrategyTranslation.java           |  26 ++-
 .../beam/runners/core/ReduceFnRunner.java       |   6 +-
 .../beam/runners/core/ReduceFnRunnerTest.java   | 161 +++++++++++++++++++
 .../src/main/proto/beam_runner_api.proto        |  14 ++
 .../beam/sdk/transforms/windowing/Window.java   |  32 ++++
 .../beam/sdk/values/WindowingStrategy.java      |  46 ++++--
 6 files changed, 273 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/38dd12df/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index 718efe7..88ebc01 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -119,6 +120,27 @@ public class WindowingStrategyTranslation implements Serializable {
     }
   }
 
+
+  public static OnTimeBehavior fromProto(RunnerApi.OnTimeBehavior proto) {
+    switch (proto) {
+      case FIRE_ALWAYS:
+        return OnTimeBehavior.FIRE_ALWAYS;
+      case FIRE_IF_NONEMPTY:
+        return OnTimeBehavior.FIRE_IF_NON_EMPTY;
+      case UNRECOGNIZED:
+      default:
+        // Whether or not it is proto that cannot recognize it (due to the version of the
+        // generated code we link to) or the switch hasn't been updated to handle it,
+        // the situation is the same: we don't know what this OutputTime means
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot convert unknown %s to %s: %s",
+                RunnerApi.OnTimeBehavior.class.getCanonicalName(),
+                OnTimeBehavior.class.getCanonicalName(),
+                proto));
+    }
+  }
+
   public static RunnerApi.OutputTime toProto(TimestampCombiner timestampCombiner) {
     switch(timestampCombiner) {
       case EARLIEST:
@@ -323,13 +345,15 @@ public class WindowingStrategyTranslation implements Serializable {
     Trigger trigger = TriggerTranslation.fromProto(proto.getTrigger());
     ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
     Duration allowedLateness = Duration.millis(proto.getAllowedLateness());
+    OnTimeBehavior onTimeBehavior = fromProto(proto.getOnTimeBehavior());
 
     return WindowingStrategy.of(windowFn)
         .withAllowedLateness(allowedLateness)
         .withMode(accumulationMode)
         .withTrigger(trigger)
         .withTimestampCombiner(timestampCombiner)
-        .withClosingBehavior(closingBehavior);
+        .withClosingBehavior(closingBehavior)
+        .withOnTimeBehavior(onTimeBehavior);
   }
 
   public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec)

http://git-wip-us.apache.org/repos/asf/beam/blob/38dd12df/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 75b6acd..a33bac1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowTracing;
@@ -920,8 +921,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       // The pane has elements.
       return true;
     }
-    if (timing == Timing.ON_TIME) {
-      // This is the unique ON_TIME pane.
+    if (timing == Timing.ON_TIME
+        && windowingStrategy.getOnTimeBehavior() == Window.OnTimeBehavior.FIRE_ALWAYS) {
+      // This is an empty ON_TIME pane.
       return true;
     }
     if (isFinished && windowingStrategy.getClosingBehavior() == ClosingBehavior.FIRE_ALWAYS) {

http://git-wip-us.apache.org/repos/asf/beam/blob/38dd12df/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 4f68038..3a2c220 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -67,6 +67,7 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
@@ -1423,6 +1424,166 @@ public class ReduceFnRunnerTest {
   }
 
   /**
+   * Test that it won't fire an empty on-time pane when OnTimeBehavior is FIRE_IF_NON_EMPTY.
+   */
+  @Test
+  public void testEmptyOnTimeWithOnTimeBehaviorFireIfNonEmpty() throws Exception {
+
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withTrigger(
+                AfterEach.<IntervalWindow>inOrder(
+                    Repeatedly.forever(
+                        AfterProcessingTime.pastFirstElementInPane()
+                            .plusDelayOf(new Duration(5)))
+                        .orFinally(AfterWatermark.pastEndOfWindow()),
+                    Repeatedly.forever(
+                        AfterProcessingTime.pastFirstElementInPane()
+                            .plusDelayOf(new Duration(25)))))
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.millis(100))
+            .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)
+            .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY);
+
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+    tester.advanceInputWatermark(new Instant(0));
+    tester.advanceProcessingTime(new Instant(0));
+
+    // Processing time timer for 5
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(1)),
+        TimestampedValue.of(1, new Instant(3)),
+        TimestampedValue.of(1, new Instant(7)),
+        TimestampedValue.of(1, new Instant(5)));
+
+    // Should fire early pane
+    tester.advanceProcessingTime(new Instant(6));
+
+    // Should not fire empty on time pane
+    tester.advanceInputWatermark(new Instant(11));
+
+    // Should fire final GC pane
+    tester.advanceInputWatermark(new Instant(10 + 100));
+    List<WindowedValue<Integer>> output = tester.extractOutput();
+    assertEquals(2, output.size());
+
+    assertThat(output.get(0), WindowMatchers.isSingleWindowedValue(4, 1, 0, 10));
+    assertThat(output.get(1), WindowMatchers.isSingleWindowedValue(4, 9, 0, 10));
+
+    assertThat(
+        output.get(0),
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
+    assertThat(
+        output.get(1),
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 1, 0)));
+  }
+
+  /**
+   * Test that it fires an empty on-time isFinished pane when OnTimeBehavior is FIRE_ALWAYS
+   * and ClosingBehavior is FIRE_IF_NON_EMPTY.
+   *
+   * <p>This is a test just for backward compatibility.
+   */
+  @Test
+  public void testEmptyOnTimeWithOnTimeBehaviorBackwardCompatibility() throws Exception {
+
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withTrigger(AfterWatermark.pastEndOfWindow()
+                .withEarlyFirings(AfterPane.elementCountAtLeast(1)))
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.millis(0))
+            .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+    tester.advanceInputWatermark(new Instant(0));
+    tester.advanceProcessingTime(new Instant(0));
+
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(1)));
+
+    // Should fire empty on time isFinished pane
+    tester.advanceInputWatermark(new Instant(11));
+
+    List<WindowedValue<Integer>> output = tester.extractOutput();
+    assertEquals(2, output.size());
+
+    assertThat(
+        output.get(0),
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
+    assertThat(
+        output.get(1),
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.ON_TIME, 1, 0)));
+  }
+
+  /**
+   * Test that it won't fire an empty on-time pane when OnTimeBehavior is FIRE_IF_NON_EMPTY
+   * and when receiving late data.
+   */
+  @Test
+  public void testEmptyOnTimeWithOnTimeBehaviorFireIfNonEmptyAndLateData() throws Exception {
+
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withTrigger(
+                AfterEach.<IntervalWindow>inOrder(
+                    Repeatedly.forever(
+                        AfterProcessingTime.pastFirstElementInPane()
+                            .plusDelayOf(new Duration(5)))
+                        .orFinally(AfterWatermark.pastEndOfWindow()),
+                    Repeatedly.forever(
+                        AfterProcessingTime.pastFirstElementInPane()
+                            .plusDelayOf(new Duration(25)))))
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.millis(100))
+            .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY);
+
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+    tester.advanceInputWatermark(new Instant(0));
+    tester.advanceProcessingTime(new Instant(0));
+
+    // Processing time timer for 5
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(1)),
+        TimestampedValue.of(1, new Instant(3)),
+        TimestampedValue.of(1, new Instant(7)),
+        TimestampedValue.of(1, new Instant(5)));
+
+    // Should fire early pane
+    tester.advanceProcessingTime(new Instant(6));
+
+    // Should not fire empty on time pane
+    tester.advanceInputWatermark(new Instant(11));
+
+    // Processing late data, and should fire late pane
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(9)));
+    tester.advanceProcessingTime(new Instant(6 + 25 + 1));
+
+    List<WindowedValue<Integer>> output = tester.extractOutput();
+    assertEquals(2, output.size());
+
+    assertThat(output.get(0), WindowMatchers.isSingleWindowedValue(4, 1, 0, 10));
+    assertThat(output.get(1), WindowMatchers.isSingleWindowedValue(5, 9, 0, 10));
+
+    assertThat(
+        output.get(0),
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
+    assertThat(
+        output.get(1),
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.LATE, 1, 0)));
+  }
+
+  /**
    * Tests for processing time firings after the watermark passes the end of the window.
    * Specifically, verify the proper triggerings and pane-info of a typical speculative/on-time/late
    * when the on-time pane is non-empty.

http://git-wip-us.apache.org/repos/asf/beam/blob/38dd12df/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 039ecb0..24e907a 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -433,6 +433,9 @@ message WindowingStrategy {
   // (Required) The duration, in milliseconds, beyond the end of a window at
   // which the window becomes droppable.
   int64 allowed_lateness = 8;
+
+  // (Required) Indicate whether empty on-time panes should be omitted.
+  OnTimeBehavior OnTimeBehavior = 9;
 }
 
 // Whether or not a PCollection's WindowFn is non-merging, merging, or
@@ -478,6 +481,17 @@ enum ClosingBehavior {
   EMIT_IF_NONEMPTY = 1;
 }
 
+// Controls whether or not an aggregating transform should output data
+// when an on-time pane is empty.
+enum OnTimeBehavior {
+  // Always fire the on-time pane. Even if there is no new data since
+  // the previous firing, an element will be produced.
+  FIRE_ALWAYS = 0;
+
+  // Only fire the on-time pane if there is new data since the previous firing.
+  FIRE_IF_NONEMPTY = 1;
+}
+
 // When a number of windowed, timestamped inputs are aggregated, the timestamp
 // for the resulting output.
 enum OutputTime {

http://git-wip-us.apache.org/repos/asf/beam/blob/38dd12df/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 105ebfb..a12be6d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -163,6 +163,24 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
   }
 
   /**
+   * Specifies the conditions under which an on-time pane will be created when a window is closed.
+   */
+  public enum OnTimeBehavior {
+    /**
+     * Always fire the on-time pane. Even if there is no new data since the previous firing,
+     * an element will be produced.
+     *
+     * <p>This is the default behavior.
+     */
+    FIRE_ALWAYS,
+    /**
+     * Only fire the on-time pane if there is new data since the previous firing.
+     */
+    FIRE_IF_NON_EMPTY
+
+  }
+
+  /**
    * Creates a {@code Window} {@code PTransform} that uses the given
    * {@link WindowFn} to window the data.
    *
@@ -195,6 +213,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
   @Nullable abstract AccumulationMode getAccumulationMode();
   @Nullable abstract Duration getAllowedLateness();
   @Nullable abstract ClosingBehavior getClosingBehavior();
+  @Nullable abstract OnTimeBehavior getOnTimeBehavior();
   @Nullable abstract TimestampCombiner getTimestampCombiner();
 
   abstract Builder<T> toBuilder();
@@ -206,6 +225,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
     abstract Builder<T> setAccumulationMode(AccumulationMode mode);
     abstract Builder<T> setAllowedLateness(Duration allowedLateness);
     abstract Builder<T> setClosingBehavior(ClosingBehavior closingBehavior);
+    abstract Builder<T> setOnTimeBehavior(OnTimeBehavior onTimeBehavior);
     abstract Builder<T> setTimestampCombiner(TimestampCombiner timestampCombiner);
 
     abstract Window<T> build();
@@ -299,6 +319,15 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
   }
 
   /**
+   * <b><i>(Experimental)</i></b> Override the default {@link OnTimeBehavior}, to control
+   * whether to output an empty on-time pane.
+   */
+  @Experimental(Kind.TRIGGER)
+  public Window<T> withOnTimeBehavior(OnTimeBehavior behavior) {
+    return toBuilder().setOnTimeBehavior(behavior).build();
+  }
+
+  /**
    * Get the output strategy of this {@link Window Window PTransform}. For internal use
    * only.
    */
@@ -321,6 +350,9 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
     if (getClosingBehavior() != null) {
       result = result.withClosingBehavior(getClosingBehavior());
     }
+    if (getOnTimeBehavior() != null) {
+      result = result.withOnTimeBehavior(getOnTimeBehavior());
+    }
     if (getTimestampCombiner() != null) {
       result = result.withTimestampCombiner(getTimestampCombiner());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/38dd12df/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java
index 8a773e2..3b74e69 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.joda.time.Duration;
 
@@ -59,6 +60,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
   private final AccumulationMode mode;
   private final Duration allowedLateness;
   private final ClosingBehavior closingBehavior;
+  private final OnTimeBehavior onTimeBehavior;
   private final TimestampCombiner timestampCombiner;
   private final boolean triggerSpecified;
   private final boolean modeSpecified;
@@ -71,7 +73,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
       AccumulationMode mode, boolean modeSpecified,
       Duration allowedLateness, boolean allowedLatenessSpecified,
       TimestampCombiner timestampCombiner, boolean timestampCombinerSpecified,
-      ClosingBehavior closingBehavior) {
+      ClosingBehavior closingBehavior,
+      OnTimeBehavior onTimeBehavior) {
     this.windowFn = windowFn;
     this.trigger = trigger;
     this.triggerSpecified = triggerSpecified;
@@ -80,6 +83,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
     this.allowedLateness = allowedLateness;
     this.allowedLatenessSpecified = allowedLatenessSpecified;
     this.closingBehavior = closingBehavior;
+    this.onTimeBehavior = onTimeBehavior;
     this.timestampCombiner = timestampCombiner;
     this.timestampCombinerSpecified = timestampCombinerSpecified;
   }
@@ -98,7 +102,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         AccumulationMode.DISCARDING_FIRED_PANES, false,
         DEFAULT_ALLOWED_LATENESS, false,
         TimestampCombiner.END_OF_WINDOW, false,
-        ClosingBehavior.FIRE_IF_NON_EMPTY);
+        ClosingBehavior.FIRE_IF_NON_EMPTY,
+        OnTimeBehavior.FIRE_ALWAYS);
   }
 
   public WindowFn<T, W> getWindowFn() {
@@ -133,6 +138,10 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
     return closingBehavior;
   }
 
+  public OnTimeBehavior getOnTimeBehavior() {
+    return onTimeBehavior;
+  }
+
   public TimestampCombiner getTimestampCombiner() {
     return timestampCombiner;
   }
@@ -152,7 +161,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
         timestampCombiner, timestampCombinerSpecified,
-        closingBehavior);
+        closingBehavior,
+        onTimeBehavior);
   }
 
   /**
@@ -166,7 +176,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         mode, true,
         allowedLateness, allowedLatenessSpecified,
         timestampCombiner, timestampCombinerSpecified,
-        closingBehavior);
+        closingBehavior,
+        onTimeBehavior);
   }
 
   /**
@@ -183,7 +194,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
         timestampCombiner, timestampCombinerSpecified,
-        closingBehavior);
+        closingBehavior,
+        onTimeBehavior);
   }
 
   /**
@@ -197,7 +209,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         mode, modeSpecified,
         allowedLateness, true,
         timestampCombiner, timestampCombinerSpecified,
-        closingBehavior);
+        closingBehavior,
+        onTimeBehavior);
   }
 
   public WindowingStrategy<T, W> withClosingBehavior(ClosingBehavior closingBehavior) {
@@ -207,7 +220,19 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
         timestampCombiner, timestampCombinerSpecified,
-        closingBehavior);
+        closingBehavior,
+        onTimeBehavior);
+  }
+
+  public WindowingStrategy<T, W> withOnTimeBehavior(OnTimeBehavior onTimeBehavior) {
+    return new WindowingStrategy<T, W>(
+        windowFn,
+        trigger, triggerSpecified,
+        mode, modeSpecified,
+        allowedLateness, allowedLatenessSpecified,
+        timestampCombiner, timestampCombinerSpecified,
+        closingBehavior,
+        onTimeBehavior);
   }
 
   @Experimental(Experimental.Kind.OUTPUT_TIME)
@@ -219,7 +244,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
         timestampCombiner, true,
-        closingBehavior);
+        closingBehavior,
+        onTimeBehavior);
   }
 
   @Override
@@ -246,6 +272,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         && getMode().equals(other.getMode())
         && getAllowedLateness().equals(other.getAllowedLateness())
         && getClosingBehavior().equals(other.getClosingBehavior())
+        && getOnTimeBehavior().equals(other.getOnTimeBehavior())
         && getTrigger().equals(other.getTrigger())
         && getTimestampCombiner().equals(other.getTimestampCombiner())
         && getWindowFn().equals(other.getWindowFn());
@@ -278,6 +305,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         mode, true,
         allowedLateness, true,
         timestampCombiner, true,
-        closingBehavior);
+        closingBehavior,
+        onTimeBehavior);
   }
 }