You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/01 18:16:21 UTC

[2/2] incubator-beam git commit: Remove Window.Unbound

Remove Window.Unbound

Window PTransforms always have a bound output type; equal to that of the
input PCollection type.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d8bfd00a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d8bfd00a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d8bfd00a

Branch: refs/heads/master
Commit: d8bfd00a1861a3083a815033998bd35d440cd035
Parents: 68623e9
Author: Thomas Groh <tg...@google.com>
Authored: Wed Jul 13 09:59:35 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Nov 1 11:14:04 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/transforms/windowing/Window.java   | 128 +------------------
 1 file changed, 5 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d8bfd00a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 90e6a3a..5607762 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -166,7 +166,7 @@ public class Window {
    * properties can be set on it first.
    */
   public static <T> Bound<T> into(WindowFn<? super T, ?> fn) {
-    return new Unbound().into(fn);
+    return new Bound(null).into(fn);
   }
 
   /**
@@ -179,7 +179,7 @@ public class Window {
    */
   @Experimental(Kind.TRIGGER)
   public static <T> Bound<T> triggering(Trigger trigger) {
-    return new Unbound().triggering(trigger);
+    return new Bound(null).triggering(trigger);
   }
 
   /**
@@ -191,7 +191,7 @@ public class Window {
    */
   @Experimental(Kind.TRIGGER)
   public static <T> Bound<T> discardingFiredPanes() {
-    return new Unbound().discardingFiredPanes();
+    return new Bound(null).discardingFiredPanes();
   }
 
   /**
@@ -203,7 +203,7 @@ public class Window {
    */
   @Experimental(Kind.TRIGGER)
   public static <T> Bound<T> accumulatingFiredPanes() {
-    return new Unbound().accumulatingFiredPanes();
+    return new Bound(null).accumulatingFiredPanes();
   }
 
   /**
@@ -219,125 +219,7 @@ public class Window {
    */
   @Experimental(Kind.TRIGGER)
   public static <T> Bound<T> withAllowedLateness(Duration allowedLateness) {
-    return new Unbound().withAllowedLateness(allowedLateness);
-  }
-
-  /**
-   * Override the amount of lateness allowed for data elements in the output {@link PCollection},
-   * and downstream {@link PCollection PCollections} until explicitly set again. Like
-   * the other properties on this {@link Window} operation, this will be applied at
-   * the next {@link GroupByKey}. Any elements that are later than this as decided by
-   * the system-maintained watermark will be dropped.
-   *
-   * <p>This value also determines how long state will be kept around for old windows.
-   * Once no elements will be added to a window (because this duration has passed) any state
-   * associated with the window will be cleaned up.
-   */
-  @Experimental(Kind.TRIGGER)
-  public static <T> Bound<T> withAllowedLateness(
-      Duration allowedLateness, ClosingBehavior closingBehavior) {
-    return new Unbound().withAllowedLateness(allowedLateness, closingBehavior);
-  }
-
-  /**
-   * An incomplete {@code Window} transform, with unbound input/output type.
-   *
-   * <p>Before being applied, {@link Window.Unbound#into} must be
-   * invoked to specify the {@link WindowFn} to invoke, which will also
-   * bind the input/output type of this {@code PTransform}.
-   */
-  public static class Unbound {
-    String name;
-
-    Unbound() {}
-
-    Unbound(String name) {
-      this.name = name;
-    }
-
-    /**
-     * Returns a new {@code Window} {@code PTransform} that's like this
-     * transform but that will use the given {@link WindowFn}, and that has
-     * its input and output types bound.  Does not modify this transform.  The
-     * resulting {@code PTransform} is sufficiently specified to be applied,
-     * but more properties can still be specified.
-     */
-    public <T> Bound<T> into(WindowFn<? super T, ?> fn) {
-      return new Bound<T>(name).into(fn);
-    }
-
-    /**
-     * Sets a non-default trigger for this {@code Window} {@code PTransform}.
-     * Elements that are assigned to a specific window will be output when
-     * the trigger fires.
-     *
-     * <p>{@link org.apache.beam.sdk.transforms.windowing.Trigger}
-     * has more details on the available triggers.
-     *
-     * <p>Must also specify allowed lateness using {@link #withAllowedLateness} and accumulation
-     * mode using either {@link #discardingFiredPanes()} or {@link #accumulatingFiredPanes()}.
-     */
-    @Experimental(Kind.TRIGGER)
-    public <T> Bound<T> triggering(Trigger trigger) {
-      return new Bound<T>(name).triggering(trigger);
-    }
-
-    /**
-     * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
-     * Triggering behavior, and that discards elements in a pane after they are triggered.
-     *
-     * <p>Does not modify this transform.  The resulting {@code PTransform} is sufficiently
-     * specified to be applied, but more properties can still be specified.
-     */
-    @Experimental(Kind.TRIGGER)
-    public <T> Bound<T> discardingFiredPanes() {
-      return new Bound<T>(name).discardingFiredPanes();
-    }
-
-    /**
-     * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
-     * Triggering behavior, and that accumulates elements in a pane after they are triggered.
-     *
-     * <p>Does not modify this transform.  The resulting {@code PTransform} is sufficiently
-     * specified to be applied, but more properties can still be specified.
-     */
-    @Experimental(Kind.TRIGGER)
-    public <T> Bound<T> accumulatingFiredPanes() {
-      return new Bound<T>(name).accumulatingFiredPanes();
-    }
-
-    /**
-     * Override the amount of lateness allowed for data elements in the pipeline. Like
-     * the other properties on this {@link Window} operation, this will be applied at
-     * the next {@link GroupByKey}. Any elements that are later than this as decided by
-     * the system-maintained watermark will be dropped.
-     *
-     * <p>This value also determines how long state will be kept around for old windows.
-     * Once no elements will be added to a window (because this duration has passed) any state
-     * associated with the window will be cleaned up.
-     *
-     * <p>Depending on the trigger this may not produce a pane with {@link PaneInfo#isLast}. See
-     * {@link ClosingBehavior#FIRE_IF_NON_EMPTY} for more details.
-     */
-    @Experimental(Kind.TRIGGER)
-    public <T> Bound<T> withAllowedLateness(Duration allowedLateness) {
-      return new Bound<T>(name).withAllowedLateness(allowedLateness);
-    }
-
-    /**
-     * Override the amount of lateness allowed for data elements in the pipeline. Like
-     * the other properties on this {@link Window} operation, this will be applied at
-     * the next {@link GroupByKey}. Any elements that are later than this as decided by
-     * the system-maintained watermark will be dropped.
-     *
-     * <p>This value also determines how long state will be kept around for old windows.
-     * Once no elements will be added to a window (because this duration has passed) any state
-     * associated with the window will be cleaned up.
-     */
-    @Experimental(Kind.TRIGGER)
-    public <T> Bound<T> withAllowedLateness(Duration allowedLateness, ClosingBehavior behavior) {
-      return new Bound<T>(name).withAllowedLateness(allowedLateness, behavior);
-    }
+    return new Bound(null).withAllowedLateness(allowedLateness);
   }
 
   /**