You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:40 UTC

[16/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/PaneInfo.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/PaneInfo.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/PaneInfo.java
deleted file mode 100644
index 18f7a97..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/PaneInfo.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.util.VarInt;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Objects;
-
-/**
- * Provides information about the pane an element belongs to. Every pane is implicitly associated
- * with a window. Panes are observable only via the
- * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.ProcessContext#pane} method of the context
- * passed to a {@link DoFn#processElement} overridden method.
- *
- * <p>Note: This does not uniquely identify a pane, and should not be used for comparisons.
- */
-public final class PaneInfo {
-  /**
-   * Enumerates the possibilities for the timing of this pane firing related to the
-   * input and output watermarks for its computation.
-   *
-   * <p>A window may fire multiple panes, and the timing of those panes generally follows the
-   * regular expression {@code EARLY* ON_TIME? LATE*}. Generally a pane is considered:
-   * <ol>
-   * <li>{@code EARLY} if the system cannot be sure it has seen all data which may contribute
-   * to the pane's window.
-   * <li>{@code ON_TIME} if the system predicts it has seen all the data which may contribute
-   * to the pane's window.
-   * <li>{@code LATE} if the system has encountered new data after predicting no more could arrive.
-   * It is possible an {@code ON_TIME} pane has already been emitted, in which case any
-   * following panes are considered {@code LATE}.
-   * </ol>
-   *
-   * <p>Only an
-   * {@link AfterWatermark#pastEndOfWindow} trigger may produce an {@code ON_TIME} pane.
-   * With merging {@link WindowFn}'s, windows may be merged to produce new windows that satisfy
-   * their own instance of the above regular expression. The only guarantee is that once a window
-   * produces a final pane, it will not be merged into any new windows.
-   *
-   * <p>The predictions above are made using the mechanism of watermarks.
-   * See {@link com.google.cloud.dataflow.sdk.util.TimerInternals} for more information
-   * about watermarks.
-   *
-   * <p>We can state some properties of {@code LATE} and {@code ON_TIME} panes, but first need some
-   * definitions:
-   * <ol>
-   * <li>We'll call a pipeline 'simple' if it does not use
-   * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.Context#outputWithTimestamp} in
-   * any {@code DoFn}, and it uses the same
-   * {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window.Bound#withAllowedLateness}
-   * argument value on all windows (or uses the default of {@link org.joda.time.Duration#ZERO}).
-   * <li>We'll call an element 'locally late', from the point of view of a computation on a
-   * worker, if the element's timestamp is before the input watermark for that computation
-   * on that worker. The element is otherwise 'locally on-time'.
-   * <li>We'll say 'the pane's timestamp' to mean the timestamp of the element produced to
-   * represent the pane's contents.
-   * </ol>
-   *
-   * <p>Then in simple pipelines:
-   * <ol>
-   * <li> (Soundness) An {@code ON_TIME} pane can never cause a later computation to generate a
-   * {@code LATE} pane. (If it did, it would imply a later computation's input watermark progressed
-   * ahead of an earlier stage's output watermark, which by design is not possible.)
-   * <li> (Liveness) An {@code ON_TIME} pane is emitted as soon as possible after the input
-   * watermark passes the end of the pane's window.
-   * <li> (Consistency) A pane with only locally on-time elements will always be {@code ON_TIME}.
-   * And a {@code LATE} pane cannot contain locally on-time elements.
-   * </ol>
-   *
-   * However, note that:
-   * <ol>
-   * <li> An {@code ON_TIME} pane may contain locally late elements. It may even contain only
-   * locally late elements. Provided a locally late element finds its way into an {@code ON_TIME}
-   * pane its lateness becomes unobservable.
-   * <li> A {@code LATE} pane does not necessarily cause any following computation panes to be
-   * marked as {@code LATE}.
-   * </ol>
-   */
-  public enum Timing {
-    /**
-     * Pane was fired before the input watermark had progressed after the end of the window.
-     * Encoded as ordinal 0.
-     */
-    EARLY,
-    /**
-     * Pane was fired by a {@link AfterWatermark#pastEndOfWindow} trigger because the input
-     * watermark progressed after the end of the window. However the output watermark has not
-     * yet progressed after the end of the window. Thus it is still possible to assign a timestamp
-     * to the element representing this pane which cannot be considered locally late by any
-     * following computation. Encoded as ordinal 1.
-     */
-    ON_TIME,
-    /**
-     * Pane was fired after the output watermark had progressed past the end of the window.
-     * Encoded as ordinal 2.
-     */
-    LATE,
-    /**
-     * This element was not produced in a triggered pane and its relation to input and
-     * output watermarks is unknown. Encoded as ordinal 3.
-     */
-    UNKNOWN;
-
-    // NOTE: Do not add, remove or re-order constants. encodedByte() stores ordinal() << 2 in the
-    // low nibble beside the isFirst/isLast bits, so only ordinals 0-3 fit and order is wire format.
-  }
-
-  /**
-   * Packs the pane flags and timing into one byte: bit 0 is set for a first pane, bit 1 for a
-   * last pane, and the timing ordinal is stored starting at bit 2.
-   */
-  private static byte encodedByte(boolean isFirst, boolean isLast, Timing timing) {
-    int firstBit = isFirst ? 1 : 0;
-    int lastBit = isLast ? 2 : 0;
-    int timingBits = timing.ordinal() << 2;
-    return (byte) (firstBit | lastBit | timingBits);
-  }
-
-  /** Interned panes for every (isFirst, isLast, timing) combination, keyed by encoded byte. */
-  private static final ImmutableMap<Byte, PaneInfo> BYTE_TO_PANE_INFO;
-  static {
-    ImmutableMap.Builder<Byte, PaneInfo> interned = ImmutableMap.builder();
-    boolean[] flags = {true, false};
-    for (Timing timing : Timing.values()) {
-      // EARLY entries get a non-speculative index of -1, every other timing gets 0.
-      long onTimeIndex = (timing == Timing.EARLY) ? -1 : 0;
-      for (boolean first : flags) {
-        // First-pane entries are registered with index 0, non-first entries with -1.
-        long index = first ? 0 : -1;
-        for (boolean last : flags) {
-          register(interned, new PaneInfo(first, last, timing, index, onTimeIndex));
-        }
-      }
-    }
-    BYTE_TO_PANE_INFO = interned.build();
-  }
-
-  /** Adds {@code info} to {@code builder} under its encoded byte. */
-  private static void register(ImmutableMap.Builder<Byte, PaneInfo> builder, PaneInfo info) {
-    builder.put(info.encodedByte, info);
-  }
-
-  private final byte encodedByte;
-
-  private final boolean isFirst;
-  private final boolean isLast;
-  private final Timing timing;
-  private final long index;
-  private final long nonSpeculativeIndex;
-
-  /**
-   * {@code PaneInfo} to use for elements on (and before) initial window assignment (including
-   * elements read from sources) before they have passed through a {@link GroupByKey} and are
-   * associated with a particular trigger firing.
-   */
-  public static final PaneInfo NO_FIRING =
-      PaneInfo.createPane(true, true, Timing.UNKNOWN, 0, 0);
-
-  /**
-   * {@code PaneInfo} to use when there will be exactly one firing and it is on time.
-   */
-  public static final PaneInfo ON_TIME_AND_ONLY_FIRING =
-      PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0);
-
-  /**
-   * Creates a pane without interning it; the encoded byte is derived from the two flags and the
-   * timing, and {@code onTimeIndex} is stored as the non-speculative index.
-   */
-  private PaneInfo(boolean isFirst, boolean isLast, Timing timing, long index, long onTimeIndex) {
-    this.isFirst = isFirst;
-    this.isLast = isLast;
-    this.timing = timing;
-    this.index = index;
-    this.nonSpeculativeIndex = onTimeIndex;
-    this.encodedByte = encodedByte(isFirst, isLast, timing);
-  }
-
-  public static PaneInfo createPane(boolean isFirst, boolean isLast, Timing timing) {
-    Preconditions.checkArgument(isFirst, "Indices must be provided for non-first pane info.");
-    return createPane(isFirst, isLast, timing, 0, timing == Timing.EARLY ? -1 : 0);
-  }
-
-  /**
-   * Factory method to create a {@link PaneInfo} with the specified parameters.
-   *
-   * <p>First panes and panes with {@link Timing#UNKNOWN} timing are looked up in the interned
-   * table by flags and timing only, so {@code index} and {@code onTimeIndex} are not consulted
-   * for them. Every other pane is a fresh instance carrying the given indices.
-   */
-  public static PaneInfo createPane(
-      boolean isFirst, boolean isLast, Timing timing, long index, long onTimeIndex) {
-    boolean needsOwnIndices = !isFirst && timing != Timing.UNKNOWN;
-    if (needsOwnIndices) {
-      return new PaneInfo(isFirst, isLast, timing, index, onTimeIndex);
-    }
-    PaneInfo interned = BYTE_TO_PANE_INFO.get(encodedByte(isFirst, isLast, timing));
-    return Preconditions.checkNotNull(interned);
-  }
-
-  public static PaneInfo decodePane(byte encodedPane) {
-    return Preconditions.checkNotNull(BYTE_TO_PANE_INFO.get(encodedPane));
-  }
-
-  /**
-   * Reports whether this pane carries no timing information, i.e. its timing is
-   * {@link Timing#UNKNOWN}. This typically indicates that the current element has not been
-   * assigned to windows or passed through an operation that executes triggers yet.
-   */
-  public boolean isUnknown() {
-    return timing == Timing.UNKNOWN;
-  }
-
-  /**
-   * Return true if this is the first pane produced for the associated window.
-   */
-  public boolean isFirst() {
-    return isFirst;
-  }
-
-  /**
-   * Return true if this is the last pane that will be produced in the associated window.
-   */
-  public boolean isLast() {
-    return isLast;
-  }
-
-  /**
-   * Return the timing of this pane (see {@link Timing}).
-   */
-  public Timing getTiming() {
-    return timing;
-  }
-
-  /**
-   * The zero-based index of the trigger firing that produced this pane.
-   *
-   * <p>This will return 0 for the first time the trigger fires, 1 for the next time, etc.
-   *
-   * <p>A given (key, window, pane-index) is guaranteed to be unique in the
-   * output of a group-by-key operation.
-   */
-  public long getIndex() {
-    return index;
-  }
-
-  /**
-   * The zero-based index of this trigger firing among non-speculative panes.
-   *
-   * <p>This will return 0 for the first non-{@link Timing#EARLY} trigger firing, 1 for the next
-   * one, etc.
-   *
-   * <p>Always -1 for speculative data.
-   */
-  public long getNonSpeculativeIndex() {
-    return nonSpeculativeIndex;
-  }
-
-  int getEncodedByte() {
-    return encodedByte;
-  }
-
-  @Override
-  public int hashCode() {
-    // Hashes exactly the fields compared by equals().
-    return Objects.hash(encodedByte, index, nonSpeculativeIndex);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    // Simple PaneInfos are interned, so the identity check often settles it.
-    if (obj == this) {
-      return true;
-    }
-    if (!(obj instanceof PaneInfo)) {
-      return false;
-    }
-    PaneInfo other = (PaneInfo) obj;
-    return encodedByte == other.encodedByte
-        && index == other.index
-        && nonSpeculativeIndex == other.nonSpeculativeIndex;
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(getClass())
-        .omitNullValues()
-        .add("isFirst", isFirst ? true : null)
-        .add("isLast", isLast ? true : null)
-        .add("timing", timing)
-        .add("index", index)
-        .add("onTimeIndex", nonSpeculativeIndex != -1 ? nonSpeculativeIndex : null)
-        .toString();
-  }
-
-  /**
-   * A Coder for encoding PaneInfo instances.
-   *
-   * <p>The first byte holds the pane's encoded byte in its low nibble and an {@code Encoding}
-   * tag in its high nibble; the tag says how many variable-length indices follow.
-   */
-  public static class PaneInfoCoder extends AtomicCoder<PaneInfo> {
-    private static enum Encoding {
-      /** Only the leading byte is written; decoding returns the interned pane for it. */
-      FIRST,
-      /** The leading byte is followed by the pane index. */
-      ONE_INDEX,
-      /** The leading byte is followed by the pane index and the non-speculative index. */
-      TWO_INDICES;
-
-      // NOTE: Do not reorder fields. The ordinal is used as part of
-      // the encoding.
-
-      /** This encoding's ordinal shifted into the high nibble of the leading byte. */
-      public final byte tag;
-
-      private Encoding() {
-        assert ordinal() < 16;
-        tag = (byte) (ordinal() << 4);
-      }
-
-      /**
-       * Returns the encoding whose tag is stored in the high nibble of {@code b}.
-       *
-       * @throws CoderException if the high nibble does not name a known encoding
-       */
-      public static Encoding fromTag(byte b) throws CoderException {
-        // Mask after shifting: a byte is sign-extended when promoted to int, so a set top bit
-        // would otherwise produce a negative array index.
-        int ordinal = (b >> 4) & 0x0F;
-        Encoding[] encodings = Encoding.values();
-        if (ordinal >= encodings.length) {
-          throw new CoderException("Unknown encoding " + (b & 0xF0));
-        }
-        return encodings[ordinal];
-      }
-    }
-
-    /**
-     * Picks the most compact encoding from which {@link #decode} can rebuild both indices.
-     */
-    private Encoding chooseEncoding(PaneInfo value) {
-      if ((value.index == 0 && value.nonSpeculativeIndex == 0)
-          || value.timing == Timing.UNKNOWN) {
-        return Encoding.FIRST;
-      } else if (value.index == value.nonSpeculativeIndex || value.timing == Timing.EARLY) {
-        // decode() derives the non-speculative index: -1 for EARLY, otherwise equal to index.
-        return Encoding.ONE_INDEX;
-      } else {
-        return Encoding.TWO_INDICES;
-      }
-    }
-
-    public static final PaneInfoCoder INSTANCE = new PaneInfoCoder();
-
-    /**
-     * Writes the leading byte (encoded pane OR-ed with the encoding tag) followed by zero, one
-     * or two variable-length indices, depending on the chosen encoding.
-     */
-    @Override
-    public void encode(PaneInfo value, final OutputStream outStream, Coder.Context context)
-        throws CoderException, IOException {
-      Encoding encoding = chooseEncoding(value);
-      switch (encoding) {
-        case FIRST:
-          outStream.write(value.encodedByte);
-          break;
-        case ONE_INDEX:
-          outStream.write(value.encodedByte | encoding.tag);
-          VarInt.encode(value.index, outStream);
-          break;
-        case TWO_INDICES:
-          outStream.write(value.encodedByte | encoding.tag);
-          VarInt.encode(value.index, outStream);
-          VarInt.encode(value.nonSpeculativeIndex, outStream);
-          break;
-        default:
-          throw new CoderException("Unknown encoding " + encoding);
-      }
-    }
-
-    /**
-     * Reads a pane written by {@link #encode}.
-     *
-     * @throws CoderException if the stream ends before the leading byte or the leading byte
-     *     carries an unknown encoding tag
-     */
-    @Override
-    public PaneInfo decode(final InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      // read() signals end-of-stream with -1; check before narrowing so it is not mistaken
-      // for the data byte 0xFF.
-      int firstByte = inStream.read();
-      if (firstByte < 0) {
-        throw new CoderException("Unexpected end of stream while decoding PaneInfo");
-      }
-      byte keyAndTag = (byte) firstByte;
-      // All 16 low-nibble values are registered in BYTE_TO_PANE_INFO, so base is never null.
-      PaneInfo base = BYTE_TO_PANE_INFO.get((byte) (keyAndTag & 0x0F));
-      long index, onTimeIndex;
-      switch (Encoding.fromTag(keyAndTag)) {
-        case FIRST:
-          return base;
-        case ONE_INDEX:
-          index = VarInt.decodeLong(inStream);
-          onTimeIndex = base.timing == Timing.EARLY ? -1 : index;
-          break;
-        case TWO_INDICES:
-          index = VarInt.decodeLong(inStream);
-          onTimeIndex = VarInt.decodeLong(inStream);
-          break;
-        default:
-          throw new CoderException("Unknown encoding " + (keyAndTag & 0xF0));
-      }
-      return new PaneInfo(base.isFirst, base.isLast, base.timing, index, onTimeIndex);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/PartitioningWindowFn.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/PartitioningWindowFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/PartitioningWindowFn.java
deleted file mode 100644
index bea0285..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/PartitioningWindowFn.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import org.joda.time.Instant;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-/**
- * A {@link WindowFn} that places each value into exactly one window based on its timestamp and
- * never merges windows.
- *
- * @param <T> type of elements being windowed
- * @param <W> window type
- */
-public abstract class PartitioningWindowFn<T, W extends BoundedWindow>
-    extends NonMergingWindowFn<T, W> {
-  /**
-   * Returns the single window to which elements with this timestamp belong.
-   */
-  public abstract W assignWindow(Instant timestamp);
-
-  @Override
-  public final Collection<W> assignWindows(AssignContext c) {
-    return Arrays.asList(assignWindow(c.timestamp()));
-  }
-
-  @Override
-  public W getSideInputWindow(final BoundedWindow window) {
-    if (window instanceof GlobalWindow) {
-      throw new IllegalArgumentException(
-          "Attempted to get side input window for GlobalWindow from non-global WindowFn");
-    }
-    return assignWindow(window.maxTimestamp());
-  }
-
-  @Override
-  public boolean assignsToSingleWindow() {
-    return true;
-  }
-
-  @Override
-  public Instant getOutputTime(Instant inputTimestamp, W window) {
-    return inputTimestamp;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java
deleted file mode 100644
index e77e2a1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.util.ExecutableTrigger;
-
-import org.joda.time.Instant;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Repeat a trigger, either until some condition is met or forever.
- *
- * <p>For example, to fire after the end of the window, and every time late data arrives:
- * <pre> {@code
- *     Repeatedly.forever(AfterWatermark.pastEndOfWindow());
- * } </pre>
- *
- * <p>{@code Repeatedly.forever(someTrigger)} behaves like an infinite
- * {@code AfterEach.inOrder(someTrigger, someTrigger, someTrigger, ...)}.
- *
- * @param <W> {@link BoundedWindow} subclass used to represent the windows used by this
- *            {@code Trigger}
- */
-public class Repeatedly<W extends BoundedWindow> extends Trigger<W> {
-
-  /** Position of the repeated trigger among this trigger's sub-triggers. */
-  private static final int REPEATED = 0;
-
-  /**
-   * Create a composite trigger that repeatedly executes the trigger {@code toRepeat}, firing each
-   * time it fires and ignoring any indications to finish.
-   *
-   * <p>Unless used with {@link Trigger#orFinally} the composite trigger will never finish.
-   *
-   * @param repeated the trigger to execute repeatedly.
-   */
-  public static <W extends BoundedWindow> Repeatedly<W> forever(Trigger<W> repeated) {
-    Repeatedly<W> composite = new Repeatedly<W>(repeated);
-    return composite;
-  }
-
-  /** Registers {@code repeated} as the only sub-trigger. */
-  private Repeatedly(Trigger<W> repeated) {
-    super(Arrays.asList(repeated));
-  }
-
-  /** Forwards the element to the repeated trigger. */
-  @Override
-  public void onElement(OnElementContext c) throws Exception {
-    ExecutableTrigger<W> repeated = getRepeated(c);
-    repeated.invokeOnElement(c);
-  }
-
-  /** Forwards the merge to the repeated trigger. */
-  @Override
-  public void onMerge(OnMergeContext c) throws Exception {
-    ExecutableTrigger<W> repeated = getRepeated(c);
-    repeated.invokeOnMerge(c);
-  }
-
-  /** This trigger fires once the repeated trigger fires, so its answer is reused as-is. */
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(W window) {
-    return subTriggers.get(REPEATED).getWatermarkThatGuaranteesFiring(window);
-  }
-
-  /** Repeats the continuation of the repeated trigger. */
-  @Override
-  public Trigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
-    Trigger<W> continuation = continuationTriggers.get(REPEATED);
-    return new Repeatedly<W>(continuation);
-  }
-
-  /** Fires whenever the repeated trigger reports that it should fire. */
-  @Override
-  public boolean shouldFire(Trigger<W>.TriggerContext context) throws Exception {
-    ExecutableTrigger<W> repeated = getRepeated(context);
-    return repeated.invokeShouldFire(context);
-  }
-
-  /**
-   * Fires the repeated trigger; if that left it finished, marks it unfinished again and clears
-   * it.
-   */
-  @Override
-  public void onFire(TriggerContext context) throws Exception {
-    getRepeated(context).invokeOnFire(context);
-
-    if (!context.trigger().isFinished(REPEATED)) {
-      return;
-    }
-    context.trigger().setFinished(false, REPEATED);
-    getRepeated(context).invokeClear(context);
-  }
-
-  /** Looks up the executable form of the repeated sub-trigger in {@code context}. */
-  private ExecutableTrigger<W> getRepeated(TriggerContext context) {
-    return context.trigger().subTrigger(REPEATED);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Sessions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Sessions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Sessions.java
deleted file mode 100644
index da137c1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Sessions.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-
-import org.joda.time.Duration;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Objects;
-
-/**
- * A {@link WindowFn} windowing values into sessions separated by {@link #gapDuration}-long
- * periods with no elements.
- *
- * <p>For example, in order to window data into session with at least 10 minute
- * gaps in between them:
- * <pre> {@code
- * PCollection<Integer> pc = ...;
- * PCollection<Integer> windowed_pc = pc.apply(
- *   Window.<Integer>into(Sessions.withGapDuration(Duration.standardMinutes(10))));
- * } </pre>
- */
-public class Sessions extends WindowFn<Object, IntervalWindow> {
-  /**
-   * Length of the element-free gap that separates two sessions.
-   */
-  private final Duration gapDuration;
-
-  /**
-   * Creates a {@code Sessions} {@link WindowFn} with the specified gap duration.
-   */
-  public static Sessions withGapDuration(Duration gapDuration) {
-    Sessions sessions = new Sessions(gapDuration);
-    return sessions;
-  }
-
-  /**
-   * Stores the gap duration; instances are obtained through {@link #withGapDuration}.
-   */
-  private Sessions(Duration gapDuration) {
-    this.gapDuration = gapDuration;
-  }
-
-  /**
-   * Gives each element its own window that starts at the element's timestamp and lasts for the
-   * gap duration. Overlapping windows (elements within the gap duration of each other) are
-   * merged later by {@link #mergeWindows}.
-   */
-  @Override
-  public Collection<IntervalWindow> assignWindows(AssignContext c) {
-    IntervalWindow protoSession = new IntervalWindow(c.timestamp(), gapDuration);
-    return Arrays.asList(protoSession);
-  }
-
-  /** Delegates merging to {@link MergeOverlappingIntervalWindows}. */
-  @Override
-  public void mergeWindows(MergeContext c) throws Exception {
-    MergeOverlappingIntervalWindows.mergeWindows(c);
-  }
-
-  /** Returns the coder for {@link IntervalWindow}. */
-  @Override
-  public Coder<IntervalWindow> windowCoder() {
-    return IntervalWindow.getCoder();
-  }
-
-  /** Any other {@code Sessions} is compatible, whatever its gap duration. */
-  @Override
-  public boolean isCompatible(WindowFn<?, ?> other) {
-    return other instanceof Sessions;
-  }
-
-  /**
-   * Not supported.
-   *
-   * @throws UnsupportedOperationException always
-   */
-  @Override
-  public IntervalWindow getSideInputWindow(BoundedWindow window) {
-    throw new UnsupportedOperationException("Sessions is not allowed in side inputs");
-  }
-
-  /** Output timestamps are taken from the earliest input timestamp. */
-  @Experimental(Kind.OUTPUT_TIME)
-  @Override
-  public OutputTimeFn<? super IntervalWindow> getOutputTimeFn() {
-    return OutputTimeFns.outputAtEarliestInputTimestamp();
-  }
-
-  /** Returns the gap duration this instance was created with. */
-  public Duration getGapDuration() {
-    return gapDuration;
-  }
-
-  /** Two {@code Sessions} are equal when their gap durations are equal. */
-  @Override
-  public boolean equals(Object object) {
-    if (object instanceof Sessions) {
-      Sessions that = (Sessions) object;
-      return getGapDuration().equals(that.getGapDuration());
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(gapDuration);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/SlidingWindows.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/SlidingWindows.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/SlidingWindows.java
deleted file mode 100644
index b0066d6..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/SlidingWindows.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * A {@link WindowFn} that windows values into possibly overlapping fixed-size
- * timestamp-based windows.
- *
- * <p>For example, in order to window data into 10 minute windows that
- * update every minute:
- * <pre> {@code
- * PCollection<Integer> items = ...;
- * PCollection<Integer> windowedItems = items.apply(
- *   Window.<Integer>into(SlidingWindows.of(Duration.standardMinutes(10))));
- * } </pre>
- */
-public class SlidingWindows extends NonMergingWindowFn<Object, IntervalWindow> {
-
-  /**
-   * Amount of time between generated windows.
-   */
-  private final Duration period;
-
-  /**
-   * Size of the generated windows.
-   */
-  private final Duration size;
-
-  /**
-   * Offset of the generated windows.
-   * Windows start at time N * start + offset, where 0 is the epoch.
-   */
-  private final Duration offset;
-
-  /**
-   * Assigns timestamps into half-open intervals of the form
-   * [N * period, N * period + size), where 0 is the epoch.
-   *
-   * <p>If {@link SlidingWindows#every} is not called, the period defaults
-   * to the largest time unit smaller than the given duration.  For example,
-   * specifying a size of 5 seconds will result in a default period of 1 second.
-   */
-  public static SlidingWindows of(Duration size) {
-    return new SlidingWindows(getDefaultPeriod(size), size, Duration.ZERO);
-  }
-
-  /**
-   * Returns a new {@code SlidingWindows} with the original size, that assigns
-   * timestamps into half-open intervals of the form
-   * [N * period, N * period + size), where 0 is the epoch.
-   */
-  public SlidingWindows every(Duration period) {
-    return new SlidingWindows(period, size, offset);
-  }
-
-  /**
-   * Assigns timestamps into half-open intervals of the form
-   * [N * period + offset, N * period + offset + size).
-   *
-   * @throws IllegalArgumentException if offset is not in [0, period)
-   */
-  public SlidingWindows withOffset(Duration offset) {
-    return new SlidingWindows(period, size, offset);
-  }
-
-  private SlidingWindows(Duration period, Duration size, Duration offset) {
-    if (offset.isShorterThan(Duration.ZERO)
-        || !offset.isShorterThan(period)
-        || !size.isLongerThan(Duration.ZERO)) {
-      throw new IllegalArgumentException(
-          "SlidingWindows WindowingStrategies must have 0 <= offset < period and 0 < size");
-    }
-    this.period = period;
-    this.size = size;
-    this.offset = offset;
-  }
-
-  @Override
-  public Coder<IntervalWindow> windowCoder() {
-    return IntervalWindow.getCoder();
-  }
-
-  @Override
-  public Collection<IntervalWindow> assignWindows(AssignContext c) {
-    List<IntervalWindow> windows =
-        new ArrayList<>((int) (size.getMillis() / period.getMillis()));
-    Instant timestamp = c.timestamp();
-    long lastStart = lastStartFor(timestamp);
-    for (long start = lastStart;
-         start > timestamp.minus(size).getMillis();
-         start -= period.getMillis()) {
-      windows.add(new IntervalWindow(new Instant(start), size));
-    }
-    return windows;
-  }
-
-  /**
-   * Return the earliest window that contains the end of the main-input window.
-   */
-  @Override
-  public IntervalWindow getSideInputWindow(final BoundedWindow window) {
-    if (window instanceof GlobalWindow) {
-      throw new IllegalArgumentException(
-          "Attempted to get side input window for GlobalWindow from non-global WindowFn");
-    }
-    long lastStart = lastStartFor(window.maxTimestamp().minus(size));
-    return new IntervalWindow(new Instant(lastStart + period.getMillis()), size);
-  }
-
-  @Override
-  public boolean isCompatible(WindowFn<?, ?> other) {
-    return equals(other);
-  }
-
-  /**
-   * Return the last start of a sliding window that contains the timestamp.
-   */
-  private long lastStartFor(Instant timestamp) {
-    return timestamp.getMillis()
-        - timestamp.plus(period).minus(offset).getMillis() % period.getMillis();
-  }
-
-  static Duration getDefaultPeriod(Duration size) {
-    if (size.isLongerThan(Duration.standardHours(1))) {
-      return Duration.standardHours(1);
-    }
-    if (size.isLongerThan(Duration.standardMinutes(1))) {
-      return Duration.standardMinutes(1);
-    }
-    if (size.isLongerThan(Duration.standardSeconds(1))) {
-      return Duration.standardSeconds(1);
-    }
-    return Duration.millis(1);
-  }
-
-  public Duration getPeriod() {
-    return period;
-  }
-
-  public Duration getSize() {
-    return size;
-  }
-
-  public Duration getOffset() {
-    return offset;
-  }
-
-  /**
-   * Ensures that later sliding windows have an output time that is past the end of earlier windows.
-   *
-   * <p>If this is the earliest sliding window containing {@code inputTimestamp}, that's fine.
-   * Otherwise, we pick the earliest time that doesn't overlap with earlier windows.
-   */
-  @Experimental(Kind.OUTPUT_TIME)
-  @Override
-  public OutputTimeFn<? super IntervalWindow> getOutputTimeFn() {
-    return new OutputTimeFn.Defaults<BoundedWindow>() {
-      @Override
-      public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) {
-        Instant startOfLastSegment = window.maxTimestamp().minus(period);
-        return startOfLastSegment.isBefore(inputTimestamp)
-            ? inputTimestamp
-                : startOfLastSegment.plus(1);
-      }
-
-      @Override
-      public boolean dependsOnlyOnEarliestInputTimestamp() {
-        return true;
-      }
-    };
-  }
-
-  @Override
-  public boolean equals(Object object) {
-    if (!(object instanceof SlidingWindows)) {
-      return false;
-    }
-    SlidingWindows other = (SlidingWindows) object;
-    return getOffset().equals(other.getOffset())
-        && getSize().equals(other.getSize())
-        && getPeriod().equals(other.getPeriod());
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(size, offset, period);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java
deleted file mode 100644
index 4471563..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java
+++ /dev/null
@@ -1,544 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.util.ExecutableTrigger;
-import com.google.cloud.dataflow.sdk.util.TimeDomain;
-import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
-import com.google.common.base.Joiner;
-
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-import javax.annotation.Nullable;
-
-/**
- * {@code Trigger}s control when the elements for a specific key and window are output. As elements
- * arrive, they are put into one or more windows by a {@link Window} transform and its associated
- * {@link WindowFn}, and then passed to the associated {@code Trigger} to determine if the
- * {@code Window}'s contents should be output.
- *
- * <p>See {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey} and {@link Window}
- * for more information about how grouping with windows works.
- *
- * <p>The elements that are assigned to a window since the last time it was fired (or since the
- * window was created) are placed into the current window pane. Triggers are evaluated against the
- * elements as they are added. When the root trigger fires, the elements in the current pane will be
- * output. When the root trigger finishes (indicating it will never fire again), the window is
- * closed and any new elements assigned to that window are discarded.
- *
- * <p>Several predefined {@code Trigger}s are provided:
- * <ul>
- *   <li> {@link AfterWatermark} for firing when the watermark passes a timestamp determined from
- *   either the end of the window or the arrival of the first element in a pane.
- *   <li> {@link AfterProcessingTime} for firing after some amount of processing time has elapsed
- *   (typically since the first element in a pane).
- *   <li> {@link AfterPane} for firing off a property of the elements in the current pane, such as
- *   the number of elements that have been assigned to the current pane.
- * </ul>
- *
- * <p>In addition, {@code Trigger}s can be combined in a variety of ways:
- * <ul>
- *   <li> {@link Repeatedly#forever} to create a trigger that executes forever. Any time its
- *   argument finishes it gets reset and starts over. Can be combined with
- *   {@link Trigger#orFinally} to specify a condition that causes the repetition to stop.
- *   <li> {@link AfterEach#inOrder} to execute each trigger in sequence, firing each (and every)
- *   time that a trigger fires, and advancing to the next trigger in the sequence when it finishes.
- *   <li> {@link AfterFirst#of} to create a trigger that fires after at least one of its arguments
- *   fires. An {@link AfterFirst} trigger finishes after it fires once.
- *   <li> {@link AfterAll#of} to create a trigger that fires after all of its arguments
- *   have fired at least once. An {@link AfterAll} trigger finishes after it fires once.
- * </ul>
- *
- * <p>Each trigger tree is instantiated per-key and per-window. Every trigger in the tree is in one
- * of the following states:
- * <ul>
- *   <li> Never Existed - before the trigger has started executing, there is no state associated
- *   with it anywhere in the system. A trigger moves to the executing state as soon as it
- *   processes an element in the current pane.
- *   <li> Executing - while the trigger is receiving items and may fire. While it is in this state,
- *   it may persist book-keeping information to persisted state, set timers, etc.
- *   <li> Finished - after a trigger finishes, all of its book-keeping data is cleaned up, and the
- *   system remembers only that it is finished. Entering this state causes us to discard any
- *   elements in the buffer for that window, as well.
- * </ul>
- *
- * <p>Once finished, a trigger cannot return itself back to an earlier state; however, a composite
- * trigger could reset its sub-triggers.
- *
- * <p>Triggers should not build up any state internally since they may be recreated
- * between invocations of the callbacks. All important values should be persisted using
- * state before the callback returns.
- *
- * @param <W> {@link BoundedWindow} subclass used to represent the windows used by this
- *            {@code Trigger}
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public abstract class Trigger<W extends BoundedWindow> implements Serializable, TriggerBuilder<W> {
-
-  /**
-   * Interface for accessing information about the trigger being executed and other triggers in the
-   * same tree.
-   */
-  public interface TriggerInfo<W extends BoundedWindow> {
-
-    /**
-     * Returns true if the windowing strategy of the current {@code PCollection} is a merging
-     * WindowFn. If true, the trigger execution needs to keep enough information to support the
-     * possibility of {@link Trigger#onMerge} being called. If false, {@link Trigger#onMerge} will
-     * never be called.
-     */
-    boolean isMerging();
-
-    /**
-     * Access the executable versions of the sub-triggers of the current trigger.
-     */
-    Iterable<ExecutableTrigger<W>> subTriggers();
-
-    /**
-     * Access the executable version of the specified sub-trigger.
-     */
-    ExecutableTrigger<W> subTrigger(int subtriggerIndex);
-
-    /**
-     * Returns true if the current trigger is marked finished.
-     */
-    boolean isFinished();
-
-    /**
-     * Return true if the given subtrigger is marked finished.
-     */
-    boolean isFinished(int subtriggerIndex);
-
-    /**
-     * Returns true if all the sub-triggers of the current trigger are marked finished.
-     */
-    boolean areAllSubtriggersFinished();
-
-    /**
-     * Returns an iterable over the unfinished sub-triggers of the current trigger.
-     */
-    Iterable<ExecutableTrigger<W>> unfinishedSubTriggers();
-
-    /**
-     * Returns the first unfinished sub-trigger.
-     */
-    ExecutableTrigger<W> firstUnfinishedSubTrigger();
-
-    /**
-     * Clears all keyed state for triggers in the current sub-tree and unsets all the associated
-     * finished bits.
-     */
-    void resetTree() throws Exception;
-
-    /**
-     * Sets the finished bit for the current trigger.
-     */
-    void setFinished(boolean finished);
-
-    /**
-     * Sets the finished bit for the given sub-trigger.
-     */
-    void setFinished(boolean finished, int subTriggerIndex);
-  }
-
-  /**
-   * Interact with properties of the trigger being executed, with extensions to deal with the
-   * merging windows.
-   */
-  public interface MergingTriggerInfo<W extends BoundedWindow> extends TriggerInfo<W> {
-
-    /** Return true if the trigger is finished in any window being merged. */
-    public abstract boolean finishedInAnyMergingWindow();
-
-    /** Return true if the trigger is finished in all windows being merged. */
-    public abstract boolean finishedInAllMergingWindows();
-
-    /** Return the merging windows in which the trigger is finished. */
-    public abstract Iterable<W> getFinishedMergingWindows();
-  }
-
-  /**
-   * Information accessible to all operational hooks in this {@code Trigger}.
-   *
-   * <p>Used directly in {@link Trigger#shouldFire} and {@link Trigger#clear}, and
-   * extended with additional information in other methods.
-   */
-  public abstract class TriggerContext {
-
-    /** Returns the interface for accessing trigger info. */
-    public abstract TriggerInfo<W> trigger();
-
-    /** Returns the interface for accessing persistent state. */
-    public abstract StateAccessor<?> state();
-
-    /** The window that the current context is executing in. */
-    public abstract W window();
-
-    /** Create a sub-context for the given sub-trigger. */
-    public abstract TriggerContext forTrigger(ExecutableTrigger<W> trigger);
-
-    /**
-     * Removes the timer set in this trigger context for the given {@link Instant}
-     * and {@link TimeDomain}.
-     */
-    public abstract void deleteTimer(Instant timestamp, TimeDomain domain);
-
-    /** The current processing time. */
-    public abstract Instant currentProcessingTime();
-
-    /** The current synchronized upstream processing time or {@code null} if unknown. */
-    @Nullable
-    public abstract Instant currentSynchronizedProcessingTime();
-
-    /** The current event time for the input or {@code null} if unknown. */
-    @Nullable
-    public abstract Instant currentEventTime();
-  }
-
-  /**
-   * Extended {@link TriggerContext} containing information accessible to the {@link #onElement}
-   * operational hook.
-   */
-  public abstract class OnElementContext extends TriggerContext {
-    /** The event timestamp of the element currently being processed. */
-    public abstract Instant eventTimestamp();
-
-    /**
-     * Sets a timer to fire when the watermark or processing time is beyond the given timestamp.
-     * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards.
-     *
-     * <p>As with {@link #state}, timers are implicitly scoped to the current window. All
-     * timer firings for a window will be received, but the implementation should choose to ignore
-     * those that are not applicable.
-     *
-     * @param timestamp the time at which the trigger should be re-evaluated
-     * @param domain the domain that the {@code timestamp} applies to
-     */
-    public abstract void setTimer(Instant timestamp, TimeDomain domain);
-
-    /** Create an {@code OnElementContext} for executing the given trigger. */
-    @Override
-    public abstract OnElementContext forTrigger(ExecutableTrigger<W> trigger);
-  }
-
-  /**
-   * Extended {@link TriggerContext} containing information accessible to the {@link #onMerge}
-   * operational hook.
-   */
-  public abstract class OnMergeContext extends TriggerContext {
-    /**
-     * Sets a timer to fire when the watermark or processing time is beyond the given timestamp.
-     * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards.
-     *
-     * <p>As with {@link #state}, timers are implicitly scoped to the current window. All
-     * timer firings for a window will be received, but the implementation should choose to ignore
-     * those that are not applicable.
-     *
-     * @param timestamp the time at which the trigger should be re-evaluated
-     * @param domain the domain that the {@code timestamp} applies to
-     */
-    public abstract void setTimer(Instant timestamp, TimeDomain domain);
-
-    /** Create an {@code OnMergeContext} for executing the given trigger. */
-    @Override
-    public abstract OnMergeContext forTrigger(ExecutableTrigger<W> trigger);
-
-    @Override
-    public abstract MergingStateAccessor<?, W> state();
-
-    @Override
-    public abstract MergingTriggerInfo<W> trigger();
-  }
-
-  @Nullable
-  protected final List<Trigger<W>> subTriggers;
-
-  protected Trigger(@Nullable List<Trigger<W>> subTriggers) {
-    this.subTriggers = subTriggers;
-  }
-
-
-  /**
-   * Called immediately after an element is first incorporated into a window.
-   */
-  public abstract void onElement(OnElementContext c) throws Exception;
-
-  /**
-   * Called immediately after windows have been merged.
-   *
-   * <p>Leaf triggers should update their state by inspecting their status and any state
-   * in the merging windows. Composite triggers should update their state by calling
-   * {@link ExecutableTrigger#invokeOnMerge} on their sub-triggers, and applying appropriate logic.
-   *
-   * <p>A trigger such as {@link AfterWatermark#pastEndOfWindow} may no longer be finished;
-   * it is the responsibility of the trigger itself to record this fact. It is forbidden for
-   * a trigger to become finished due to {@link #onMerge}, as it has not yet fired the pending
-   * elements that led to it being ready to fire.
-   *
-   * <p>The implementation does not need to clear out any state associated with the old windows.
-   */
-  public abstract void onMerge(OnMergeContext c) throws Exception;
-
-  /**
-   * Returns {@code true} if the current state of the trigger indicates that its condition
-   * is satisfied and it is ready to fire.
-   */
-  public abstract boolean shouldFire(TriggerContext context) throws Exception;
-
-  /**
-   * Adjusts the state of the trigger to be ready for the next pane. For example, a
-   * {@link Repeatedly} trigger will reset its inner trigger, since it has fired.
-   *
-   * <p>If the trigger is finished, it is the responsibility of the trigger itself to
-   * record that fact via the {@code context}.
-   */
-  public abstract void onFire(TriggerContext context) throws Exception;
-
-  /**
-   * Called to allow the trigger to prefetch any state it will likely need to read from during
-   * an {@link #onElement} call.
-   */
-  public void prefetchOnElement(StateAccessor<?> state) {
-    if (subTriggers != null) {
-      for (Trigger<W> trigger : subTriggers) {
-        trigger.prefetchOnElement(state);
-      }
-    }
-  }
-
-  /**
-   * Called to allow the trigger to prefetch any state it will likely need to read from during
-   * an {@link #onMerge} call.
-   */
-  public void prefetchOnMerge(MergingStateAccessor<?, W> state) {
-    if (subTriggers != null) {
-      for (Trigger<W> trigger : subTriggers) {
-        trigger.prefetchOnMerge(state);
-      }
-    }
-  }
-
-  /**
-   * Called to allow the trigger to prefetch any state it will likely need to read from during
-   * an {@link #shouldFire} call.
-   */
-  public void prefetchShouldFire(StateAccessor<?> state) {
-    if (subTriggers != null) {
-      for (Trigger<W> trigger : subTriggers) {
-        trigger.prefetchShouldFire(state);
-      }
-    }
-  }
-
-  /**
-   * Called to allow the trigger to prefetch any state it will likely need to read from during
-   * an {@link #onFire} call.
-   */
-  public void prefetchOnFire(StateAccessor<?> state) {
-    if (subTriggers != null) {
-      for (Trigger<W> trigger : subTriggers) {
-        trigger.prefetchOnFire(state);
-      }
-    }
-  }
-
-  /**
-   * Clear any state associated with this trigger in the given window.
-   *
-   * <p>This is called after a trigger has indicated it will never fire again. The trigger system
-   * keeps enough information to know that the trigger is finished, so this trigger should clear all
-   * of its state.
-   */
-  public void clear(TriggerContext c) throws Exception {
-    if (subTriggers != null) {
-      for (ExecutableTrigger<W> trigger : c.trigger().subTriggers()) {
-        trigger.invokeClear(c);
-      }
-    }
-  }
-
-  public Iterable<Trigger<W>> subTriggers() {
-    return subTriggers;
-  }
-
-  /**
-   * Return a trigger to use after a {@code GroupByKey} to preserve the
-   * intention of this trigger. Specifically, triggers that are time based
-   * and intended to provide speculative results should continue providing
-   * speculative results. Triggers that fire once (or multiple times) should
-   * continue firing once (or multiple times).
-   */
-  public Trigger<W> getContinuationTrigger() {
-    if (subTriggers == null) {
-      return getContinuationTrigger(null);
-    }
-
-    List<Trigger<W>> subTriggerContinuations = new ArrayList<>();
-    for (Trigger<W> subTrigger : subTriggers) {
-      subTriggerContinuations.add(subTrigger.getContinuationTrigger());
-    }
-    return getContinuationTrigger(subTriggerContinuations);
-  }
-
-  /**
-   * Return the {@link #getContinuationTrigger} of this {@code Trigger}. For convenience, this
-   * is provided the continuation trigger of each of the sub-triggers.
-   */
-  protected abstract Trigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers);
-
-  /**
-   * Returns a bound in watermark time by which this trigger would have fired at least once
-   * for a given window had there been input data.  This is a static property of a trigger
-   * that does not depend on its state.
-   *
-   * <p>For triggers that do not fire based on the watermark advancing, returns
-   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
-   *
-   * <p>This estimate is used to determine that there are no elements in a side-input window, which
-   * causes the default value to be used instead.
-   */
-  public abstract Instant getWatermarkThatGuaranteesFiring(W window);
-
-  /**
-   * Returns whether this performs the same triggering as the given {@code Trigger}.
-   */
-  public boolean isCompatible(Trigger<?> other) {
-    if (!getClass().equals(other.getClass())) {
-      return false;
-    }
-
-    if (subTriggers == null) {
-      return other.subTriggers == null;
-    } else if (other.subTriggers == null) {
-      return false;
-    } else if (subTriggers.size() != other.subTriggers.size()) {
-      return false;
-    }
-
-    for (int i = 0; i < subTriggers.size(); i++) {
-      if (!subTriggers.get(i).isCompatible(other.subTriggers.get(i))) {
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  @Override
-  public String toString() {
-    String simpleName = getClass().getSimpleName();
-    if (getClass().getEnclosingClass() != null) {
-      simpleName = getClass().getEnclosingClass().getSimpleName() + "." + simpleName;
-    }
-    if (subTriggers == null || subTriggers.size() == 0) {
-      return simpleName;
-    } else {
-      return simpleName + "(" + Joiner.on(", ").join(subTriggers) + ")";
-    }
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (!(obj instanceof Trigger)) {
-      return false;
-    }
-    @SuppressWarnings("unchecked")
-    Trigger<W> that = (Trigger<W>) obj;
-    return Objects.equals(getClass(), that.getClass())
-        && Objects.equals(subTriggers, that.subTriggers);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(getClass(), subTriggers);
-  }
-
-  /**
-   * Specify an ending condition for this trigger. If the {@code until} fires then the combination
-   * fires.
-   *
-   * <p>The expression {@code t1.orFinally(t2)} fires every time {@code t1} fires, and finishes
-   * as soon as either {@code t1} finishes or {@code t2} fires, in which case it fires one last time
-   * for {@code t2}. Both {@code t1} and {@code t2} are executed in parallel. This means that
-   * {@code t1} may have fired since {@code t2} started, so not all of the elements that {@code t2}
-   * has seen are necessarily in the current pane.
-   *
-   * <p>For example the final firing of the following trigger may only have 1 element:
-   * <pre> {@code
-   * Repeatedly.forever(AfterPane.elementCountAtLeast(2))
-   *     .orFinally(AfterPane.elementCountAtLeast(5))
-   * } </pre>
-   *
-   * <p>Note that if {@code t1} is {@link OnceTrigger}, then {@code t1.orFinally(t2)} is the same
-   * as {@code AfterFirst.of(t1, t2)}.
-   */
-  public Trigger<W> orFinally(OnceTrigger<W> until) {
-    return new OrFinallyTrigger<W>(this, until);
-  }
-
-  @Override
-  public Trigger<W> buildTrigger() {
-    return this;
-  }
-
-  /**
-   * {@link Trigger}s that are guaranteed to fire at most once should extend from this, rather
-   * than the general {@link Trigger} class to indicate that behavior.
-   *
-   * @param <W> {@link BoundedWindow} subclass used to represent the windows used by this
-   *            {@code OnceTrigger}
-   */
-  public abstract static class OnceTrigger<W extends BoundedWindow> extends Trigger<W> {
-    protected OnceTrigger(List<Trigger<W>> subTriggers) {
-      super(subTriggers);
-    }
-
-    @Override
-    public final OnceTrigger<W> getContinuationTrigger() {
-      Trigger<W> continuation = super.getContinuationTrigger();
-      if (!(continuation instanceof OnceTrigger)) {
-        throw new IllegalStateException("Continuation of a OnceTrigger must be a OnceTrigger");
-      }
-      return (OnceTrigger<W>) continuation;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public final void onFire(TriggerContext context) throws Exception {
-      onOnlyFiring(context);
-      context.trigger().setFinished(true);
-    }
-
-    /**
-     * Called exactly once by {@link #onFire} when the trigger is fired. Implementations should
-     * invoke {@link #onFire} on all subtriggers for which {@link #shouldFire} is {@code true}.
-     */
-    protected abstract void onOnlyFiring(TriggerContext context) throws Exception;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/TriggerBuilder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/TriggerBuilder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/TriggerBuilder.java
deleted file mode 100644
index cc817ba..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/TriggerBuilder.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-/**
- * Anything that can be used to create an instance of a {@code Trigger} implements this interface.
- *
- * <p>This includes {@code Trigger}s (which can return themselves) and any "enhanced" syntax for
- * constructing a trigger.
- *
- * @param <W> The type of windows the built trigger will operate on.
- */
-public interface TriggerBuilder<W extends BoundedWindow> {
-  /** Return the {@code Trigger} built by this builder. */
-  Trigger<W> buildTrigger();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java
deleted file mode 100644
index 6793e76..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java
+++ /dev/null
@@ -1,662 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.joda.time.Duration;
-
-import javax.annotation.Nullable;
-
-/**
- * {@code Window} logically divides up or groups the elements of a
- * {@link PCollection} into finite windows according to a {@link WindowFn}.
- * The output of {@code Window} contains the same elements as input, but they
- * have been logically assigned to windows. The next
- * {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey GroupByKeys},
- * including one within composite transforms, will group by the combination of
- * keys and windows.
- *
- * <p>See {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey}
- * for more information about how grouping with windows works.
- *
- * <h2> Windowing </h2>
- *
- * <p>Windowing a {@code PCollection} divides the elements into windows based
- * on the associated event time for each element. This is especially useful
- * for {@code PCollection}s with unbounded size, since it allows operating on
- * a sub-group of the elements placed into a related window. For {@code PCollection}s
- * with a bounded size (aka. conventional batch mode), by default, all data is
- * implicitly in a single window, unless {@code Window} is applied.
- *
- * <p>For example, a simple form of windowing divides up the data into
- * fixed-width time intervals, using {@link FixedWindows}.
- * The following example demonstrates how to use {@code Window} in a pipeline
- * that counts the number of occurrences of strings each minute:
- *
- * <pre> {@code
- * PCollection<String> items = ...;
- * PCollection<String> windowed_items = items.apply(
- *   Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
- * PCollection<KV<String, Long>> windowed_counts = windowed_items.apply(
- *   Count.<String>perElement());
- * } </pre>
- *
- * <p>Let (data, timestamp) denote a data element along with its timestamp.
- * Then, if the input to this pipeline consists of
- * {("foo", 15s), ("bar", 30s), ("foo", 45s), ("foo", 1m30s)},
- * the output will be
- * {(KV("foo", 2), 1m), (KV("bar", 1), 1m), (KV("foo", 1), 2m)}
- *
- * <p>Several predefined {@link WindowFn}s are provided:
- * <ul>
- *  <li> {@link FixedWindows} partitions the timestamps into fixed-width intervals.
- *  <li> {@link SlidingWindows} places data into overlapping fixed-width intervals.
- *  <li> {@link Sessions} groups data into sessions where each item in a window
- *       is separated from the next by no more than a specified gap.
- * </ul>
- *
- * <p>Additionally, custom {@link WindowFn}s can be created, by creating new
- * subclasses of {@link WindowFn}.
- *
- * <h2> Triggers </h2>
- *
- * <p>{@link Window.Bound#triggering(TriggerBuilder)} allows specifying a trigger to control when
- * (in processing time) results for the given window can be produced. If unspecified, the default
- * behavior is to trigger first when the watermark passes the end of the window, and then trigger
- * again every time there is late arriving data.
- *
- * <p>Elements are added to the current window pane as they arrive. When the root trigger fires,
- * output is produced based on the elements in the current pane.
- *
- * <p>Depending on the trigger, this can be used both to output partial results
- * early during the processing of the whole window, and to deal with late
- * arriving data in batches.
- *
- * <p>Continuing the earlier example, suppose we wanted to emit the values that were available
- * when the watermark passed the end of the window, and then output any late arriving
- * elements once per hour (of processing time) until we have finished processing the next 24 hours
- * of data. (The use of watermark time to stop processing tends to be more robust if the data
- * source is slow for a few days, etc.)
- *
- * <pre> {@code
- * PCollection<String> items = ...;
- * PCollection<String> windowed_items = items.apply(
- *   Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
- *      .triggering(
- *          AfterWatermark.pastEndOfWindow()
- *              .withLateFirings(AfterProcessingTime
- *                  .pastFirstElementInPane().plusDelayOf(Duration.standardHours(1))))
- *      .withAllowedLateness(Duration.standardDays(1)));
- * PCollection<KV<String, Long>> windowed_counts = windowed_items.apply(
- *   Count.<String>perElement());
- * } </pre>
- *
- * <p>On the other hand, if we wanted to get early results every minute of processing
- * time (for which there were new elements in the given window) we could do the following:
- *
- * <pre> {@code
- * PCollection<String> windowed_items = items.apply(
- *   Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
- *      .triggering(
- *          AfterWatermark.pastEndOfWindow()
- *              .withEarlyFirings(AfterProcessingTime
- *                  .pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))
- *      .withAllowedLateness(Duration.ZERO));
- * } </pre>
- *
- * <p>After a {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey} the trigger is set to
- * a trigger that will preserve the intent of the upstream trigger.  See
- * {@link Trigger#getContinuationTrigger} for more information.
- *
- * <p>See {@link Trigger} for details on the available triggers.
- */
-public class Window {
-
-  /**
-   * Specifies the conditions under which a final pane will be created when a window is permanently
-   * closed.
-   */
-  public enum ClosingBehavior {
-    /**
-     * Always fire the last pane. Even if there is no new data since the previous firing, an element
-     * with {@link PaneInfo#isLast()} {@code true} will be produced.
-     */
-    FIRE_ALWAYS,
-    /**
-     * Only fire the last pane if there is new data since the previous firing.
-     *
-     * <p>This is the default behavior.
-     */
-    FIRE_IF_NON_EMPTY;
-  }
-
-  /**
-   * Creates a {@code Window} {@code PTransform} with the given name.
-   *
-   * <p>See the discussion of Naming in
-   * {@link com.google.cloud.dataflow.sdk.transforms.ParDo} for more explanation.
-   *
-   * <p>The resulting {@code PTransform} is incomplete, and its input/output
-   * type is not yet bound.  Use {@link Window.Unbound#into} to specify the
-   * {@link WindowFn} to use, which will also bind the input/output type of this
-   * {@code PTransform}.
-   */
-  public static Unbound named(String name) {
-    return new Unbound().named(name);
-  }
-
-  /**
-   * Creates a {@code Window} {@code PTransform} that uses the given
-   * {@link WindowFn} to window the data.
-   *
-   * <p>The resulting {@code PTransform}'s types have been bound, with both the
-   * input and output being a {@code PCollection<T>}, inferred from the types of
-   * the argument {@code WindowFn}.  It is ready to be applied, or further
-   * properties can be set on it first.
-   */
-  public static <T> Bound<T> into(WindowFn<? super T, ?> fn) {
-    return new Unbound().into(fn);
-  }
-
-  /**
-   * Sets a non-default trigger for this {@code Window} {@code PTransform}.
-   * Elements that are assigned to a specific window will be output when
-   * the trigger fires.
-   *
-   * <p>Must also specify allowed lateness using {@link #withAllowedLateness} and accumulation
-   * mode using either {@link #discardingFiredPanes()} or {@link #accumulatingFiredPanes()}.
-   */
-  @Experimental(Kind.TRIGGER)
-  public static <T> Bound<T> triggering(TriggerBuilder<?> trigger) {
-    return new Unbound().triggering(trigger);
-  }
-
-  /**
-   * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
-   * Triggering behavior, and that discards elements in a pane after they are triggered.
-   *
-   * <p>Does not modify this transform.  The resulting {@code PTransform} is sufficiently
-   * specified to be applied, but more properties can still be specified.
-   */
-  @Experimental(Kind.TRIGGER)
-  public static <T> Bound<T> discardingFiredPanes() {
-    return new Unbound().discardingFiredPanes();
-  }
-
-  /**
-   * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
-   * Triggering behavior, and that accumulates elements in a pane after they are triggered.
-   *
-   * <p>Does not modify this transform.  The resulting {@code PTransform} is sufficiently
-   * specified to be applied, but more properties can still be specified.
-   */
-  @Experimental(Kind.TRIGGER)
-  public static <T> Bound<T> accumulatingFiredPanes() {
-    return new Unbound().accumulatingFiredPanes();
-  }
-
-  /**
-   * Override the amount of lateness allowed for data elements in the pipeline. Like
-   * the other properties on this {@link Window} operation, this will be applied at
-   * the next {@link GroupByKey}. Any elements that are later than this as decided by
-   * the system-maintained watermark will be dropped.
-   *
-   * <p>This value also determines how long state will be kept around for old windows.
-   * Once no elements will be added to a window (because this duration has passed) any state
-   * associated with the window will be cleaned up.
-   */
-  @Experimental(Kind.TRIGGER)
-  public static <T> Bound<T> withAllowedLateness(Duration allowedLateness) {
-    return new Unbound().withAllowedLateness(allowedLateness);
-  }
-
-  /**
-   * An incomplete {@code Window} transform, with unbound input/output type.
-   *
-   * <p>Before being applied, {@link Window.Unbound#into} must be
-   * invoked to specify the {@link WindowFn} to invoke, which will also
-   * bind the input/output type of this {@code PTransform}.
-   *
-   * <p>The only state held is the optional transform name. It is fixed at construction, and
-   * every method returns a new object rather than changing this one.
-   */
-  public static class Unbound {
-    /** Name handed to the {@link Bound} transforms created from this object; may be null. */
-    @Nullable final String name;
-
-    /** Creates an {@code Unbound} without a name. */
-    Unbound() {
-      this(null);
-    }
-
-    /** Creates an {@code Unbound} carrying the given name. */
-    Unbound(@Nullable String name) {
-      this.name = name;
-    }
-
-    /**
-     * Returns a new {@code Window} transform that's like this
-     * transform but with the specified name.  Does not modify this
-     * transform.  The resulting transform is still incomplete.
-     *
-     * <p>See the discussion of Naming in
-     * {@link com.google.cloud.dataflow.sdk.transforms.ParDo} for more
-     * explanation.
-     */
-    public Unbound named(String name) {
-      return new Unbound(name);
-    }
-
-    /**
-     * Returns a new {@code Window} {@code PTransform} that's like this
-     * transform but that will use the given {@link WindowFn}, and that has
-     * its input and output types bound.  Does not modify this transform.  The
-     * resulting {@code PTransform} is sufficiently specified to be applied,
-     * but more properties can still be specified.
-     */
-    public <T> Bound<T> into(WindowFn<? super T, ?> fn) {
-      return new Bound<T>(name).into(fn);
-    }
-
-    /**
-     * Sets a non-default trigger for this {@code Window} {@code PTransform}.
-     * Elements that are assigned to a specific window will be output when
-     * the trigger fires.
-     *
-     * <p>{@link com.google.cloud.dataflow.sdk.transforms.windowing.Trigger}
-     * has more details on the available triggers.
-     *
-     * <p>Must also specify allowed lateness using {@link #withAllowedLateness} and accumulation
-     * mode using either {@link #discardingFiredPanes()} or {@link #accumulatingFiredPanes()}.
-     */
-    @Experimental(Kind.TRIGGER)
-    public <T> Bound<T> triggering(TriggerBuilder<?> trigger) {
-      return new Bound<T>(name).triggering(trigger);
-    }
-
-    /**
-     * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
-     * Triggering behavior, and that discards elements in a pane after they are triggered.
-     *
-     * <p>Does not modify this transform.  The resulting {@code PTransform} is sufficiently
-     * specified to be applied, but more properties can still be specified.
-     */
-    @Experimental(Kind.TRIGGER)
-    public <T> Bound<T> discardingFiredPanes() {
-      return new Bound<T>(name).discardingFiredPanes();
-    }
-
-    /**
-     * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
-     * Triggering behavior, and that accumulates elements in a pane after they are triggered.
-     *
-     * <p>Does not modify this transform.  The resulting {@code PTransform} is sufficiently
-     * specified to be applied, but more properties can still be specified.
-     */
-    @Experimental(Kind.TRIGGER)
-    public <T> Bound<T> accumulatingFiredPanes() {
-      return new Bound<T>(name).accumulatingFiredPanes();
-    }
-
-    /**
-     * Override the amount of lateness allowed for data elements in the pipeline. Like
-     * the other properties on this {@link Window} operation, this will be applied at
-     * the next {@link GroupByKey}. Any elements that are later than this as decided by
-     * the system-maintained watermark will be dropped.
-     *
-     * <p>This value also determines how long state will be kept around for old windows.
-     * Once no elements will be added to a window (because this duration has passed) any state
-     * associated with the window will be cleaned up.
-     *
-     * <p>Depending on the trigger this may not produce a pane with {@link PaneInfo#isLast}. See
-     * {@link ClosingBehavior#FIRE_IF_NON_EMPTY} for more details.
-     */
-    @Experimental(Kind.TRIGGER)
-    public <T> Bound<T> withAllowedLateness(Duration allowedLateness) {
-      return new Bound<T>(name).withAllowedLateness(allowedLateness);
-    }
-
-    /**
-     * Override the amount of lateness allowed for data elements in the pipeline. Like
-     * the other properties on this {@link Window} operation, this will be applied at
-     * the next {@link GroupByKey}. Any elements that are later than this as decided by
-     * the system-maintained watermark will be dropped.
-     *
-     * <p>This value also determines how long state will be kept around for old windows.
-     * Once no elements will be added to a window (because this duration has passed) any state
-     * associated with the window will be cleaned up.
-     *
-     * <p>The {@code behavior} argument is recorded as the {@link ClosingBehavior} of the
-     * resulting transform; see that enum for when a final pane is produced.
-     */
-    @Experimental(Kind.TRIGGER)
-    public <T> Bound<T> withAllowedLateness(Duration allowedLateness, ClosingBehavior behavior) {
-      return new Bound<T>(name).withAllowedLateness(allowedLateness, behavior);
-    }
-  }
-
-  /**
-   * A {@code PTransform} that windows the elements of a {@code PCollection<T>},
-   * into finite windows according to a user-specified {@code WindowFn}.
-   *
-   * <p>Each windowing property held here is optional. A property left unset ({@code null})
-   * is taken from the input's windowing strategy when the output strategy is computed. All
-   * configuration fields are final; each configuration method returns a new {@code Bound}.
-   *
-   * @param <T> The type of elements this {@code Window} is applied to
-   */
-  public static class Bound<T> extends PTransform<PCollection<T>, PCollection<T>> {
-
-    @Nullable private final WindowFn<? super T, ?> windowFn;
-    @Nullable private final Trigger<?> trigger;
-    @Nullable private final AccumulationMode mode;
-    @Nullable private final Duration allowedLateness;
-    @Nullable private final ClosingBehavior closingBehavior;
-    @Nullable private final OutputTimeFn<?> outputTimeFn;
-
-    /**
-     * Creates a transform with the given name and windowing properties. Any property may be
-     * {@code null}, meaning "not specified by this transform".
-     */
-    private Bound(String name,
-        @Nullable WindowFn<? super T, ?> windowFn, @Nullable Trigger<?> trigger,
-        @Nullable AccumulationMode mode, @Nullable Duration allowedLateness,
-        @Nullable ClosingBehavior behavior, @Nullable OutputTimeFn<?> outputTimeFn) {
-      super(name);
-      this.windowFn = windowFn;
-      this.trigger = trigger;
-      this.mode = mode;
-      this.allowedLateness = allowedLateness;
-      this.closingBehavior = behavior;
-      this.outputTimeFn = outputTimeFn;
-    }
-
-    /** Creates a transform with the given name and no windowing properties specified. */
-    private Bound(String name) {
-      this(name, null, null, null, null, null, null);
-    }
-
-    /**
-     * Returns a new {@code Window} {@code PTransform} that's like this
-     * transform but that will use the given {@link WindowFn}, and that has
-     * its input and output types bound.  Does not modify this transform.  The
-     * resulting {@code PTransform} is sufficiently specified to be applied,
-     * but more properties can still be specified.
-     *
-     * @throws IllegalArgumentException if the window coder of {@code windowFn} is not
-     *     deterministic
-     */
-    private Bound<T> into(WindowFn<? super T, ?> windowFn) {
-      try {
-        windowFn.windowCoder().verifyDeterministic();
-      } catch (NonDeterministicException e) {
-        throw new IllegalArgumentException("Window coders must be deterministic.", e);
-      }
-
-      return new Bound<>(
-          name, windowFn, trigger, mode, allowedLateness, closingBehavior, outputTimeFn);
-    }
-
-    /**
-     * Returns a new {@code Window} {@code PTransform} that's like this
-     * {@code PTransform} but with the specified name.  Does not
-     * modify this {@code PTransform}.
-     *
-     * <p>See the discussion of Naming in
-     * {@link com.google.cloud.dataflow.sdk.transforms.ParDo} for more
-     * explanation.
-     */
-    public Bound<T> named(String name) {
-      return new Bound<>(
-          name, windowFn, trigger, mode, allowedLateness, closingBehavior, outputTimeFn);
-    }
-
-    /**
-     * Sets a non-default trigger for this {@code Window} {@code PTransform}.
-     * Elements that are assigned to a specific window will be output when
-     * the trigger fires.
-     *
-     * <p>{@link com.google.cloud.dataflow.sdk.transforms.windowing.Trigger}
-     * has more details on the available triggers.
-     *
-     * <p>Must also specify allowed lateness using {@link #withAllowedLateness} and accumulation
-     * mode using either {@link #discardingFiredPanes()} or {@link #accumulatingFiredPanes()}.
-     */
-    @Experimental(Kind.TRIGGER)
-    public Bound<T> triggering(TriggerBuilder<?> trigger) {
-      return new Bound<T>(
-          name,
-          windowFn,
-          trigger.buildTrigger(),
-          mode,
-          allowedLateness,
-          closingBehavior,
-          outputTimeFn);
-    }
-
-    /**
-     * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
-     * Triggering behavior, and that discards elements in a pane after they are triggered.
-     *
-     * <p>Does not modify this transform.  The resulting {@code PTransform} is sufficiently
-     * specified to be applied, but more properties can still be specified.
-     */
-    @Experimental(Kind.TRIGGER)
-    public Bound<T> discardingFiredPanes() {
-      return new Bound<T>(
-          name,
-          windowFn,
-          trigger,
-          AccumulationMode.DISCARDING_FIRED_PANES,
-          allowedLateness,
-          closingBehavior,
-          outputTimeFn);
-    }
-
-    /**
-     * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and
-     * Triggering behavior, and that accumulates elements in a pane after they are triggered.
-     *
-     * <p>Does not modify this transform.  The resulting {@code PTransform} is sufficiently
-     * specified to be applied, but more properties can still be specified.
-     */
-    @Experimental(Kind.TRIGGER)
-    public Bound<T> accumulatingFiredPanes() {
-      return new Bound<T>(
-          name,
-          windowFn,
-          trigger,
-          AccumulationMode.ACCUMULATING_FIRED_PANES,
-          allowedLateness,
-          closingBehavior,
-          outputTimeFn);
-    }
-
-    /**
-     * Override the amount of lateness allowed for data elements in the pipeline. Like
-     * the other properties on this {@link Window} operation, this will be applied at
-     * the next {@link GroupByKey}. Any elements that are later than this as decided by
-     * the system-maintained watermark will be dropped.
-     *
-     * <p>This value also determines how long state will be kept around for old windows.
-     * Once no elements will be added to a window (because this duration has passed) any state
-     * associated with the window will be cleaned up.
-     *
-     * <p>Depending on the trigger this may not produce a pane with {@link PaneInfo#isLast}. See
-     * {@link ClosingBehavior#FIRE_IF_NON_EMPTY} for more details.
-     *
-     * <p>The closing behavior already recorded on this transform, if any, is carried over
-     * unchanged.
-     */
-    @Experimental(Kind.TRIGGER)
-    public Bound<T> withAllowedLateness(Duration allowedLateness) {
-      return new Bound<T>(
-          name, windowFn, trigger, mode, allowedLateness, closingBehavior, outputTimeFn);
-    }
-
-    /**
-     * <b><i>(Experimental)</i></b> Override the default {@link OutputTimeFn}, to control
-     * the output timestamp of values output from a {@link GroupByKey} operation.
-     */
-    @Experimental(Kind.OUTPUT_TIME)
-    public Bound<T> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) {
-      return new Bound<T>(
-          name, windowFn, trigger, mode, allowedLateness, closingBehavior, outputTimeFn);
-    }
-
-    /**
-     * Override the amount of lateness allowed for data elements in the pipeline. Like
-     * the other properties on this {@link Window} operation, this will be applied at
-     * the next {@link GroupByKey}. Any elements that are later than this as decided by
-     * the system-maintained watermark will be dropped.
-     *
-     * <p>This value also determines how long state will be kept around for old windows.
-     * Once no elements will be added to a window (because this duration has passed) any state
-     * associated with the window will be cleaned up.
-     *
-     * <p>The {@code behavior} argument replaces the {@link ClosingBehavior} recorded on this
-     * transform.
-     */
-    @Experimental(Kind.TRIGGER)
-    public Bound<T> withAllowedLateness(Duration allowedLateness, ClosingBehavior behavior) {
-      return new Bound<T>(name, windowFn, trigger, mode, allowedLateness, behavior, outputTimeFn);
-    }
-
-    /**
-     * Get the output strategy of this {@link Window.Bound Window PTransform}. For internal use
-     * only.
-     *
-     * <p>Starts from {@code inputStrategy} and overrides each property that has been
-     * specified (is non-null) on this transform; unspecified properties keep the input's
-     * value.
-     */
-    // Rawtype cast of OutputTimeFn cannot be eliminated with intermediate variable, as it is
-    // casting between wildcards
-    // NOTE(review): no raw-type cast is visible in this method; the comment above may be
-    // stale -- confirm before removing it.
-    public WindowingStrategy<?, ?> getOutputStrategyInternal(
-        WindowingStrategy<?, ?> inputStrategy) {
-      WindowingStrategy<?, ?> result = inputStrategy;
-      if (windowFn != null) {
-        result = result.withWindowFn(windowFn);
-      }
-      if (trigger != null) {
-        result = result.withTrigger(trigger);
-      }
-      if (mode != null) {
-        result = result.withMode(mode);
-      }
-      if (allowedLateness != null) {
-        result = result.withAllowedLateness(allowedLateness);
-      }
-      if (closingBehavior != null) {
-        result = result.withClosingBehavior(closingBehavior);
-      }
-      if (outputTimeFn != null) {
-        result = result.withOutputTimeFn(outputTimeFn);
-      }
-      return result;
-    }
-
-    /**
-     * Get the {@link WindowFn} of this {@link Window.Bound Window PTransform}.
-     *
-     * @return the configured {@code WindowFn}, or {@code null} if this transform was built
-     *     without one (for example, only a trigger was specified)
-     */
-    @Nullable
-    public WindowFn<? super T, ?> getWindowFn() {
-      return windowFn;
-    }
-
-    /**
-     * Checks that the windowing strategy this transform would produce for {@code input} is
-     * complete. The checks apply only when a trigger other than {@link DefaultTrigger} has
-     * been specified.
-     *
-     * @throws IllegalArgumentException if a non-default trigger is specified and either the
-     *     allowed lateness is missing (for any window fn other than {@link GlobalWindows}) or
-     *     the accumulation mode is missing
-     */
-    @Override
-    public void validate(PCollection<T> input) {
-      WindowingStrategy<?, ?> outputStrategy =
-          getOutputStrategyInternal(input.getWindowingStrategy());
-
-      // Make sure that the windowing strategy is complete & valid.
-      if (outputStrategy.isTriggerSpecified()
-          && !(outputStrategy.getTrigger().getSpec() instanceof DefaultTrigger)) {
-        if (!(outputStrategy.getWindowFn() instanceof GlobalWindows)
-            && !outputStrategy.isAllowedLatenessSpecified()) {
-          throw new IllegalArgumentException("Except when using GlobalWindows,"
-              + " calling .triggering() to specify a trigger requires that the allowed lateness be"
-              + " specified using .withAllowedLateness() to set the upper bound on how late data"
-              + " can arrive before being dropped. See Javadoc for more details.");
-        }
-
-        if (!outputStrategy.isModeSpecified()) {
-          throw new IllegalArgumentException(
-              "Calling .triggering() to specify a trigger requires that the accumulation mode be"
-              + " specified using .discardingFiredPanes() or .accumulatingFiredPanes()."
-              + " See Javadoc for more details.");
-        }
-      }
-    }
-
-    /**
-     * Applies this transform: assigns windows when a {@link WindowFn} was specified, otherwise
-     * passes elements through unchanged, and in both cases sets the computed output windowing
-     * strategy on the result.
-     */
-    @Override
-    public PCollection<T> apply(PCollection<T> input) {
-      WindowingStrategy<?, ?> outputStrategy =
-          getOutputStrategyInternal(input.getWindowingStrategy());
-      PCollection<T> output;
-      if (windowFn != null) {
-        // If the windowFn changed, we create a primitive, and run the AssignWindows operation here.
-        output = assignWindows(input, windowFn);
-      } else {
-        // If the windowFn didn't change, we just run a pass-through transform and then set the
-        // new windowing strategy.
-        output = input.apply(Window.<T>identity());
-      }
-      return output.setWindowingStrategyInternal(outputStrategy);
-    }
-
-    /**
-     * Applies an "AssignWindows" {@code ParDo} built from {@code windowFn} to {@code input}.
-     *
-     * <p>This is a generic helper so that the window type {@code W}, a wildcard in the field,
-     * can be named. The element type parameter has its own name so that it does not shadow
-     * the class's {@code T}.
-     */
-    private <InputT, W extends BoundedWindow> PCollection<InputT> assignWindows(
-        PCollection<InputT> input, WindowFn<? super InputT, W> windowFn) {
-      return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<InputT, W>(windowFn)));
-    }
-
-    /** Returns the coder of {@code input}; this transform does not change the element type. */
-    @Override
-    protected Coder<?> getDefaultOutputCoder(PCollection<T> input) {
-      return input.getCoder();
-    }
-
-    /** Returns the fixed kind string {@code "Window.Into()"}. */
-    @Override
-    protected String getKindString() {
-      return "Window.Into()";
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Builds a pass-through {@code ParDo}, named "Identity", whose {@code DoFn} outputs each
-   * input element unchanged.
-   */
-  private static <T> PTransform<PCollection<? extends T>, PCollection<T>> identity() {
-    DoFn<T, T> passThrough = new DoFn<T, T>() {
-      @Override
-      public void processElement(ProcessContext c) {
-        c.output(c.element());
-      }
-    };
-    return ParDo.named("Identity").of(passThrough);
-  }
-
-  /**
-   * Builds a {@code Window} {@code PTransform} that leaves assigned windows as
-   * they are, but causes windows to be merged again as part of the next
-   * {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey}.
-   */
-  public static <T> Remerge<T> remerge() {
-    return new Remerge<>();
-  }
-
-  /**
-   * {@code PTransform} that leaves assigned windows as they are, but causes
-   * windows to be merged again as part of the next
-   * {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey}.
-   */
-  public static class Remerge<T> extends PTransform<PCollection<T>, PCollection<T>> {
-    /**
-     * Passes {@code input} through an identity transform and sets the remerging windowing
-     * strategy on the result.
-     */
-    @Override
-    public PCollection<T> apply(PCollection<T> input) {
-      // Compute the strategy before touching the pipeline, as the original ordering did.
-      WindowingStrategy<?, ?> remergedStrategy =
-          getOutputWindowing(input.getWindowingStrategy());
-      PCollection<T> passedThrough = input.apply(Window.<T>identity());
-      return passedThrough.setWindowingStrategyInternal(remergedStrategy);
-    }
-
-    /**
-     * Returns the strategy to set on the output: when the input's window fn is an
-     * {@link InvalidWindows}, the same strategy with the original window fn restored;
-     * otherwise the input strategy itself.
-     */
-    private <W extends BoundedWindow> WindowingStrategy<?, W> getOutputWindowing(
-        WindowingStrategy<?, W> inputStrategy) {
-      if (!(inputStrategy.getWindowFn() instanceof InvalidWindows)) {
-        return inputStrategy;
-      }
-      @SuppressWarnings("unchecked")
-      InvalidWindows<W> invalidWindows = (InvalidWindows<W>) inputStrategy.getWindowFn();
-      return inputStrategy.withWindowFn(invalidWindows.getOriginalWindowFn());
-    }
-  }
-}