You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/30 21:31:31 UTC
[29/50] beam git commit: WindowingStrategy: add OnTimeBehavior to
control whether to emit empty ON_TIME pane.
WindowingStrategy: add OnTimeBehavior to control whether to emit empty ON_TIME pane.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/38dd12df
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/38dd12df
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/38dd12df
Branch: refs/heads/gearpump-runner
Commit: 38dd12df6dee2ada31ad9c52f8d9dc99225f1bc2
Parents: b1ece01
Author: Pei He <pe...@apache.org>
Authored: Tue Jun 20 16:09:26 2017 -0700
Committer: Pei He <pe...@apache.org>
Committed: Thu Jun 29 14:01:54 2017 +0800
----------------------------------------------------------------------
.../WindowingStrategyTranslation.java | 26 ++-
.../beam/runners/core/ReduceFnRunner.java | 6 +-
.../beam/runners/core/ReduceFnRunnerTest.java | 161 +++++++++++++++++++
.../src/main/proto/beam_runner_api.proto | 14 ++
.../beam/sdk/transforms/windowing/Window.java | 32 ++++
.../beam/sdk/values/WindowingStrategy.java | 46 ++++--
6 files changed, 273 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/38dd12df/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index 718efe7..88ebc01 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -119,6 +120,27 @@ public class WindowingStrategyTranslation implements Serializable {
}
}
+
+ public static OnTimeBehavior fromProto(RunnerApi.OnTimeBehavior proto) {
+ switch (proto) {
+ case FIRE_ALWAYS:
+ return OnTimeBehavior.FIRE_ALWAYS;
+ case FIRE_IF_NONEMPTY:
+ return OnTimeBehavior.FIRE_IF_NON_EMPTY;
+ case UNRECOGNIZED:
+ default:
+ // Whether the proto library cannot recognize the value (due to the version of the
+ // generated code we link to) or this switch hasn't been updated to handle it,
+ // the situation is the same: we don't know what this OnTimeBehavior means
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot convert unknown %s to %s: %s",
+ RunnerApi.OnTimeBehavior.class.getCanonicalName(),
+ OnTimeBehavior.class.getCanonicalName(),
+ proto));
+ }
+ }
+
public static RunnerApi.OutputTime toProto(TimestampCombiner timestampCombiner) {
switch(timestampCombiner) {
case EARLIEST:
@@ -323,13 +345,15 @@ public class WindowingStrategyTranslation implements Serializable {
Trigger trigger = TriggerTranslation.fromProto(proto.getTrigger());
ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
Duration allowedLateness = Duration.millis(proto.getAllowedLateness());
+ OnTimeBehavior onTimeBehavior = fromProto(proto.getOnTimeBehavior());
return WindowingStrategy.of(windowFn)
.withAllowedLateness(allowedLateness)
.withMode(accumulationMode)
.withTrigger(trigger)
.withTimestampCombiner(timestampCombiner)
- .withClosingBehavior(closingBehavior);
+ .withClosingBehavior(closingBehavior)
+ .withOnTimeBehavior(onTimeBehavior);
}
public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec)
http://git-wip-us.apache.org/repos/asf/beam/blob/38dd12df/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 75b6acd..a33bac1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowTracing;
@@ -920,8 +921,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
// The pane has elements.
return true;
}
- if (timing == Timing.ON_TIME) {
- // This is the unique ON_TIME pane.
+ if (timing == Timing.ON_TIME
+ && windowingStrategy.getOnTimeBehavior() == Window.OnTimeBehavior.FIRE_ALWAYS) {
+ // This is an empty ON_TIME pane.
return true;
}
if (isFinished && windowingStrategy.getClosingBehavior() == ClosingBehavior.FIRE_ALWAYS) {
http://git-wip-us.apache.org/repos/asf/beam/blob/38dd12df/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 4f68038..3a2c220 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -67,6 +67,7 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
@@ -1423,6 +1424,166 @@ public class ReduceFnRunnerTest {
}
/**
+ * Test that it won't fire an empty on-time pane when OnTimeBehavior is FIRE_IF_NON_EMPTY.
+ */
+ @Test
+ public void testEmptyOnTimeWithOnTimeBehaviorFireIfNonEmpty() throws Exception {
+
+ WindowingStrategy<?, IntervalWindow> strategy =
+ WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
+ .withTrigger(
+ AfterEach.<IntervalWindow>inOrder(
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(new Duration(5)))
+ .orFinally(AfterWatermark.pastEndOfWindow()),
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(new Duration(25)))))
+ .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+ .withAllowedLateness(Duration.millis(100))
+ .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)
+ .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY);
+
+ ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+ ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+ tester.advanceInputWatermark(new Instant(0));
+ tester.advanceProcessingTime(new Instant(0));
+
+ // Processing time timer for 5
+ tester.injectElements(
+ TimestampedValue.of(1, new Instant(1)),
+ TimestampedValue.of(1, new Instant(3)),
+ TimestampedValue.of(1, new Instant(7)),
+ TimestampedValue.of(1, new Instant(5)));
+
+ // Should fire early pane
+ tester.advanceProcessingTime(new Instant(6));
+
+ // Should not fire empty on time pane
+ tester.advanceInputWatermark(new Instant(11));
+
+ // Should fire final GC pane
+ tester.advanceInputWatermark(new Instant(10 + 100));
+ List<WindowedValue<Integer>> output = tester.extractOutput();
+ assertEquals(2, output.size());
+
+ assertThat(output.get(0), WindowMatchers.isSingleWindowedValue(4, 1, 0, 10));
+ assertThat(output.get(1), WindowMatchers.isSingleWindowedValue(4, 9, 0, 10));
+
+ assertThat(
+ output.get(0),
+ WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
+ assertThat(
+ output.get(1),
+ WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 1, 0)));
+ }
+
+ /**
+ * Test that it fires an empty on-time isFinished pane when OnTimeBehavior is FIRE_ALWAYS
+ * and ClosingBehavior is FIRE_IF_NON_EMPTY.
+ *
+ * <p>OnTimeBehavior is never set here: this pins the default, for backward compatibility.
+ */
+ @Test
+ public void testEmptyOnTimeWithOnTimeBehaviorBackwardCompatibility() throws Exception {
+
+ WindowingStrategy<?, IntervalWindow> strategy =
+ WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
+ .withTrigger(AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(AfterPane.elementCountAtLeast(1)))
+ .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+ .withAllowedLateness(Duration.millis(0))
+ .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+ ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+ ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+ tester.advanceInputWatermark(new Instant(0));
+ tester.advanceProcessingTime(new Instant(0));
+
+ tester.injectElements(
+ TimestampedValue.of(1, new Instant(1)));
+
+ // Should fire the empty ON_TIME pane, which is also the isFinished pane
+ tester.advanceInputWatermark(new Instant(11));
+
+ List<WindowedValue<Integer>> output = tester.extractOutput();
+ assertEquals(2, output.size());
+
+ assertThat(
+ output.get(0),
+ WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
+ assertThat(
+ output.get(1),
+ WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.ON_TIME, 1, 0)));
+ }
+
+ /**
+ * Test that it won't fire an empty on-time pane when OnTimeBehavior is FIRE_IF_NON_EMPTY
+ * and when receiving late data.
+ */
+ @Test
+ public void testEmptyOnTimeWithOnTimeBehaviorFireIfNonEmptyAndLateData() throws Exception {
+
+ WindowingStrategy<?, IntervalWindow> strategy =
+ WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
+ .withTrigger(
+ AfterEach.<IntervalWindow>inOrder(
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(new Duration(5)))
+ .orFinally(AfterWatermark.pastEndOfWindow()),
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(new Duration(25)))))
+ .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+ .withAllowedLateness(Duration.millis(100))
+ .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY);
+
+ ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+ ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+ tester.advanceInputWatermark(new Instant(0));
+ tester.advanceProcessingTime(new Instant(0));
+
+ // Processing time timer for 5
+ tester.injectElements(
+ TimestampedValue.of(1, new Instant(1)),
+ TimestampedValue.of(1, new Instant(3)),
+ TimestampedValue.of(1, new Instant(7)),
+ TimestampedValue.of(1, new Instant(5)));
+
+ // Should fire early pane
+ tester.advanceProcessingTime(new Instant(6));
+
+ // Should not fire empty on time pane
+ tester.advanceInputWatermark(new Instant(11));
+
+ // Processing late data, and should fire late pane
+ tester.injectElements(
+ TimestampedValue.of(1, new Instant(9)));
+ tester.advanceProcessingTime(new Instant(6 + 25 + 1));
+
+ List<WindowedValue<Integer>> output = tester.extractOutput();
+ assertEquals(2, output.size());
+
+ assertThat(output.get(0), WindowMatchers.isSingleWindowedValue(4, 1, 0, 10));
+ assertThat(output.get(1), WindowMatchers.isSingleWindowedValue(5, 9, 0, 10));
+
+ assertThat(
+ output.get(0),
+ WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
+ assertThat(
+ output.get(1),
+ WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.LATE, 1, 0)));
+ }
+
+ /**
* Tests for processing time firings after the watermark passes the end of the window.
* Specifically, verify the proper triggerings and pane-info of a typical speculative/on-time/late
* when the on-time pane is non-empty.
http://git-wip-us.apache.org/repos/asf/beam/blob/38dd12df/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 039ecb0..24e907a 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -433,6 +433,9 @@ message WindowingStrategy {
// (Required) The duration, in milliseconds, beyond the end of a window at
// which the window becomes droppable.
int64 allowed_lateness = 8;
+
+ // (Required) Indicates whether empty on-time panes should be omitted.
+ OnTimeBehavior OnTimeBehavior = 9;
}
// Whether or not a PCollection's WindowFn is non-merging, merging, or
@@ -478,6 +481,17 @@ enum ClosingBehavior {
EMIT_IF_NONEMPTY = 1;
}
+// Controls whether or not an aggregating transform should output data
+// when an on-time pane is empty.
+enum OnTimeBehavior {
+ // Always fire the on-time pane. Even if there is no new data since
+ // the previous firing, an element will be produced.
+ FIRE_ALWAYS = 0;
+
+ // Only fire the on-time pane if there is new data since the previous firing.
+ FIRE_IF_NONEMPTY = 1;
+}
+
// When a number of windowed, timestamped inputs are aggregated, the timestamp
// for the resulting output.
enum OutputTime {
http://git-wip-us.apache.org/repos/asf/beam/blob/38dd12df/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 105ebfb..a12be6d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -163,6 +163,24 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
}
/**
+ * Specifies whether an on-time pane is emitted when it has no new data since the last firing.
+ */
+ public enum OnTimeBehavior {
+ /**
+ * Always fire the on-time pane. Even if there is no new data since the previous firing,
+ * an element will be produced.
+ *
+ * <p>This is the default behavior.
+ */
+ FIRE_ALWAYS,
+ /**
+ * Only fire the on-time pane if there is new data since the previous firing.
+ */
+ FIRE_IF_NON_EMPTY
+
+ }
+
+ /**
* Creates a {@code Window} {@code PTransform} that uses the given
* {@link WindowFn} to window the data.
*
@@ -195,6 +213,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
@Nullable abstract AccumulationMode getAccumulationMode();
@Nullable abstract Duration getAllowedLateness();
@Nullable abstract ClosingBehavior getClosingBehavior();
+ @Nullable abstract OnTimeBehavior getOnTimeBehavior();
@Nullable abstract TimestampCombiner getTimestampCombiner();
abstract Builder<T> toBuilder();
@@ -206,6 +225,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
abstract Builder<T> setAccumulationMode(AccumulationMode mode);
abstract Builder<T> setAllowedLateness(Duration allowedLateness);
abstract Builder<T> setClosingBehavior(ClosingBehavior closingBehavior);
+ abstract Builder<T> setOnTimeBehavior(OnTimeBehavior onTimeBehavior);
abstract Builder<T> setTimestampCombiner(TimestampCombiner timestampCombiner);
abstract Window<T> build();
@@ -299,6 +319,15 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
}
/**
+ * <b><i>(Experimental)</i></b> Override the default {@link OnTimeBehavior}, to control
+ * whether to output an empty on-time pane.
+ */
+ @Experimental(Kind.TRIGGER)
+ public Window<T> withOnTimeBehavior(OnTimeBehavior behavior) {
+ return toBuilder().setOnTimeBehavior(behavior).build();
+ }
+
+ /**
* Get the output strategy of this {@link Window Window PTransform}. For internal use
* only.
*/
@@ -321,6 +350,9 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
if (getClosingBehavior() != null) {
result = result.withClosingBehavior(getClosingBehavior());
}
+ if (getOnTimeBehavior() != null) {
+ result = result.withOnTimeBehavior(getOnTimeBehavior());
+ }
if (getTimestampCombiner() != null) {
result = result.withTimestampCombiner(getTimestampCombiner());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/38dd12df/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java
index 8a773e2..3b74e69 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.joda.time.Duration;
@@ -59,6 +60,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
private final AccumulationMode mode;
private final Duration allowedLateness;
private final ClosingBehavior closingBehavior;
+ private final OnTimeBehavior onTimeBehavior;
private final TimestampCombiner timestampCombiner;
private final boolean triggerSpecified;
private final boolean modeSpecified;
@@ -71,7 +73,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
AccumulationMode mode, boolean modeSpecified,
Duration allowedLateness, boolean allowedLatenessSpecified,
TimestampCombiner timestampCombiner, boolean timestampCombinerSpecified,
- ClosingBehavior closingBehavior) {
+ ClosingBehavior closingBehavior,
+ OnTimeBehavior onTimeBehavior) {
this.windowFn = windowFn;
this.trigger = trigger;
this.triggerSpecified = triggerSpecified;
@@ -80,6 +83,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
this.allowedLateness = allowedLateness;
this.allowedLatenessSpecified = allowedLatenessSpecified;
this.closingBehavior = closingBehavior;
+ this.onTimeBehavior = onTimeBehavior;
this.timestampCombiner = timestampCombiner;
this.timestampCombinerSpecified = timestampCombinerSpecified;
}
@@ -98,7 +102,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
AccumulationMode.DISCARDING_FIRED_PANES, false,
DEFAULT_ALLOWED_LATENESS, false,
TimestampCombiner.END_OF_WINDOW, false,
- ClosingBehavior.FIRE_IF_NON_EMPTY);
+ ClosingBehavior.FIRE_IF_NON_EMPTY,
+ OnTimeBehavior.FIRE_ALWAYS);
}
public WindowFn<T, W> getWindowFn() {
@@ -133,6 +138,10 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
return closingBehavior;
}
+ public OnTimeBehavior getOnTimeBehavior() {
+ return onTimeBehavior;
+ }
+
public TimestampCombiner getTimestampCombiner() {
return timestampCombiner;
}
@@ -152,7 +161,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
mode, modeSpecified,
allowedLateness, allowedLatenessSpecified,
timestampCombiner, timestampCombinerSpecified,
- closingBehavior);
+ closingBehavior,
+ onTimeBehavior);
}
/**
@@ -166,7 +176,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
mode, true,
allowedLateness, allowedLatenessSpecified,
timestampCombiner, timestampCombinerSpecified,
- closingBehavior);
+ closingBehavior,
+ onTimeBehavior);
}
/**
@@ -183,7 +194,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
mode, modeSpecified,
allowedLateness, allowedLatenessSpecified,
timestampCombiner, timestampCombinerSpecified,
- closingBehavior);
+ closingBehavior,
+ onTimeBehavior);
}
/**
@@ -197,7 +209,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
mode, modeSpecified,
allowedLateness, true,
timestampCombiner, timestampCombinerSpecified,
- closingBehavior);
+ closingBehavior,
+ onTimeBehavior);
}
public WindowingStrategy<T, W> withClosingBehavior(ClosingBehavior closingBehavior) {
@@ -207,7 +220,19 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
mode, modeSpecified,
allowedLateness, allowedLatenessSpecified,
timestampCombiner, timestampCombinerSpecified,
- closingBehavior);
+ closingBehavior,
+ onTimeBehavior);
+ }
+
+ public WindowingStrategy<T, W> withOnTimeBehavior(OnTimeBehavior onTimeBehavior) {
+ return new WindowingStrategy<T, W>(
+ windowFn,
+ trigger, triggerSpecified,
+ mode, modeSpecified,
+ allowedLateness, allowedLatenessSpecified,
+ timestampCombiner, timestampCombinerSpecified,
+ closingBehavior,
+ onTimeBehavior);
}
@Experimental(Experimental.Kind.OUTPUT_TIME)
@@ -219,7 +244,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
mode, modeSpecified,
allowedLateness, allowedLatenessSpecified,
timestampCombiner, true,
- closingBehavior);
+ closingBehavior,
+ onTimeBehavior);
}
@Override
@@ -246,6 +272,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
&& getMode().equals(other.getMode())
&& getAllowedLateness().equals(other.getAllowedLateness())
&& getClosingBehavior().equals(other.getClosingBehavior())
+ && getOnTimeBehavior().equals(other.getOnTimeBehavior())
&& getTrigger().equals(other.getTrigger())
&& getTimestampCombiner().equals(other.getTimestampCombiner())
&& getWindowFn().equals(other.getWindowFn());
@@ -278,6 +305,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
mode, true,
allowedLateness, true,
timestampCombiner, true,
- closingBehavior);
+ closingBehavior,
+ onTimeBehavior);
}
}