You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/03/31 18:20:44 UTC
[5/6] beam git commit: Replaces Window.Bound with simply Window
Replaces Window.Bound with simply Window
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6848950c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6848950c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6848950c
Branch: refs/heads/master
Commit: 6848950cca5bee2dddc18ddca229a5deb9e34754
Parents: 6b67e54
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Mar 29 13:09:49 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Mar 31 10:59:38 2017 -0700
----------------------------------------------------------------------
.../translation/WindowAssignTranslator.java | 2 +-
.../direct/WindowEvaluatorFactoryTest.java | 7 +-
.../beam/runners/dataflow/AssignWindows.java | 4 +-
.../dataflow/ReshuffleOverrideFactory.java | 2 +-
.../org/apache/beam/sdk/testing/PAssert.java | 4 +-
.../org/apache/beam/sdk/transforms/DoFn.java | 2 +-
.../apache/beam/sdk/transforms/GroupByKey.java | 6 +-
.../beam/sdk/transforms/WithTimestamps.java | 6 +-
.../beam/sdk/transforms/windowing/Never.java | 2 +-
.../beam/sdk/transforms/windowing/PaneInfo.java | 2 +-
.../beam/sdk/transforms/windowing/Window.java | 447 +++++++++----------
.../org/apache/beam/sdk/util/Reshuffle.java | 2 +-
.../java/org/apache/beam/sdk/io/WriteTest.java | 4 +-
.../org/apache/beam/sdk/transforms/TopTest.java | 3 +-
.../sdk/transforms/windowing/WindowTest.java | 17 +-
15 files changed, 249 insertions(+), 261 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java
index b3aef8d..6106f75 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
/**
- * {@link Window.Bound} is translated to {link ApexParDoOperator} that wraps an {@link
+ * {@link Window} is translated to {@link ApexParDoOperator} that wraps an {@link
* AssignWindowsDoFn}.
*/
class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> {
http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index a71a75b..eb58629 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -43,7 +43,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.Window.Bound;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.WindowedValue;
@@ -113,7 +112,7 @@ public class WindowEvaluatorFactoryTest {
@Test
public void singleWindowFnSucceeds() throws Exception {
Duration windowDuration = Duration.standardDays(7);
- Bound<Long> transform = Window.<Long>into(FixedWindows.of(windowDuration));
+ Window<Long> transform = Window.<Long>into(FixedWindows.of(windowDuration));
PCollection<Long> windowed = input.apply(transform);
CommittedBundle<Long> inputBundle = createInputBundle();
@@ -152,7 +151,7 @@ public class WindowEvaluatorFactoryTest {
public void multipleWindowsWindowFnSucceeds() throws Exception {
Duration windowDuration = Duration.standardDays(6);
Duration slidingBy = Duration.standardDays(3);
- Bound<Long> transform = Window.into(SlidingWindows.of(windowDuration).every(slidingBy));
+ Window<Long> transform = Window.into(SlidingWindows.of(windowDuration).every(slidingBy));
PCollection<Long> windowed = input.apply(transform);
CommittedBundle<Long> inputBundle = createInputBundle();
@@ -209,7 +208,7 @@ public class WindowEvaluatorFactoryTest {
@Test
public void referencesEarlierWindowsSucceeds() throws Exception {
- Bound<Long> transform = Window.into(new EvaluatorTestWindowFn());
+ Window<Long> transform = Window.into(new EvaluatorTestWindowFn());
PCollection<Long> windowed = input.apply(transform);
CommittedBundle<Long> inputBundle = createInputBundle();
http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
index 880cd26..3e36899 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
@@ -42,13 +42,13 @@ import org.apache.beam.sdk.values.PCollection;
* @param <T> the type of input element
*/
class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
- private final Window.Bound<T> transform;
+ private final Window<T> transform;
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // Used via reflection
- public AssignWindows(Window.Bound<T> transform) {
+ public AssignWindows(Window<T> transform) {
this.transform = transform;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java
index 230f5dc..2e6455d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java
@@ -56,7 +56,7 @@ class ReshuffleOverrideFactory<K, V>
// If the input has already had its windows merged, then the GBK that performed the merge
// will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
// here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
- Window.Bound<KV<K, V>> rewindow =
+ Window<KV<K, V>> rewindow =
Window.<KV<K, V>>into(
new IdentityWindowFn<>(
originalStrategy.getWindowFn().windowCoder()))
http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index ab412c4..92dca53 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -145,7 +145,7 @@ public class PAssert {
* <p>If the input {@link WindowingStrategy} does not always produce final panes, the assertion
* may be executed over an empty input even if the trigger has fired previously. To ensure that
* a final pane is always produced, set the {@link ClosingBehavior} of the windowing strategy
- * (via {@link Window.Bound#withAllowedLateness(Duration, ClosingBehavior)} setting
+ * (via {@link Window#withAllowedLateness(Duration, ClosingBehavior)} setting
* {@link ClosingBehavior} to {@link ClosingBehavior#FIRE_ALWAYS}).
*
* @return a new {@link IterableAssert} like this one but with the assertion only applied to the
@@ -233,7 +233,7 @@ public class PAssert {
* <p>If the input {@link WindowingStrategy} does not always produce final panes, the assertion
* may be executed over an empty input even if the trigger has fired previously. To ensure that
* a final pane is always produced, set the {@link ClosingBehavior} of the windowing strategy
- * (via {@link Window.Bound#withAllowedLateness(Duration, ClosingBehavior)} setting
+ * (via {@link Window#withAllowedLateness(Duration, ClosingBehavior)} setting
* {@link ClosingBehavior} to {@link ClosingBehavior#FIRE_ALWAYS}).
*
* @return a new {@link SingletonAssert} like this one but with the assertion only applied to
http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index e43527a..de33612 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -324,7 +324,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
*
* @deprecated This method permits a {@link DoFn} to emit elements behind the watermark. These
* elements are considered late, and if behind the
- * {@link Window.Bound#withAllowedLateness(Duration) allowed lateness} of a downstream
+ * {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream
* {@link PCollection} may be silently dropped. See
* https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index f68b1f3..d228dbb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -39,7 +39,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
* each distinct key and window of the input {@code PCollection} to an
* {@code Iterable} over all the values associated with that key in
* the input per window. Absent repeatedly-firing
- * {@link Window.Bound#triggering triggering}, each key in the output
+ * {@link Window#triggering triggering}, each key in the output
* {@code PCollection} is unique within each window.
*
* <p>{@code GroupByKey} is analogous to converting a multi-map into
@@ -97,14 +97,14 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
* for details on the estimation.
*
* <p>The timestamp for each emitted pane is determined by the
- * {@link Window.Bound#withOutputTimeFn windowing operation}.
+ * {@link Window#withOutputTimeFn windowing operation}.
* The output {@code PCollection} will have the same {@link WindowFn}
* as the input.
*
* <p>If the input {@code PCollection} contains late data (see
* {@link org.apache.beam.sdk.io.PubsubIO.Read#timestampLabel}
* for an example of how this can occur) or the
- * {@link Window.Bound#triggering requested TriggerFn} can fire before
+ * {@link Window#triggering requested TriggerFn} can fire before
* the watermark, then there may be multiple elements
* output by a {@code GroupByKey} that correspond to the same key and window.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
index 39cf6c6..c4df2fb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
@@ -52,7 +52,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
*
* <p>CAUTION: Use of {@link #withAllowedTimestampSkew(Duration)} permits elements to be emitted
* behind the watermark. These elements are considered late, and if behind the {@link
- * Window.Bound#withAllowedLateness(Duration) allowed lateness} of a downstream
+ * Window#withAllowedLateness(Duration) allowed lateness} of a downstream
* {@link PCollection} may be silently dropped. See https://issues.apache.org/jira/browse/BEAM-644
* for details on a replacement.
*
@@ -90,7 +90,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
* future. For infinite skew, use {@code new Duration(Long.MAX_VALUE)}.
* @deprecated This method permits elements to be emitted behind the watermark. These
* elements are considered late, and if behind the
- * {@link Window.Bound#withAllowedLateness(Duration) allowed lateness} of a downstream
+ * {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream
* {@link PCollection} may be silently dropped. See
* https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement.
*/
@@ -106,7 +106,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
* @see DoFn#getAllowedTimestampSkew()
* @deprecated This method permits elements to be emitted behind the watermark. These
* elements are considered late, and if behind the
- * {@link Window.Bound#withAllowedLateness(Duration) allowed lateness} of a downstream
+ * {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream
* {@link PCollection} may be silently dropped. See
* https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
index d8cb96d..664ae83 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
@@ -26,7 +26,7 @@ import org.joda.time.Instant;
* A trigger which never fires.
*
* <p>Using this trigger will only produce output when the watermark passes the end of the
- * {@link BoundedWindow window} plus the {@link Window.Bound#withAllowedLateness allowed
+ * {@link BoundedWindow window} plus the {@link Window#withAllowedLateness allowed
* lateness}.
*/
public final class Never {
http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
index de5b1e1..1fa1f49 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
@@ -72,7 +72,7 @@ public final class PaneInfo {
* <li>We'll call a pipeline 'simple' if it does not use
* {@link DoFn.Context#outputWithTimestamp} in
* any {@link DoFn}, and it uses the same
- * {@link org.apache.beam.sdk.transforms.windowing.Window.Bound#withAllowedLateness}
+ * {@link org.apache.beam.sdk.transforms.windowing.Window#withAllowedLateness}
* argument value on all windows (or uses the default of {@link org.joda.time.Duration#ZERO}).
* <li>We'll call an element 'locally late', from the point of view of a computation on a
* worker, if the element's timestamp is before the input watermark for that computation
http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 188554a..58425e0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -89,7 +89,7 @@ import org.joda.time.Duration;
*
* <h2>Triggers</h2>
*
- * <p>{@link Window.Bound#triggering(Trigger)} allows specifying a trigger to control when
+ * <p>{@link Window#triggering(Trigger)} allows specifying a trigger to control when
* (in processing time) results for the given window can be produced. If unspecified, the default
* behavior is to trigger first when the watermark passes the end of the window, and then trigger
* again every time there is late arriving data.
@@ -139,8 +139,8 @@ import org.joda.time.Duration;
*
* <p>See {@link Trigger} for details on the available triggers.
*/
-public class Window {
-
+@AutoValue
+public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T>> {
/**
* Specifies the conditions under which a final pane will be created when a window is permanently
* closed.
@@ -169,7 +169,7 @@ public class Window {
* the argument {@code WindowFn}. It is ready to be applied, or further
* properties can be set on it first.
*/
- public static <T> Bound<T> into(WindowFn<? super T, ?> fn) {
+ public static <T> Window<T> into(WindowFn<? super T, ?> fn) {
try {
fn.windowCoder().verifyDeterministic();
} catch (NonDeterministicException e) {
@@ -182,265 +182,256 @@ public class Window {
* Returns a new builder for a {@link Window} transform for setting windowing parameters other
* than the windowing function.
*/
- public static <T> Bound<T> configure() {
- return new AutoValue_Window_Bound.Builder<T>().build();
+ public static <T> Window<T> configure() {
+ return new AutoValue_Window.Builder<T>().build();
+ }
+
+ @Nullable
+ public abstract WindowFn<? super T, ?> getWindowFn();
+
+ @Nullable abstract Trigger getTrigger();
+ @Nullable abstract AccumulationMode getAccumulationMode();
+ @Nullable abstract Duration getAllowedLateness();
+ @Nullable abstract ClosingBehavior getClosingBehavior();
+ @Nullable abstract OutputTimeFn<?> getOutputTimeFn();
+
+ abstract Builder<T> toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> setWindowFn(WindowFn<? super T, ?> windowFn);
+ abstract Builder<T> setTrigger(Trigger trigger);
+ abstract Builder<T> setAccumulationMode(AccumulationMode mode);
+ abstract Builder<T> setAllowedLateness(Duration allowedLateness);
+ abstract Builder<T> setClosingBehavior(ClosingBehavior closingBehavior);
+ abstract Builder<T> setOutputTimeFn(OutputTimeFn<?> outputTimeFn);
+
+ abstract Window<T> build();
+ }
+
+ private Window<T> withWindowFn(WindowFn<? super T, ?> windowFn) {
+ return toBuilder().setWindowFn(windowFn).build();
}
/**
- * A {@code PTransform} that windows the elements of a {@code PCollection<T>},
- * into finite windows according to a user-specified {@code WindowFn}.
+ * Sets a non-default trigger for this {@code Window} {@code PTransform}.
+ * Elements that are assigned to a specific window will be output when
+ * the trigger fires.
*
- * @param <T> The type of elements this {@code Window} is applied to
+ * <p>{@link org.apache.beam.sdk.transforms.windowing.Trigger}
+ * has more details on the available triggers.
+ *
+ * <p>Must also specify allowed lateness using {@link #withAllowedLateness} and accumulation
+ * mode using either {@link #discardingFiredPanes()} or {@link #accumulatingFiredPanes()}.
*/
- @AutoValue
- public abstract static class Bound<T> extends PTransform<PCollection<T>, PCollection<T>> {
- @Nullable
- public abstract WindowFn<? super T, ?> getWindowFn();
-
- @Nullable abstract Trigger getTrigger();
- @Nullable abstract AccumulationMode getAccumulationMode();
- @Nullable abstract Duration getAllowedLateness();
- @Nullable abstract ClosingBehavior getClosingBehavior();
- @Nullable abstract OutputTimeFn<?> getOutputTimeFn();
-
- abstract Builder<T> toBuilder();
-
- @AutoValue.Builder
- abstract static class Builder<T> {
- abstract Builder<T> setWindowFn(WindowFn<? super T, ?> windowFn);
- abstract Builder<T> setTrigger(Trigger trigger);
- abstract Builder<T> setAccumulationMode(AccumulationMode mode);
- abstract Builder<T> setAllowedLateness(Duration allowedLateness);
- abstract Builder<T> setClosingBehavior(ClosingBehavior closingBehavior);
- abstract Builder<T> setOutputTimeFn(OutputTimeFn<?> outputTimeFn);
-
- abstract Bound<T> build();
- }
+ @Experimental(Kind.TRIGGER)
+ public Window<T> triggering(Trigger trigger) {
+ return toBuilder().setTrigger(trigger).build();
+ }
- private Bound<T> withWindowFn(WindowFn<? super T, ?> windowFn) {
- return toBuilder().setWindowFn(windowFn).build();
- }
+ /**
+ * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
+ * Triggering behavior, and that discards elements in a pane after they are triggered.
+ *
+ * <p>Does not modify this transform. The resulting {@code PTransform} is sufficiently
+ * specified to be applied, but more properties can still be specified.
+ */
+ @Experimental(Kind.TRIGGER)
+ public Window<T> discardingFiredPanes() {
+ return toBuilder().setAccumulationMode(AccumulationMode.DISCARDING_FIRED_PANES).build();
+ }
+
+ /**
+ * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
+ * Triggering behavior, and that accumulates elements in a pane after they are triggered.
+ *
+ * <p>Does not modify this transform. The resulting {@code PTransform} is sufficiently
+ * specified to be applied, but more properties can still be specified.
+ */
+ @Experimental(Kind.TRIGGER)
+ public Window<T> accumulatingFiredPanes() {
+ return toBuilder().setAccumulationMode(AccumulationMode.ACCUMULATING_FIRED_PANES).build();
+ }
- /**
- * Sets a non-default trigger for this {@code Window} {@code PTransform}.
- * Elements that are assigned to a specific window will be output when
- * the trigger fires.
- *
- * <p>{@link org.apache.beam.sdk.transforms.windowing.Trigger}
- * has more details on the available triggers.
- *
- * <p>Must also specify allowed lateness using {@link #withAllowedLateness} and accumulation
- * mode using either {@link #discardingFiredPanes()} or {@link #accumulatingFiredPanes()}.
- */
- @Experimental(Kind.TRIGGER)
- public Bound<T> triggering(Trigger trigger) {
- return toBuilder().setTrigger(trigger).build();
- }
+ /**
+ * Override the amount of lateness allowed for data elements in the output {@link PCollection}
+ * and downstream {@link PCollection PCollections} until explicitly set again.
+ * Like the other properties on this {@link Window} operation, this will be applied at
+ * the next {@link GroupByKey}. Any elements that are later than this as decided by
+ * the system-maintained watermark will be dropped.
+ *
+ * <p>This value also determines how long state will be kept around for old windows.
+ * Once no elements will be added to a window (because this duration has passed) any state
+ * associated with the window will be cleaned up.
+ *
+ * <p>Depending on the trigger this may not produce a pane with {@link PaneInfo#isLast}. See
+ * {@link ClosingBehavior#FIRE_IF_NON_EMPTY} for more details.
+ */
+ @Experimental(Kind.TRIGGER)
+ public Window<T> withAllowedLateness(Duration allowedLateness) {
+ return toBuilder().setAllowedLateness(allowedLateness).build();
+ }
- /**
- * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
- * Triggering behavior, and that discards elements in a pane after they are triggered.
- *
- * <p>Does not modify this transform. The resulting {@code PTransform} is sufficiently
- * specified to be applied, but more properties can still be specified.
- */
- @Experimental(Kind.TRIGGER)
- public Bound<T> discardingFiredPanes() {
- return toBuilder().setAccumulationMode(AccumulationMode.DISCARDING_FIRED_PANES).build();
- }
-
- /**
- * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
- * Triggering behavior, and that accumulates elements in a pane after they are triggered.
- *
- * <p>Does not modify this transform. The resulting {@code PTransform} is sufficiently
- * specified to be applied, but more properties can still be specified.
- */
- @Experimental(Kind.TRIGGER)
- public Bound<T> accumulatingFiredPanes() {
- return toBuilder().setAccumulationMode(AccumulationMode.ACCUMULATING_FIRED_PANES).build();
- }
+ /**
+ * <b><i>(Experimental)</i></b> Override the default {@link OutputTimeFn}, to control
+ * the output timestamp of values output from a {@link GroupByKey} operation.
+ */
+ @Experimental(Kind.OUTPUT_TIME)
+ public Window<T> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) {
+ return toBuilder().setOutputTimeFn(outputTimeFn).build();
+ }
- /**
- * Override the amount of lateness allowed for data elements in the output {@link PCollection}
- * and downstream {@link PCollection PCollections} until explicitly set again.
- * Like the other properties on this {@link Window} operation, this will be applied at
- * the next {@link GroupByKey}. Any elements that are later than this as decided by
- * the system-maintained watermark will be dropped.
- *
- * <p>This value also determines how long state will be kept around for old windows.
- * Once no elements will be added to a window (because this duration has passed) any state
- * associated with the window will be cleaned up.
- *
- * <p>Depending on the trigger this may not produce a pane with {@link PaneInfo#isLast}. See
- * {@link ClosingBehavior#FIRE_IF_NON_EMPTY} for more details.
- */
- @Experimental(Kind.TRIGGER)
- public Bound<T> withAllowedLateness(Duration allowedLateness) {
- return toBuilder().setAllowedLateness(allowedLateness).build();
- }
+ /**
+ * Override the amount of lateness allowed for data elements in the pipeline. Like
+ * the other properties on this {@link Window} operation, this will be applied at
+ * the next {@link GroupByKey}. Any elements that are later than this as decided by
+ * the system-maintained watermark will be dropped.
+ *
+ * <p>This value also determines how long state will be kept around for old windows.
+ * Once no elements will be added to a window (because this duration has passed) any state
+ * associated with the window will be cleaned up.
+ */
+ @Experimental(Kind.TRIGGER)
+ public Window<T> withAllowedLateness(Duration allowedLateness, ClosingBehavior behavior) {
+ return toBuilder().setAllowedLateness(allowedLateness).setClosingBehavior(behavior).build();
+ }
- /**
- * <b><i>(Experimental)</i></b> Override the default {@link OutputTimeFn}, to control
- * the output timestamp of values output from a {@link GroupByKey} operation.
- */
- @Experimental(Kind.OUTPUT_TIME)
- public Bound<T> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) {
- return toBuilder().setOutputTimeFn(outputTimeFn).build();
+ /**
+ * Get the output strategy of this {@link Window Window PTransform}. For internal use
+ * only.
+ */
+ // Rawtype cast of OutputTimeFn cannot be eliminated with intermediate variable, as it is
+ // casting between wildcards
+ public WindowingStrategy<?, ?> getOutputStrategyInternal(
+ WindowingStrategy<?, ?> inputStrategy) {
+ WindowingStrategy<?, ?> result = inputStrategy;
+ if (getWindowFn() != null) {
+ result = result.withWindowFn(getWindowFn());
}
-
- /**
- * Override the amount of lateness allowed for data elements in the pipeline. Like
- * the other properties on this {@link Window} operation, this will be applied at
- * the next {@link GroupByKey}. Any elements that are later than this as decided by
- * the system-maintained watermark will be dropped.
- *
- * <p>This value also determines how long state will be kept around for old windows.
- * Once no elements will be added to a window (because this duration has passed) any state
- * associated with the window will be cleaned up.
- */
- @Experimental(Kind.TRIGGER)
- public Bound<T> withAllowedLateness(Duration allowedLateness, ClosingBehavior behavior) {
- return toBuilder().setAllowedLateness(allowedLateness).setClosingBehavior(behavior).build();
+ if (getTrigger() != null) {
+ result = result.withTrigger(getTrigger());
}
-
- /**
- * Get the output strategy of this {@link Window.Bound Window PTransform}. For internal use
- * only.
- */
- // Rawtype cast of OutputTimeFn cannot be eliminated with intermediate variable, as it is
- // casting between wildcards
- public WindowingStrategy<?, ?> getOutputStrategyInternal(
- WindowingStrategy<?, ?> inputStrategy) {
- WindowingStrategy<?, ?> result = inputStrategy;
- if (getWindowFn() != null) {
- result = result.withWindowFn(getWindowFn());
- }
- if (getTrigger() != null) {
- result = result.withTrigger(getTrigger());
- }
- if (getAccumulationMode() != null) {
- result = result.withMode(getAccumulationMode());
- }
- if (getAllowedLateness() != null) {
- result = result.withAllowedLateness(getAllowedLateness());
- }
- if (getClosingBehavior() != null) {
- result = result.withClosingBehavior(getClosingBehavior());
- }
- if (getOutputTimeFn() != null) {
- result = result.withOutputTimeFn(getOutputTimeFn());
- }
- return result;
+ if (getAccumulationMode() != null) {
+ result = result.withMode(getAccumulationMode());
}
-
- @Override
- public void validate(PCollection<T> input) {
- WindowingStrategy<?, ?> outputStrategy =
- getOutputStrategyInternal(input.getWindowingStrategy());
-
- // Make sure that the windowing strategy is complete & valid.
- if (outputStrategy.isTriggerSpecified()
- && !(outputStrategy.getTrigger() instanceof DefaultTrigger)
- && !(outputStrategy.getWindowFn() instanceof GlobalWindows)
- && !outputStrategy.isAllowedLatenessSpecified()) {
- throw new IllegalArgumentException(
- "Except when using GlobalWindows,"
- + " calling .triggering() to specify a trigger requires that the allowed lateness"
- + " be specified using .withAllowedLateness() to set the upper bound on how late"
- + " data can arrive before being dropped. See Javadoc for more details.");
- }
-
- if (!outputStrategy.isModeSpecified() && canProduceMultiplePanes(outputStrategy)) {
- throw new IllegalArgumentException(
- "Calling .triggering() to specify a trigger or calling .withAllowedLateness() to"
- + " specify an allowed lateness greater than zero requires that the accumulation"
- + " mode be specified using .discardingFiredPanes() or .accumulatingFiredPanes()."
- + " See Javadoc for more details.");
- }
+ if (getAllowedLateness() != null) {
+ result = result.withAllowedLateness(getAllowedLateness());
+ }
+ if (getClosingBehavior() != null) {
+ result = result.withClosingBehavior(getClosingBehavior());
+ }
+ if (getOutputTimeFn() != null) {
+ result = result.withOutputTimeFn(getOutputTimeFn());
}
+ return result;
+ }
- private boolean canProduceMultiplePanes(WindowingStrategy<?, ?> strategy) {
- // The default trigger is Repeatedly.forever(AfterWatermark.pastEndOfWindow()); This fires
- // for every late-arriving element if allowed lateness is nonzero, and thus we must have
- // an accumulating mode specified
- boolean dataCanArriveLate =
- !(strategy.getWindowFn() instanceof GlobalWindows)
- && strategy.getAllowedLateness().getMillis() > 0;
- boolean hasCustomTrigger = !(strategy.getTrigger() instanceof DefaultTrigger);
- return dataCanArriveLate || hasCustomTrigger;
+ @Override
+ public void validate(PCollection<T> input) {
+ WindowingStrategy<?, ?> outputStrategy =
+ getOutputStrategyInternal(input.getWindowingStrategy());
+
+ // Make sure that the windowing strategy is complete & valid.
+ if (outputStrategy.isTriggerSpecified()
+ && !(outputStrategy.getTrigger() instanceof DefaultTrigger)
+ && !(outputStrategy.getWindowFn() instanceof GlobalWindows)
+ && !outputStrategy.isAllowedLatenessSpecified()) {
+ throw new IllegalArgumentException(
+ "Except when using GlobalWindows,"
+ + " calling .triggering() to specify a trigger requires that the allowed lateness"
+ + " be specified using .withAllowedLateness() to set the upper bound on how late"
+ + " data can arrive before being dropped. See Javadoc for more details.");
}
- @Override
- public PCollection<T> expand(PCollection<T> input) {
- WindowingStrategy<?, ?> outputStrategy =
- getOutputStrategyInternal(input.getWindowingStrategy());
- if (getWindowFn() == null) {
- // A new PCollection must be created in case input is reused in a different location as the
- // two PCollections will, in general, have a different windowing strategy.
- return PCollectionList.of(input)
- .apply(Flatten.<T>pCollections())
- .setWindowingStrategyInternal(outputStrategy);
- } else {
- // This is the AssignWindows primitive
- return input.apply(new Assign<>(this, outputStrategy));
- }
+ if (!outputStrategy.isModeSpecified() && canProduceMultiplePanes(outputStrategy)) {
+ throw new IllegalArgumentException(
+ "Calling .triggering() to specify a trigger or calling .withAllowedLateness() to"
+ + " specify an allowed lateness greater than zero requires that the accumulation"
+ + " mode be specified using .discardingFiredPanes() or .accumulatingFiredPanes()."
+ + " See Javadoc for more details.");
}
+ }
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
+ private boolean canProduceMultiplePanes(WindowingStrategy<?, ?> strategy) {
+ // The default trigger is Repeatedly.forever(AfterWatermark.pastEndOfWindow()); This fires
+ // for every late-arriving element if allowed lateness is nonzero, and thus we must have
+ // an accumulating mode specified
+ boolean dataCanArriveLate =
+ !(strategy.getWindowFn() instanceof GlobalWindows)
+ && strategy.getAllowedLateness().getMillis() > 0;
+ boolean hasCustomTrigger = !(strategy.getTrigger() instanceof DefaultTrigger);
+ return dataCanArriveLate || hasCustomTrigger;
+ }
- if (getWindowFn() != null) {
- builder
- .add(DisplayData.item("windowFn", getWindowFn().getClass())
- .withLabel("Windowing Function"))
- .include("windowFn", getWindowFn());
- }
+ @Override
+ public PCollection<T> expand(PCollection<T> input) {
+ WindowingStrategy<?, ?> outputStrategy =
+ getOutputStrategyInternal(input.getWindowingStrategy());
+ if (getWindowFn() == null) {
+ // A new PCollection must be created in case input is reused in a different location as the
+ // two PCollections will, in general, have a different windowing strategy.
+ return PCollectionList.of(input)
+ .apply(Flatten.<T>pCollections())
+ .setWindowingStrategyInternal(outputStrategy);
+ } else {
+ // This is the AssignWindows primitive
+ return input.apply(new Assign<>(this, outputStrategy));
+ }
+ }
- if (getAllowedLateness() != null) {
- builder.addIfNotDefault(DisplayData.item("allowedLateness", getAllowedLateness())
- .withLabel("Allowed Lateness"),
- Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
- }
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
- if (getTrigger() != null && !(getTrigger() instanceof DefaultTrigger)) {
- builder.add(DisplayData.item("trigger", getTrigger().toString())
- .withLabel("Trigger"));
- }
+ if (getWindowFn() != null) {
+ builder
+ .add(DisplayData.item("windowFn", getWindowFn().getClass())
+ .withLabel("Windowing Function"))
+ .include("windowFn", getWindowFn());
+ }
- if (getAccumulationMode() != null) {
- builder.add(DisplayData.item("accumulationMode", getAccumulationMode().toString())
- .withLabel("Accumulation Mode"));
- }
+ if (getAllowedLateness() != null) {
+ builder.addIfNotDefault(DisplayData.item("allowedLateness", getAllowedLateness())
+ .withLabel("Allowed Lateness"),
+ Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+ }
- if (getClosingBehavior() != null) {
- builder.add(DisplayData.item("closingBehavior", getClosingBehavior().toString())
- .withLabel("Window Closing Behavior"));
- }
+ if (getTrigger() != null && !(getTrigger() instanceof DefaultTrigger)) {
+ builder.add(DisplayData.item("trigger", getTrigger().toString())
+ .withLabel("Trigger"));
+ }
- if (getOutputTimeFn() != null) {
- builder.add(DisplayData.item("outputTimeFn", getOutputTimeFn().getClass())
- .withLabel("Output Time Function"));
- }
+ if (getAccumulationMode() != null) {
+ builder.add(DisplayData.item("accumulationMode", getAccumulationMode().toString())
+ .withLabel("Accumulation Mode"));
}
- @Override
- protected Coder<?> getDefaultOutputCoder(PCollection<T> input) {
- return input.getCoder();
+ if (getClosingBehavior() != null) {
+ builder.add(DisplayData.item("closingBehavior", getClosingBehavior().toString())
+ .withLabel("Window Closing Behavior"));
}
- @Override
- protected String getKindString() {
- return "Window.Into()";
+ if (getOutputTimeFn() != null) {
+ builder.add(DisplayData.item("outputTimeFn", getOutputTimeFn().getClass())
+ .withLabel("Output Time Function"));
}
}
+ @Override
+ protected Coder<?> getDefaultOutputCoder(PCollection<T> input) {
+ return input.getCoder();
+ }
+
+ @Override
+ protected String getKindString() {
+ return "Window.Into()";
+ }
+
/**
* A Primitive {@link PTransform} that assigns windows to elements based on a {@link WindowFn}.
*/
public static class Assign<T> extends PTransform<PCollection<T>, PCollection<T>> {
- private final Bound<T> original;
+ private final Window<T> original;
private final WindowingStrategy<T, ?> updatedStrategy;
/**
@@ -449,7 +440,7 @@ public class Window {
* {@link #getWindowFn()}.
*/
@VisibleForTesting
- Assign(Bound<T> original, WindowingStrategy updatedStrategy) {
+ Assign(Window<T> original, WindowingStrategy updatedStrategy) {
this.original = original;
this.updatedStrategy = updatedStrategy;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index e80bc17..0c27c4f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -59,7 +59,7 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
// here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
// The OutputTimeFn is set to ensure the GroupByKey does not shift elements forwards in time.
// Because this outputs as fast as possible, this should not hold the watermark.
- Window.Bound<KV<K, V>> rewindow =
+ Window<KV<K, V>> rewindow =
Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
.triggering(new ReshuffleTrigger<>())
.discardingFiredPanes()
http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 80f6f66..3ecbed4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -118,8 +118,8 @@ public class WriteTest {
};
private static class WindowAndReshuffle<T> extends PTransform<PCollection<T>, PCollection<T>> {
- private final Window.Bound<T> window;
- public WindowAndReshuffle(Window.Bound<T> window) {
+ private final Window<T> window;
+ public WindowAndReshuffle(Window<T> window) {
this.window = window;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
index e1ac54f..9b0b27d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.Window.Bound;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matchers;
@@ -153,7 +152,7 @@ public class TopTest {
public void testTopEmptyWithIncompatibleWindows() {
p.enableAbandonedNodeEnforcement(false);
- Bound<String> windowingFn = Window.<String>into(FixedWindows.of(Duration.standardDays(10L)));
+ Window<String> windowingFn = Window.<String>into(FixedWindows.of(Duration.standardDays(10L)));
PCollection<String> input = p.apply(Create.empty(StringUtf8Coder.of())).apply(windowingFn);
expectedEx.expect(IllegalStateException.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 979179d..8bf022b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -52,7 +52,6 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-import org.apache.beam.sdk.transforms.windowing.Window.Bound;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
import org.apache.beam.sdk.values.KV;
@@ -168,7 +167,7 @@ public class WindowTest implements Serializable {
/**
* With {@link #testWindowIntoNullWindowFnNoAssign()}, demonstrates that the expansions of the
- * {@link Window.Bound} transform depends on if it actually assigns elements to windows.
+ * {@link Window} transform depends on if it actually assigns elements to windows.
*/
@Test
public void testWindowIntoWindowFnAssign() {
@@ -192,7 +191,7 @@ public class WindowTest implements Serializable {
/**
* With {@link #testWindowIntoWindowFnAssign()}, demonstrates that the expansions of the
- * {@link Window.Bound} transform depends on if it actually assigns elements to windows.
+ * {@link Window} transform depends on if it actually assigns elements to windows.
*/
@Test
public void testWindowIntoNullWindowFnNoAssign() {
@@ -429,7 +428,7 @@ public class WindowTest implements Serializable {
Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY;
OutputTimeFn<BoundedWindow> outputTimeFn = OutputTimeFns.outputAtEndOfWindow();
- Window.Bound<?> window = Window
+ Window<?> window = Window
.into(windowFn)
.triggering(triggerBuilder)
.accumulatingFiredPanes()
@@ -459,7 +458,7 @@ public class WindowTest implements Serializable {
Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY;
OutputTimeFn<BoundedWindow> outputTimeFn = OutputTimeFns.outputAtEndOfWindow();
- Window.Bound<?> window = Window
+ Window<?> window = Window
.into(windowFn)
.triggering(triggerBuilder)
.accumulatingFiredPanes()
@@ -486,7 +485,7 @@ public class WindowTest implements Serializable {
public void testAssignDisplayDataUnchanged() {
FixedWindows windowFn = FixedWindows.of(Duration.standardHours(5));
- Bound<Object> original = Window.into(windowFn);
+ Window<Object> original = Window.into(windowFn);
WindowingStrategy<?, ?> updated = WindowingStrategy.globalDefault().withWindowFn(windowFn);
DisplayData displayData = DisplayData.from(new Window.Assign<>(original, updated));
@@ -503,7 +502,7 @@ public class WindowTest implements Serializable {
@Test
public void testDisplayDataExcludesUnspecifiedProperties() {
- Window.Bound<?> onlyHasAccumulationMode = Window.<Object>configure().discardingFiredPanes();
+ Window<?> onlyHasAccumulationMode = Window.<Object>configure().discardingFiredPanes();
assertThat(DisplayData.from(onlyHasAccumulationMode), not(hasDisplayItem(hasKey(isOneOf(
"windowFn",
"trigger",
@@ -511,14 +510,14 @@ public class WindowTest implements Serializable {
"allowedLateness",
"closingBehavior")))));
- Window.Bound<?> noAccumulationMode = Window.into(new GlobalWindows());
+ Window<?> noAccumulationMode = Window.into(new GlobalWindows());
assertThat(DisplayData.from(noAccumulationMode),
not(hasDisplayItem(hasKey("accumulationMode"))));
}
@Test
public void testDisplayDataExcludesDefaults() {
- Window.Bound<?> window = Window.into(new GlobalWindows())
+ Window<?> window = Window.into(new GlobalWindows())
.triggering(DefaultTrigger.of())
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));