You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/03/31 18:20:43 UTC

[4/6] beam git commit: Replaced static Window.blah() methods with Window.configure().blah() except Window.into()

Replaced static Window.blah() methods with Window.configure().blah() except Window.into()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6b67e547
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6b67e547
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6b67e547

Branch: refs/heads/master
Commit: 6b67e547aab7658bcb6dfdf6eb5bf7e220ef7558
Parents: 876d13d
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Mar 29 12:58:20 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Mar 31 10:59:38 2017 -0700

----------------------------------------------------------------------
 .../direct/ParDoMultiOverrideFactory.java       |  4 +-
 .../translation/streaming/CreateStreamTest.java |  4 +-
 .../org/apache/beam/sdk/testing/PAssert.java    |  6 +-
 .../org/apache/beam/sdk/transforms/DoFn.java    |  8 +-
 .../apache/beam/sdk/transforms/GroupByKey.java  |  4 +-
 .../beam/sdk/transforms/WithTimestamps.java     | 10 +--
 .../beam/sdk/transforms/windowing/Never.java    |  2 +-
 .../beam/sdk/transforms/windowing/Window.java   | 80 +++-----------------
 .../apache/beam/sdk/testing/TestStreamTest.java |  6 +-
 .../sdk/transforms/windowing/WindowTest.java    | 24 +++---
 10 files changed, 47 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 4604fcc..056a0c3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -135,8 +135,8 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
               //  - ensure this GBK holds to the minimum of those timestamps (via OutputTimeFn)
               //  - discard past panes as it is "just a stream" of elements
               .apply(
-                  Window.<KV<K, WindowedValue<KV<K, InputT>>>>triggering(
-                          Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
+                  Window.<KV<K, WindowedValue<KV<K, InputT>>>>configure()
+                      .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                       .discardingFiredPanes()
                       .withAllowedLateness(inputWindowingStrategy.getAllowedLateness())
                       .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))

http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
index 75abc8b..78b8039 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
@@ -275,7 +275,7 @@ public class CreateStreamTest implements Serializable {
     PCollection<String> createStrings =
         p.apply("CreateStrings", source)
             .apply("WindowStrings",
-                Window.<String>triggering(AfterPane.elementCountAtLeast(2))
+                Window.<String>configure().triggering(AfterPane.elementCountAtLeast(2))
                     .withAllowedLateness(Duration.ZERO)
                     .accumulatingFiredPanes());
     PAssert.that(createStrings).containsInAnyOrder("foo", "bar");
@@ -283,7 +283,7 @@ public class CreateStreamTest implements Serializable {
     PCollection<Integer> createInts =
         p.apply("CreateInts", other)
             .apply("WindowInts",
-                Window.<Integer>triggering(AfterPane.elementCountAtLeast(4))
+                Window.<Integer>configure().triggering(AfterPane.elementCountAtLeast(4))
                     .withAllowedLateness(Duration.ZERO)
                     .accumulatingFiredPanes());
     PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4);

http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 56df449..ab412c4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -939,7 +939,8 @@ public class PAssert {
               PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>>,
               PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>>>
           removeTriggering =
-              Window.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>triggering(Never.ever())
+              Window.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>configure()
+                  .triggering(Never.ever())
                   .discardingFiredPanes()
                   .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness());
       // Group the contents by key. If it is empty, this PCollection will be empty, too.
@@ -983,7 +984,8 @@ public class PAssert {
                   Flatten.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>pCollections())
               .apply(
                   "NeverTrigger",
-                  Window.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>triggering(Never.ever())
+                  Window.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>configure()
+                      .triggering(Never.ever())
                       .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
                       .discardingFiredPanes())
               .apply(

http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index a7730f0..e43527a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -279,8 +279,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
     /**
      * Returns the timestamp of the input element.
      *
-     * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
-     * for more information.
+     * <p>See {@link Window} for more information.
      */
     public abstract Instant timestamp();
 
@@ -290,8 +289,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
      *
      * <p>Generally all data is in a single, uninteresting pane unless custom
      * triggering and/or late data has been explicitly requested.
-     * See {@link org.apache.beam.sdk.transforms.windowing.Window}
-     * for more information.
+     * See {@link Window} for more information.
      */
     public abstract PaneInfo pane();
   }
@@ -326,7 +324,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
    *
    * @deprecated This method permits a {@link DoFn} to emit elements behind the watermark. These
    *     elements are considered late, and if behind the
-   *     {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream
+   *     {@link Window.Bound#withAllowedLateness(Duration) allowed lateness} of a downstream
    *     {@link PCollection} may be silently dropped. See
    *     https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement.
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index adf189b..f68b1f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -39,7 +39,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
  * each distinct key and window of the input {@code PCollection} to an
  * {@code Iterable} over all the values associated with that key in
  * the input per window.  Absent repeatedly-firing
- * {@link Window#triggering triggering}, each key in the output
+ * {@link Window.Bound#triggering triggering}, each key in the output
  * {@code PCollection} is unique within each window.
  *
  * <p>{@code GroupByKey} is analogous to converting a multi-map into
@@ -104,7 +104,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
  * <p>If the input {@code PCollection} contains late data (see
  * {@link org.apache.beam.sdk.io.PubsubIO.Read#timestampLabel}
  * for an example of how this can occur) or the
- * {@link Window#triggering requested TriggerFn} can fire before
+ * {@link Window.Bound#triggering requested TriggerFn} can fire before
  * the watermark, then there may be multiple elements
  * output by a {@code GroupByKey} that correspond to the same key and window.
  *

http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
index 6f20226..39cf6c6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
@@ -52,9 +52,9 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
    *
    * <p>CAUTION: Use of {@link #withAllowedTimestampSkew(Duration)} permits elements to be emitted
    * behind the watermark. These elements are considered late, and if behind the {@link
-   * Window#withAllowedLateness(Duration) allowed lateness} of a downstream {@link PCollection} may
-   * be silently dropped. See https://issues.apache.org/jira/browse/BEAM-644 for details on a
-   * replacement.
+   * Window.Bound#withAllowedLateness(Duration) allowed lateness} of a downstream
+   * {@link PCollection} may be silently dropped. See https://issues.apache.org/jira/browse/BEAM-644
+   * for details on a replacement.
    *
    * <p>Each output element will be in the same windows as the input element. If a new window based
    * on the new output timestamp is desired, apply a new instance of {@link Window#into(WindowFn)}.
@@ -90,7 +90,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
    * future. For infinite skew, use {@code new Duration(Long.MAX_VALUE)}.
    * @deprecated This method permits a to elements to be emitted behind the watermark. These
    *     elements are considered late, and if behind the
-   *     {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream
+   *     {@link Window.Bound#withAllowedLateness(Duration) allowed lateness} of a downstream
    *     {@link PCollection} may be silently dropped. See
    *     https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement.
    */
@@ -106,7 +106,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
    * @see DoFn#getAllowedTimestampSkew()
    * @deprecated This method permits a to elements to be emitted behind the watermark. These
    *     elements are considered late, and if behind the
-   *     {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream
+   *     {@link Window.Bound#withAllowedLateness(Duration) allowed lateness} of a downstream
    *     {@link PCollection} may be silently dropped. See
    *     https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
index 664ae83..d8cb96d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
@@ -26,7 +26,7 @@ import org.joda.time.Instant;
  * A trigger which never fires.
  *
  * <p>Using this trigger will only produce output when the watermark passes the end of the
- * {@link BoundedWindow window} plus the {@link Window#withAllowedLateness allowed
+ * {@link BoundedWindow window} plus the {@link Window.Bound#withAllowedLateness allowed
  * lateness}.
  */
 public final class Never {

http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index a6c7adf..188554a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -170,62 +170,18 @@ public class Window {
    * properties can be set on it first.
    */
   public static <T> Bound<T> into(WindowFn<? super T, ?> fn) {
-    return Window.<T>configure().into(fn);
-  }
-
-  /**
-   * Sets a non-default trigger for this {@code Window} {@code PTransform}.
-   * Elements that are assigned to a specific window will be output when
-   * the trigger fires.
-   *
-   * <p>Must also specify allowed lateness using {@link #withAllowedLateness} and accumulation
-   * mode using either {@link #discardingFiredPanes()} or {@link #accumulatingFiredPanes()}.
-   */
-  @Experimental(Kind.TRIGGER)
-  public static <T> Bound<T> triggering(Trigger trigger) {
-    return Window.<T>configure().triggering(trigger);
-  }
-
-  /**
-   * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
-   * Triggering behavior, and that discards elements in a pane after they are triggered.
-   *
-   * <p>Does not modify this transform.  The resulting {@code PTransform} is sufficiently
-   * specified to be applied, but more properties can still be specified.
-   */
-  @Experimental(Kind.TRIGGER)
-  public static <T> Bound<T> discardingFiredPanes() {
-    return Window.<T>configure().discardingFiredPanes();
-  }
-
-  /**
-   * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
-   * Triggering behavior, and that accumulates elements in a pane after they are triggered.
-   *
-   * <p>Does not modify this transform.  The resulting {@code PTransform} is sufficiently
-   * specified to be applied, but more properties can still be specified.
-   */
-  @Experimental(Kind.TRIGGER)
-  public static <T> Bound<T> accumulatingFiredPanes() {
-    return Window.<T>configure().accumulatingFiredPanes();
+    try {
+      fn.windowCoder().verifyDeterministic();
+    } catch (NonDeterministicException e) {
+      throw new IllegalArgumentException("Window coders must be deterministic.", e);
+    }
+    return Window.<T>configure().withWindowFn(fn);
   }
 
   /**
-   * Override the amount of lateness allowed for data elements in the output {@link PCollection},
-   * and downstream {@link PCollection PCollections} until explicitly set again. Like
-   * the other properties on this {@link Window} operation, this will be applied at
-   * the next {@link GroupByKey}. Any elements that are later than this as decided by
-   * the system-maintained watermark will be dropped.
-   *
-   * <p>This value also determines how long state will be kept around for old windows.
-   * Once no elements will be added to a window (because this duration has passed) any state
-   * associated with the window will be cleaned up.
+   * Returns a new builder for a {@link Window} transform for setting windowing parameters other
+   * than the windowing function.
    */
-  @Experimental(Kind.TRIGGER)
-  public static <T> Bound<T> withAllowedLateness(Duration allowedLateness) {
-    return Window.<T>configure().withAllowedLateness(allowedLateness);
-  }
-
   public static <T> Bound<T> configure() {
     return new AutoValue_Window_Bound.Builder<T>().build();
   }
@@ -261,20 +217,7 @@ public class Window {
       abstract Bound<T> build();
     }
 
-    /**
-     * Returns a new {@code Window} {@code PTransform} that's like this
-     * transform but that will use the given {@link WindowFn}, and that has
-     * its input and output types bound.  Does not modify this transform.  The
-     * resulting {@code PTransform} is sufficiently specified to be applied,
-     * but more properties can still be specified.
-     */
-    private Bound<T> into(WindowFn<? super T, ?> windowFn) {
-      try {
-        windowFn.windowCoder().verifyDeterministic();
-      } catch (NonDeterministicException e) {
-        throw new IllegalArgumentException("Window coders must be deterministic.", e);
-      }
-
+    private Bound<T> withWindowFn(WindowFn<? super T, ?> windowFn) {
       return toBuilder().setWindowFn(windowFn).build();
     }
 
@@ -319,8 +262,9 @@ public class Window {
    }
 
     /**
-     * Override the amount of lateness allowed for data elements in the pipeline. Like
-     * the other properties on this {@link Window} operation, this will be applied at
+     * Override the amount of lateness allowed for data elements in the output {@link PCollection}
+     * and downstream {@link PCollection PCollections} until explicitly set again.
+     * Like the other properties on this {@link Window} operation, this will be applied at
      * the next {@link GroupByKey}. Any elements that are later than this as decided by
      * the system-maintained watermark will be dropped.
      *

http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index 614831d..5cb7634 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -146,7 +146,7 @@ public class TestStreamTest implements Serializable {
         .advanceWatermarkToInfinity();
 
     PCollection<Long> sum = p.apply(source)
-        .apply(Window.<Long>triggering(AfterWatermark.pastEndOfWindow()
+        .apply(Window.<Long>configure().triggering(AfterWatermark.pastEndOfWindow()
             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                 .plusDelayOf(Duration.standardMinutes(5)))).accumulatingFiredPanes()
             .withAllowedLateness(Duration.ZERO))
@@ -272,14 +272,14 @@ public class TestStreamTest implements Serializable {
     PCollection<String> createStrings =
         p.apply("CreateStrings", stream)
             .apply("WindowStrings",
-                Window.<String>triggering(AfterPane.elementCountAtLeast(2))
+                Window.<String>configure().triggering(AfterPane.elementCountAtLeast(2))
                     .withAllowedLateness(Duration.ZERO)
                     .accumulatingFiredPanes());
     PAssert.that(createStrings).containsInAnyOrder("foo", "bar");
     PCollection<Integer> createInts =
         p.apply("CreateInts", other)
             .apply("WindowInts",
-                Window.<Integer>triggering(AfterPane.elementCountAtLeast(4))
+                Window.<Integer>configure().triggering(AfterPane.elementCountAtLeast(4))
                     .withAllowedLateness(Duration.ZERO)
                     .accumulatingFiredPanes());
     PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4);

http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 2bc8d86..979179d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -136,9 +136,9 @@ public class WindowTest implements Serializable {
     Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
     WindowingStrategy<?, ?> strategy = pipeline
       .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
-      .apply("Mode", Window.<String>accumulatingFiredPanes())
-      .apply("Lateness", Window.<String>withAllowedLateness(Duration.standardDays(1)))
-      .apply("Trigger", Window.<String>triggering(trigger))
+      .apply("Mode", Window.<String>configure().accumulatingFiredPanes())
+      .apply("Lateness", Window.<String>configure().withAllowedLateness(Duration.standardDays(1)))
+      .apply("Trigger", Window.<String>configure().triggering(trigger))
       .apply("Window", Window.<String>into(fixed10))
       .getWindowingStrategy();
 
@@ -199,7 +199,7 @@ public class WindowTest implements Serializable {
     pipeline
         .apply(Create.of(1, 2, 3))
         .apply(
-            Window.<Integer>triggering(AfterWatermark.pastEndOfWindow())
+            Window.<Integer>configure().triggering(AfterWatermark.pastEndOfWindow())
                 .withAllowedLateness(Duration.ZERO)
                 .accumulatingFiredPanes());
 
@@ -247,7 +247,9 @@ public class WindowTest implements Serializable {
     thrown.expectMessage("requires that the accumulation mode");
     input.apply(
         "Triggering",
-        Window.<String>withAllowedLateness(Duration.standardDays(1)).triggering(trigger));
+        Window.<String>configure()
+            .withAllowedLateness(Duration.standardDays(1))
+            .triggering(trigger));
   }
 
   @Test
@@ -260,8 +262,8 @@ public class WindowTest implements Serializable {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("allowed lateness");
     thrown.expectMessage("accumulation mode be specified");
-    input
-        .apply("Lateness", Window.<String>withAllowedLateness(Duration.standardDays(1)));
+    input.apply(
+        "Lateness", Window.<String>configure().withAllowedLateness(Duration.standardDays(1)));
   }
 
   @Test
@@ -273,9 +275,9 @@ public class WindowTest implements Serializable {
     thrown.expectMessage("requires that the allowed lateness");
     pipeline
       .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
-      .apply("Mode", Window.<String>accumulatingFiredPanes())
+      .apply("Mode", Window.<String>configure().accumulatingFiredPanes())
       .apply("Window", Window.<String>into(fixed10))
-      .apply("Trigger", Window.<String>triggering(trigger));
+      .apply("Trigger", Window.<String>configure().triggering(trigger));
   }
 
   private static class WindowOddEvenBuckets extends NonMergingWindowFn<Long, IntervalWindow> {
@@ -353,7 +355,7 @@ public class WindowTest implements Serializable {
     PCollection<Boolean> updatedTrigger =
         upOne.apply(
             "UpdateWindowingStrategy",
-            Window.<Boolean>triggering(Never.ever())
+            Window.<Boolean>configure().triggering(Never.ever())
                 .withAllowedLateness(Duration.ZERO)
                 .accumulatingFiredPanes());
     pipeline.run();
@@ -501,7 +503,7 @@ public class WindowTest implements Serializable {
 
   @Test
   public void testDisplayDataExcludesUnspecifiedProperties() {
-    Window.Bound<?> onlyHasAccumulationMode = Window.discardingFiredPanes();
+    Window.Bound<?> onlyHasAccumulationMode = Window.<Object>configure().discardingFiredPanes();
     assertThat(DisplayData.from(onlyHasAccumulationMode), not(hasDisplayItem(hasKey(isOneOf(
         "windowFn",
         "trigger",