You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/03/31 18:20:43 UTC
[4/6] beam git commit: Replaced static Window.blah() methods with
Window.configure().blah() except Window.into()
Replaced static Window.blah() methods with Window.configure().blah() except Window.into()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6b67e547
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6b67e547
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6b67e547
Branch: refs/heads/master
Commit: 6b67e547aab7658bcb6dfdf6eb5bf7e220ef7558
Parents: 876d13d
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Mar 29 12:58:20 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Mar 31 10:59:38 2017 -0700
----------------------------------------------------------------------
.../direct/ParDoMultiOverrideFactory.java | 4 +-
.../translation/streaming/CreateStreamTest.java | 4 +-
.../org/apache/beam/sdk/testing/PAssert.java | 6 +-
.../org/apache/beam/sdk/transforms/DoFn.java | 8 +-
.../apache/beam/sdk/transforms/GroupByKey.java | 4 +-
.../beam/sdk/transforms/WithTimestamps.java | 10 +--
.../beam/sdk/transforms/windowing/Never.java | 2 +-
.../beam/sdk/transforms/windowing/Window.java | 80 +++-----------------
.../apache/beam/sdk/testing/TestStreamTest.java | 6 +-
.../sdk/transforms/windowing/WindowTest.java | 24 +++---
10 files changed, 47 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 4604fcc..056a0c3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -135,8 +135,8 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
// - ensure this GBK holds to the minimum of those timestamps (via OutputTimeFn)
// - discard past panes as it is "just a stream" of elements
.apply(
- Window.<KV<K, WindowedValue<KV<K, InputT>>>>triggering(
- Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
+ Window.<KV<K, WindowedValue<KV<K, InputT>>>>configure()
+ .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.discardingFiredPanes()
.withAllowedLateness(inputWindowingStrategy.getAllowedLateness())
.withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
index 75abc8b..78b8039 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
@@ -275,7 +275,7 @@ public class CreateStreamTest implements Serializable {
PCollection<String> createStrings =
p.apply("CreateStrings", source)
.apply("WindowStrings",
- Window.<String>triggering(AfterPane.elementCountAtLeast(2))
+ Window.<String>configure().triggering(AfterPane.elementCountAtLeast(2))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());
PAssert.that(createStrings).containsInAnyOrder("foo", "bar");
@@ -283,7 +283,7 @@ public class CreateStreamTest implements Serializable {
PCollection<Integer> createInts =
p.apply("CreateInts", other)
.apply("WindowInts",
- Window.<Integer>triggering(AfterPane.elementCountAtLeast(4))
+ Window.<Integer>configure().triggering(AfterPane.elementCountAtLeast(4))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());
PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4);
http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 56df449..ab412c4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -939,7 +939,8 @@ public class PAssert {
PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>>,
PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>>>
removeTriggering =
- Window.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>triggering(Never.ever())
+ Window.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>configure()
+ .triggering(Never.ever())
.discardingFiredPanes()
.withAllowedLateness(input.getWindowingStrategy().getAllowedLateness());
// Group the contents by key. If it is empty, this PCollection will be empty, too.
@@ -983,7 +984,8 @@ public class PAssert {
Flatten.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>pCollections())
.apply(
"NeverTrigger",
- Window.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>triggering(Never.ever())
+ Window.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>configure()
+ .triggering(Never.ever())
.withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
.discardingFiredPanes())
.apply(
http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index a7730f0..e43527a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -279,8 +279,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
/**
* Returns the timestamp of the input element.
*
- * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
- * for more information.
+ * <p>See {@link Window} for more information.
*/
public abstract Instant timestamp();
@@ -290,8 +289,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
*
* <p>Generally all data is in a single, uninteresting pane unless custom
* triggering and/or late data has been explicitly requested.
- * See {@link org.apache.beam.sdk.transforms.windowing.Window}
- * for more information.
+ * See {@link Window} for more information.
*/
public abstract PaneInfo pane();
}
@@ -326,7 +324,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
*
* @deprecated This method permits a {@link DoFn} to emit elements behind the watermark. These
* elements are considered late, and if behind the
- * {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream
+ * {@link Window.Bound#withAllowedLateness(Duration) allowed lateness} of a downstream
* {@link PCollection} may be silently dropped. See
* https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index adf189b..f68b1f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -39,7 +39,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
* each distinct key and window of the input {@code PCollection} to an
* {@code Iterable} over all the values associated with that key in
* the input per window. Absent repeatedly-firing
- * {@link Window#triggering triggering}, each key in the output
+ * {@link Window.Bound#triggering triggering}, each key in the output
* {@code PCollection} is unique within each window.
*
* <p>{@code GroupByKey} is analogous to converting a multi-map into
@@ -104,7 +104,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
* <p>If the input {@code PCollection} contains late data (see
* {@link org.apache.beam.sdk.io.PubsubIO.Read#timestampLabel}
* for an example of how this can occur) or the
- * {@link Window#triggering requested TriggerFn} can fire before
+ * {@link Window.Bound#triggering requested TriggerFn} can fire before
* the watermark, then there may be multiple elements
* output by a {@code GroupByKey} that correspond to the same key and window.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
index 6f20226..39cf6c6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
@@ -52,9 +52,9 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
*
* <p>CAUTION: Use of {@link #withAllowedTimestampSkew(Duration)} permits elements to be emitted
* behind the watermark. These elements are considered late, and if behind the {@link
- * Window#withAllowedLateness(Duration) allowed lateness} of a downstream {@link PCollection} may
- * be silently dropped. See https://issues.apache.org/jira/browse/BEAM-644 for details on a
- * replacement.
+ * Window.Bound#withAllowedLateness(Duration) allowed lateness} of a downstream
+ * {@link PCollection} may be silently dropped. See https://issues.apache.org/jira/browse/BEAM-644
+ * for details on a replacement.
*
* <p>Each output element will be in the same windows as the input element. If a new window based
* on the new output timestamp is desired, apply a new instance of {@link Window#into(WindowFn)}.
@@ -90,7 +90,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
* future. For infinite skew, use {@code new Duration(Long.MAX_VALUE)}.
* @deprecated This method permits elements to be emitted behind the watermark. These
* elements are considered late, and if behind the
- * {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream
+ * {@link Window.Bound#withAllowedLateness(Duration) allowed lateness} of a downstream
* {@link PCollection} may be silently dropped. See
* https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement.
*/
@@ -106,7 +106,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
* @see DoFn#getAllowedTimestampSkew()
* @deprecated This method permits elements to be emitted behind the watermark. These
* elements are considered late, and if behind the
- * {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream
+ * {@link Window.Bound#withAllowedLateness(Duration) allowed lateness} of a downstream
* {@link PCollection} may be silently dropped. See
* https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
index 664ae83..d8cb96d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
@@ -26,7 +26,7 @@ import org.joda.time.Instant;
* A trigger which never fires.
*
* <p>Using this trigger will only produce output when the watermark passes the end of the
- * {@link BoundedWindow window} plus the {@link Window#withAllowedLateness allowed
+ * {@link BoundedWindow window} plus the {@link Window.Bound#withAllowedLateness allowed
* lateness}.
*/
public final class Never {
http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index a6c7adf..188554a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -170,62 +170,18 @@ public class Window {
* properties can be set on it first.
*/
public static <T> Bound<T> into(WindowFn<? super T, ?> fn) {
- return Window.<T>configure().into(fn);
- }
-
- /**
- * Sets a non-default trigger for this {@code Window} {@code PTransform}.
- * Elements that are assigned to a specific window will be output when
- * the trigger fires.
- *
- * <p>Must also specify allowed lateness using {@link #withAllowedLateness} and accumulation
- * mode using either {@link #discardingFiredPanes()} or {@link #accumulatingFiredPanes()}.
- */
- @Experimental(Kind.TRIGGER)
- public static <T> Bound<T> triggering(Trigger trigger) {
- return Window.<T>configure().triggering(trigger);
- }
-
- /**
- * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
- * Triggering behavior, and that discards elements in a pane after they are triggered.
- *
- * <p>Does not modify this transform. The resulting {@code PTransform} is sufficiently
- * specified to be applied, but more properties can still be specified.
- */
- @Experimental(Kind.TRIGGER)
- public static <T> Bound<T> discardingFiredPanes() {
- return Window.<T>configure().discardingFiredPanes();
- }
-
- /**
- * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
- * Triggering behavior, and that accumulates elements in a pane after they are triggered.
- *
- * <p>Does not modify this transform. The resulting {@code PTransform} is sufficiently
- * specified to be applied, but more properties can still be specified.
- */
- @Experimental(Kind.TRIGGER)
- public static <T> Bound<T> accumulatingFiredPanes() {
- return Window.<T>configure().accumulatingFiredPanes();
+ try {
+ fn.windowCoder().verifyDeterministic();
+ } catch (NonDeterministicException e) {
+ throw new IllegalArgumentException("Window coders must be deterministic.", e);
+ }
+ return Window.<T>configure().withWindowFn(fn);
}
/**
- * Override the amount of lateness allowed for data elements in the output {@link PCollection},
- * and downstream {@link PCollection PCollections} until explicitly set again. Like
- * the other properties on this {@link Window} operation, this will be applied at
- * the next {@link GroupByKey}. Any elements that are later than this as decided by
- * the system-maintained watermark will be dropped.
- *
- * <p>This value also determines how long state will be kept around for old windows.
- * Once no elements will be added to a window (because this duration has passed) any state
- * associated with the window will be cleaned up.
+ * Returns a new builder for a {@link Window} transform for setting windowing parameters other
+ * than the windowing function.
*/
- @Experimental(Kind.TRIGGER)
- public static <T> Bound<T> withAllowedLateness(Duration allowedLateness) {
- return Window.<T>configure().withAllowedLateness(allowedLateness);
- }
-
public static <T> Bound<T> configure() {
return new AutoValue_Window_Bound.Builder<T>().build();
}
@@ -261,20 +217,7 @@ public class Window {
abstract Bound<T> build();
}
- /**
- * Returns a new {@code Window} {@code PTransform} that's like this
- * transform but that will use the given {@link WindowFn}, and that has
- * its input and output types bound. Does not modify this transform. The
- * resulting {@code PTransform} is sufficiently specified to be applied,
- * but more properties can still be specified.
- */
- private Bound<T> into(WindowFn<? super T, ?> windowFn) {
- try {
- windowFn.windowCoder().verifyDeterministic();
- } catch (NonDeterministicException e) {
- throw new IllegalArgumentException("Window coders must be deterministic.", e);
- }
-
+ private Bound<T> withWindowFn(WindowFn<? super T, ?> windowFn) {
return toBuilder().setWindowFn(windowFn).build();
}
@@ -319,8 +262,9 @@ public class Window {
}
/**
- * Override the amount of lateness allowed for data elements in the pipeline. Like
- * the other properties on this {@link Window} operation, this will be applied at
+ * Override the amount of lateness allowed for data elements in the output {@link PCollection}
+ * and downstream {@link PCollection PCollections} until explicitly set again.
+ * Like the other properties on this {@link Window} operation, this will be applied at
* the next {@link GroupByKey}. Any elements that are later than this as decided by
* the system-maintained watermark will be dropped.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index 614831d..5cb7634 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -146,7 +146,7 @@ public class TestStreamTest implements Serializable {
.advanceWatermarkToInfinity();
PCollection<Long> sum = p.apply(source)
- .apply(Window.<Long>triggering(AfterWatermark.pastEndOfWindow()
+ .apply(Window.<Long>configure().triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(5)))).accumulatingFiredPanes()
.withAllowedLateness(Duration.ZERO))
@@ -272,14 +272,14 @@ public class TestStreamTest implements Serializable {
PCollection<String> createStrings =
p.apply("CreateStrings", stream)
.apply("WindowStrings",
- Window.<String>triggering(AfterPane.elementCountAtLeast(2))
+ Window.<String>configure().triggering(AfterPane.elementCountAtLeast(2))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());
PAssert.that(createStrings).containsInAnyOrder("foo", "bar");
PCollection<Integer> createInts =
p.apply("CreateInts", other)
.apply("WindowInts",
- Window.<Integer>triggering(AfterPane.elementCountAtLeast(4))
+ Window.<Integer>configure().triggering(AfterPane.elementCountAtLeast(4))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());
PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4);
http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 2bc8d86..979179d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -136,9 +136,9 @@ public class WindowTest implements Serializable {
Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
WindowingStrategy<?, ?> strategy = pipeline
.apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
- .apply("Mode", Window.<String>accumulatingFiredPanes())
- .apply("Lateness", Window.<String>withAllowedLateness(Duration.standardDays(1)))
- .apply("Trigger", Window.<String>triggering(trigger))
+ .apply("Mode", Window.<String>configure().accumulatingFiredPanes())
+ .apply("Lateness", Window.<String>configure().withAllowedLateness(Duration.standardDays(1)))
+ .apply("Trigger", Window.<String>configure().triggering(trigger))
.apply("Window", Window.<String>into(fixed10))
.getWindowingStrategy();
@@ -199,7 +199,7 @@ public class WindowTest implements Serializable {
pipeline
.apply(Create.of(1, 2, 3))
.apply(
- Window.<Integer>triggering(AfterWatermark.pastEndOfWindow())
+ Window.<Integer>configure().triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());
@@ -247,7 +247,9 @@ public class WindowTest implements Serializable {
thrown.expectMessage("requires that the accumulation mode");
input.apply(
"Triggering",
- Window.<String>withAllowedLateness(Duration.standardDays(1)).triggering(trigger));
+ Window.<String>configure()
+ .withAllowedLateness(Duration.standardDays(1))
+ .triggering(trigger));
}
@Test
@@ -260,8 +262,8 @@ public class WindowTest implements Serializable {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("allowed lateness");
thrown.expectMessage("accumulation mode be specified");
- input
- .apply("Lateness", Window.<String>withAllowedLateness(Duration.standardDays(1)));
+ input.apply(
+ "Lateness", Window.<String>configure().withAllowedLateness(Duration.standardDays(1)));
}
@Test
@@ -273,9 +275,9 @@ public class WindowTest implements Serializable {
thrown.expectMessage("requires that the allowed lateness");
pipeline
.apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
- .apply("Mode", Window.<String>accumulatingFiredPanes())
+ .apply("Mode", Window.<String>configure().accumulatingFiredPanes())
.apply("Window", Window.<String>into(fixed10))
- .apply("Trigger", Window.<String>triggering(trigger));
+ .apply("Trigger", Window.<String>configure().triggering(trigger));
}
private static class WindowOddEvenBuckets extends NonMergingWindowFn<Long, IntervalWindow> {
@@ -353,7 +355,7 @@ public class WindowTest implements Serializable {
PCollection<Boolean> updatedTrigger =
upOne.apply(
"UpdateWindowingStrategy",
- Window.<Boolean>triggering(Never.ever())
+ Window.<Boolean>configure().triggering(Never.ever())
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());
pipeline.run();
@@ -501,7 +503,7 @@ public class WindowTest implements Serializable {
@Test
public void testDisplayDataExcludesUnspecifiedProperties() {
- Window.Bound<?> onlyHasAccumulationMode = Window.discardingFiredPanes();
+ Window.Bound<?> onlyHasAccumulationMode = Window.<Object>configure().discardingFiredPanes();
assertThat(DisplayData.from(onlyHasAccumulationMode), not(hasDisplayItem(hasKey(isOneOf(
"windowFn",
"trigger",