You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/09 17:12:04 UTC

[1/2] incubator-beam git commit: Make WindowingStrategy combine WindowFn with OutputTimeFn

Repository: incubator-beam
Updated Branches:
  refs/heads/master 199ec2e10 -> 7725a47a7


Make WindowingStrategy combine WindowFn with OutputTimeFn

Previously:

 - Any user-specified OutputTimeFn overrode the WindowFn#getOutputTime
 - WindowFn#getOutputTimeFn provided a default OutputTimeFn
 - The default varied by WindowFn, from "earliest" to "end of window"

Now:

 - The user-specified OutputTimeFn is used to combine the WindowFn's
   assigned output timestamps.
 - The WindowFn does not provide the default.
 - The default is always to output at end of window.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3755c557
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3755c557
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3755c557

Branch: refs/heads/master
Commit: 3755c5579de3c46202a32b3ac2774ab440cf42f3
Parents: e63311f
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 5 19:33:16 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon May 9 10:10:57 2016 -0700

----------------------------------------------------------------------
 .../FlinkGroupAlsoByWindowWrapper.java          |  2 +-
 .../flink/streaming/GroupAlsoByWindowTest.java  |  7 +-
 .../beam/sdk/testing/WindowFnTestUtils.java     |  4 +-
 .../sdk/transforms/windowing/OutputTimeFn.java  |  1 -
 .../beam/sdk/transforms/windowing/Sessions.java |  8 --
 .../transforms/windowing/SlidingWindows.java    | 23 ++----
 .../beam/sdk/transforms/windowing/WindowFn.java | 82 ++++----------------
 .../apache/beam/sdk/util/WindowingStrategy.java | 55 ++++++++++++-
 .../sdk/transforms/join/CoGroupByKeyTest.java   |  7 +-
 .../sdk/transforms/windowing/WindowTest.java    | 31 +++++---
 .../sdk/transforms/windowing/WindowingTest.java |  9 ++-
 .../sdk/util/GroupAlsoByWindowsProperties.java  | 65 +++-------------
 ...oupAlsoByWindowsViaOutputBufferDoFnTest.java |  8 +-
 .../beam/sdk/util/ReduceFnRunnerTest.java       |  6 ++
 .../apache/beam/sdk/util/ReduceFnTester.java    |  9 ++-
 .../CopyOnAccessInMemoryStateInternalsTest.java |  6 +-
 16 files changed, 136 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index 0306aa1..9d2cad8 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -412,7 +412,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
     FlinkStateInternals<K> stateInternals = perKeyStateInternals.get(key);
     if (stateInternals == null) {
       Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
-      OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getWindowFn().getOutputTimeFn();
+      OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getOutputTimeFn();
       stateInternals = new FlinkStateInternals<>(key, inputKvCoder.getKeyCoder(), windowCoder, outputTimeFn);
       perKeyStateInternals.put(key, stateInternals);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
index f3ceba7..c76af65 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
@@ -58,16 +59,19 @@ public class GroupAlsoByWindowTest {
 
   private final WindowingStrategy slidingWindowWithAfterWatermarkTriggerStrategy =
       WindowingStrategy.of(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5)))
+          .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
           .withTrigger(AfterWatermark.pastEndOfWindow()).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
 
   private final WindowingStrategy sessionWindowingStrategy =
       WindowingStrategy.of(Sessions.withGapDuration(Duration.standardSeconds(2)))
           .withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
           .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
+          .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
           .withAllowedLateness(Duration.standardSeconds(100));
 
   private final WindowingStrategy fixedWindowingStrategy =
-      WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10)));
+      WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10)))
+          .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
 
   private final WindowingStrategy fixedWindowWithCountTriggerStrategy =
       fixedWindowingStrategy.withTrigger(AfterPane.elementCountAtLeast(5));
@@ -94,6 +98,7 @@ public class GroupAlsoByWindowTest {
   public void testWithLateness() throws Exception {
     WindowingStrategy strategy = WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(2)))
         .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
+        .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
         .withAllowedLateness(Duration.millis(1000));
     long initialTime = 0L;
     Pipeline pipeline = FlinkTestPipeline.createForStreaming();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
index 2566a12..a4130df 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
@@ -201,7 +201,7 @@ public class WindowFnTestUtils {
 
     Instant instant = new Instant(timestamp);
     for (W window : windows) {
-      Instant outputTimestamp = windowFn.getOutputTimeFn().assignOutputTime(instant, window);
+      Instant outputTimestamp = windowFn.getOutputTime(instant, window);
       assertFalse("getOutputTime must be greater than or equal to input timestamp",
           outputTimestamp.isBefore(instant));
       assertFalse("getOutputTime must be less than or equal to the max timestamp",
@@ -232,7 +232,7 @@ public class WindowFnTestUtils {
     Instant instant = new Instant(timestamp);
     Instant endOfPrevious = null;
     for (W window : sortedWindows) {
-      Instant outputTimestamp = windowFn.getOutputTimeFn().assignOutputTime(instant, window);
+      Instant outputTimestamp = windowFn.getOutputTime(instant, window);
       if (endOfPrevious == null) {
         // If this is the first window, the output timestamp can be anything, as long as it is in
         // the valid range.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
index 3deea56..7cf870a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
@@ -61,7 +61,6 @@ public abstract class OutputTimeFn<W extends BoundedWindow> implements Serializa
    * Returns the output timestamp to use for data depending on the given
    * {@code inputTimestamp} in the specified {@code window}.
    *
-   *
    * <p>The result of this method must be between {@code inputTimestamp} and
    * {@code window.maxTimestamp()} (inclusive on both sides).
    *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java
index 8e8a005..788566e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.transforms.windowing;
 
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 
@@ -88,12 +86,6 @@ public class Sessions extends WindowFn<Object, IntervalWindow> {
     throw new UnsupportedOperationException("Sessions is not allowed in side inputs");
   }
 
-  @Experimental(Kind.OUTPUT_TIME)
-  @Override
-  public OutputTimeFn<? super IntervalWindow> getOutputTimeFn() {
-    return OutputTimeFns.outputAtEarliestInputTimestamp();
-  }
-
   public Duration getGapDuration() {
     return gapDuration;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
index 4153e21..62c2738 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
@@ -185,26 +185,17 @@ public class SlidingWindows extends NonMergingWindowFn<Object, IntervalWindow> {
   /**
    * Ensures that later sliding windows have an output time that is past the end of earlier windows.
    *
-   * <p>If this is the earliest sliding window containing {@code inputTimestamp}, that's fine.
+   * <p>
+   * If this is the earliest sliding window containing {@code inputTimestamp}, that's fine.
    * Otherwise, we pick the earliest time that doesn't overlap with earlier windows.
    */
   @Experimental(Kind.OUTPUT_TIME)
   @Override
-  public OutputTimeFn<? super IntervalWindow> getOutputTimeFn() {
-    return new OutputTimeFn.Defaults<BoundedWindow>() {
-      @Override
-      public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) {
-        Instant startOfLastSegment = window.maxTimestamp().minus(period);
-        return startOfLastSegment.isBefore(inputTimestamp)
-            ? inputTimestamp
-                : startOfLastSegment.plus(1);
-      }
-
-      @Override
-      public boolean dependsOnlyOnEarliestInputTimestamp() {
-        return true;
-      }
-    };
+  public Instant getOutputTime(Instant inputTimestamp, IntervalWindow window) {
+    Instant startOfLastSegment = window.maxTimestamp().minus(period);
+    return startOfLastSegment.isBefore(inputTimestamp)
+        ? inputTimestamp
+        : startOfLastSegment.plus(1);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
index e291bee..41833f8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
@@ -22,9 +22,6 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.util.WindowingStrategy;
-
-import com.google.common.collect.Ordering;
 
 import org.joda.time.Instant;
 
@@ -136,34 +133,24 @@ public abstract class WindowFn<T, W extends BoundedWindow>
   public abstract W getSideInputWindow(final BoundedWindow window);
 
   /**
-   * @deprecated Implement {@link #getOutputTimeFn} to return one of the appropriate
-   * {@link OutputTimeFns}, or a custom {@link OutputTimeFn} extending
-   * {@link OutputTimeFn.Defaults}.
-   */
-  @Deprecated
-  @Experimental(Kind.OUTPUT_TIME)
-  public Instant getOutputTime(Instant inputTimestamp, W window) {
-    return getOutputTimeFn().assignOutputTime(inputTimestamp, window);
-  }
-
-  /**
-   * Provides a default implementation for {@link WindowingStrategy#getOutputTimeFn()}.
-   * See the full specification there.
+   * Returns the output timestamp to use for data depending on the given
+   * {@code inputTimestamp} in the specified {@code window}.
    *
-   * <p>If this {@link WindowFn} doesn't produce overlapping windows, this need not (and probably
-   * should not) override any of the default implementations in {@link OutputTimeFn.Defaults}.
+   * <p>The result of this method must be between {@code inputTimestamp} and
+   * {@code window.maxTimestamp()} (inclusive on both sides).
    *
-   * <p>If this {@link WindowFn} does produce overlapping windows that can be predicted here, it is
-   * suggested that the result in later overlapping windows is past the end of earlier windows so
-   * that the later windows don't prevent the watermark from progressing past the end of the earlier
-   * window.
+   * <p>This function must be monotonic across input timestamps. Specifically, if {@code A < B},
+   * then {@code getOutputTime(A, window) <= getOutputTime(B, window)}.
    *
-   * <p>For example, a timestamp in a sliding window should be moved past the beginning of the next
-   * sliding window. See {@link SlidingWindows#getOutputTimeFn}.
+   * <p>For a {@link WindowFn} that doesn't produce overlapping windows, this can (and typically
+   * should) just return {@code inputTimestamp}. In the presence of overlapping windows, it is
+   * suggested that the result in later overlapping windows is past the end of earlier windows
+   * so that the later windows don't prevent the watermark from
+   * progressing past the end of the earlier window.
    */
   @Experimental(Kind.OUTPUT_TIME)
-  public OutputTimeFn<? super W> getOutputTimeFn() {
-    return new OutputAtEarliestAssignedTimestamp<>(this);
+  public Instant getOutputTime(Instant inputTimestamp, W window) {
+    return inputTimestamp;
   }
 
   /**
@@ -189,47 +176,4 @@ public abstract class WindowFn<T, W extends BoundedWindow>
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
   }
-
-  /**
-   * A compatibility adapter that will return the assigned timestamps according to the
-   * {@link WindowFn}, which was the prior policy. Specifying the assigned output timestamps
-   * on the {@link WindowFn} is now deprecated.
-   */
-  private static class OutputAtEarliestAssignedTimestamp<W extends BoundedWindow>
-      extends OutputTimeFn.Defaults<W> {
-
-    private final WindowFn<?, W> windowFn;
-
-    public OutputAtEarliestAssignedTimestamp(WindowFn<?, W> windowFn) {
-      this.windowFn = windowFn;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return the result of {@link WindowFn#getOutputTime windowFn.getOutputTime()}.
-     */
-    @Override
-    @SuppressWarnings("deprecation") // this is an adapter for the deprecated behavior
-    public Instant assignOutputTime(Instant timestamp, W window) {
-      return windowFn.getOutputTime(timestamp, window);
-    }
-
-    @Override
-    public Instant combine(Instant outputTime, Instant otherOutputTime) {
-      return Ordering.natural().min(outputTime, otherOutputTime);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return {@code true}. When the {@link OutputTimeFn} is not overridden by {@link WindowFn}
-     *         or {@link WindowingStrategy}, the minimum output timestamp is taken, which depends
-     *         only on the minimum input timestamp by monotonicity of {@link #assignOutputTime}.
-     */
-    @Override
-    public boolean dependsOnlyOnEarliestInputTimestamp() {
-      return true;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
index a82f2b3..d98793f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -30,8 +31,10 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import com.google.common.base.MoreObjects;
 
 import org.joda.time.Duration;
+import org.joda.time.Instant;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.Objects;
 
 /**
@@ -99,7 +102,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         ExecutableTrigger.create(DefaultTrigger.<W>of()), false,
         AccumulationMode.DISCARDING_FIRED_PANES, false,
         DEFAULT_ALLOWED_LATENESS, false,
-        windowFn.getOutputTimeFn(), false,
+        OutputTimeFns.outputAtEndOfWindow(), false,
         ClosingBehavior.FIRE_IF_NON_EMPTY);
   }
 
@@ -182,7 +185,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
     // The onus of type correctness falls on the callee.
     @SuppressWarnings("unchecked")
     OutputTimeFn<? super W> newOutputTimeFn = (OutputTimeFn<? super W>)
-        (outputTimeFnSpecified ? outputTimeFn : typedWindowFn.getOutputTimeFn());
+        new CombineWindowFnOutputTimes<W>(outputTimeFn, typedWindowFn);
 
     return new WindowingStrategy<T, W>(
         typedWindowFn,
@@ -223,12 +226,15 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
     @SuppressWarnings("unchecked")
     OutputTimeFn<? super W> typedOutputTimeFn = (OutputTimeFn<? super W>) outputTimeFn;
 
+    OutputTimeFn<? super W> newOutputTimeFn =
+        new CombineWindowFnOutputTimes<W>(typedOutputTimeFn, windowFn);
+
     return new WindowingStrategy<T, W>(
         windowFn,
         trigger, triggerSpecified,
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
-        typedOutputTimeFn, true,
+        newOutputTimeFn, true,
         closingBehavior);
   }
 
@@ -265,4 +271,47 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
     return Objects.hash(triggerSpecified, allowedLatenessSpecified, modeSpecified,
         windowFn, trigger, mode, allowedLateness, closingBehavior);
   }
+
+  /**
+   * An {@link OutputTimeFn} that uses {@link WindowFn#getOutputTime} to assign initial timestamps
+   * but then combines and merges according to a given {@link OutputTimeFn}.
+   *
+   * <ul>
+   *   <li>The {@link WindowFn#getOutputTime} allows adjustments; for example,
+   *       {@link SlidingWindows#getOutputTime} moves elements later in time to avoid holding up
+   *       progress downstream.</li>
+   *   <li>Then, when multiple elements are buffered for output, the output timestamp of the
+   *       result is calculated using {@link OutputTimeFn#combine}.</li>
+   *   <li>In the case of a merging {@link WindowFn}, the output timestamp when windows merge
+   *       is calculated using {@link OutputTimeFn#merge}.</li>
+   * </ul>
+   */
+  private static class CombineWindowFnOutputTimes<W extends BoundedWindow>
+      extends OutputTimeFn.Defaults<W> {
+
+    private final OutputTimeFn<? super W> outputTimeFn;
+    private final WindowFn<?, W> windowFn;
+
+    public CombineWindowFnOutputTimes(
+        OutputTimeFn<? super W> outputTimeFn, WindowFn<?, W> windowFn) {
+      this.outputTimeFn = outputTimeFn;
+      this.windowFn = windowFn;
+    }
+
+    @Override
+    public Instant assignOutputTime(Instant inputTimestamp, W window) {
+      return outputTimeFn.merge(
+          window, Collections.singleton(windowFn.getOutputTime(inputTimestamp, window)));
+    }
+
+    @Override
+    public Instant combine(Instant timestamp, Instant otherTimestamp) {
+      return outputTimeFn.combine(timestamp, otherTimestamp);
+    }
+
+    @Override
+    public Instant merge(W newWindow, Iterable<? extends Instant> timestamps) {
+      return outputTimeFn.merge(newWindow, timestamps);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
index 609f454..f4ce2ca 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -239,7 +240,8 @@ public class CoGroupByKeyTest implements Serializable {
             idToClick,
             Arrays.asList(0L, 2L, 4L, 6L, 8L))
         .apply("WindowClicks", Window.<KV<Integer, String>>into(
-            FixedWindows.of(new Duration(4))));
+            FixedWindows.of(new Duration(4)))
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()));
 
     PCollection<KV<Integer, String>> purchasesTable =
         createInput("CreatePurchases",
@@ -247,7 +249,8 @@ public class CoGroupByKeyTest implements Serializable {
             idToPurchases,
             Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L))
         .apply("WindowPurchases", Window.<KV<Integer, String>>into(
-            FixedWindows.of(new Duration(4))));
+            FixedWindows.of(new Duration(4)))
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()));
 
     PCollection<KV<Integer, CoGbkResult>> coGbkResults =
         KeyedPCollectionTuple.of(clicksTag, clicksTable)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 91bd846..8ad590d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -183,25 +183,34 @@ public class WindowTest implements Serializable {
 
   /**
    * Tests that when two elements are combined via a GroupByKey their output timestamp agrees
-   * with the windowing function default, the earlier of the two values.
+   * with the windowing strategy default, the end of the window.
    */
   @Test
   @Category(RunnableOnService.class)
   public void testOutputTimeFnDefault() {
     Pipeline pipeline = TestPipeline.create();
 
-    pipeline.apply(
-        Create.timestamped(
-            TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
-            TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
+    pipeline
+        .apply(
+            Create.timestamped(
+                TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
+                TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
         .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10))))
         .apply(GroupByKey.<Integer, String>create())
-        .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() {
-          @Override
-          public void processElement(ProcessContext c) throws Exception {
-            assertThat(c.timestamp(), equalTo(new Instant(0)));
-          }
-        }));
+        .apply(
+            ParDo.of(
+                new DoFn<KV<Integer, Iterable<String>>, Void>() {
+                  @Override
+                  public void processElement(ProcessContext c) throws Exception {
+                    assertThat(
+                        c.timestamp(),
+                        equalTo(
+                            new IntervalWindow(
+                                    new Instant(0),
+                                    new Instant(0).plus(Duration.standardMinutes(10)))
+                                .maxTimestamp()));
+                  }
+                }));
 
     pipeline.run();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index 5cbf044..65adac1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -72,11 +72,12 @@ public class WindowingTest implements Serializable {
     }
     @Override
     public PCollection<String> apply(PCollection<String> in) {
-      return in
-          .apply(Window.named("Window").<String>into(windowFn))
+      return in.apply(
+              Window.named("Window")
+                  .<String>into(windowFn)
+                  .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
           .apply(Count.<String>perElement())
-          .apply(ParDo
-              .named("FormatCounts").of(new FormatCountsDoFn()))
+          .apply(ParDo.named("FormatCounts").of(new FormatCountsDoFn()))
           .setCoder(StringUtf8Coder.of());
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
index d5aa0da..4518f9f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
@@ -29,7 +29,6 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
@@ -123,12 +122,12 @@ public class GroupAlsoByWindowsProperties {
 
     WindowedValue<KV<String, Iterable<String>>> item0 = result.get(0);
     assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getTimestamp(), equalTo(new Instant(1)));
+    assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp()));
     assertThat(item0.getWindows(), contains(window(0, 10)));
 
     WindowedValue<KV<String, Iterable<String>>> item1 = result.get(1);
     assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getTimestamp(), equalTo(new Instant(13)));
+    assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp()));
     assertThat(item1.getWindows(),
         contains(window(10, 20)));
   }
@@ -139,12 +138,13 @@ public class GroupAlsoByWindowsProperties {
    *
    * <p>In the input here, each element occurs in multiple windows.
    */
-  public static void groupsElementsIntoSlidingWindows(
+  public static void groupsElementsIntoSlidingWindowsWithMinTimestamp(
       GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
           throws Exception {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(
-        SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)));
+        SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
+        .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
 
     List<WindowedValue<KV<String, Iterable<String>>>> result =
         runGABW(gabwFactory, windowingStrategy, "key",
@@ -271,13 +271,13 @@ public class GroupAlsoByWindowsProperties {
 
     WindowedValue<KV<String, Iterable<String>>> item0 = result.get(0);
     assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v3"));
-    assertThat(item0.getTimestamp(), equalTo(new Instant(1)));
+    assertThat(item0.getTimestamp(), equalTo(window(1, 5).maxTimestamp()));
     assertThat(item0.getWindows(),
         contains(window(0, 5)));
 
     WindowedValue<KV<String, Iterable<String>>> item1 = result.get(1);
     assertThat(item1.getValue().getValue(), contains("v2"));
-    assertThat(item1.getTimestamp(), equalTo(new Instant(4)));
+    assertThat(item1.getTimestamp(), equalTo(window(0, 5).maxTimestamp()));
     assertThat(item1.getWindows(),
         contains(window(1, 5)));
   }
@@ -314,13 +314,13 @@ public class GroupAlsoByWindowsProperties {
 
     WindowedValue<KV<String, Iterable<String>>> item0 = result.get(0);
     assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getTimestamp(), equalTo(new Instant(0)));
+    assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp()));
     assertThat(item0.getWindows(),
         contains(window(0, 15)));
 
     WindowedValue<KV<String, Iterable<String>>> item1 = result.get(1);
     assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getTimestamp(), equalTo(new Instant(15)));
+    assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp()));
     assertThat(item1.getWindows(),
         contains(window(15, 25)));
   }
@@ -421,53 +421,6 @@ public class GroupAlsoByWindowsProperties {
   /**
    * Tests that for a simple sequence of elements on the same key, the given GABW implementation
    * correctly groups them according to fixed windows and also sets the output timestamp
-   * according to a custom {@link OutputTimeFn}.
-   */
-  public static void groupsElementsIntoFixedWindowsWithCustomTimestamp(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-      throws Exception {
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
-            .withOutputTimeFn(new OutputTimeFn.Defaults<IntervalWindow>() {
-              @Override
-              public Instant assignOutputTime(Instant inputTimestamp, IntervalWindow window) {
-                return inputTimestamp.isBefore(window.maxTimestamp())
-                    ? inputTimestamp.plus(1) : window.maxTimestamp();
-              }
-
-              @Override
-              public Instant combine(Instant outputTime, Instant otherOutputTime) {
-                return outputTime.isBefore(otherOutputTime) ? outputTime : otherOutputTime;
-              }
-
-              @Override
-              public boolean dependsOnlyOnEarliestInputTimestamp() {
-                return true;
-              }
-            });
-
-    List<WindowedValue<KV<String, Iterable<String>>>> result = runGABW(gabwFactory,
-        windowingStrategy, "key",
-        WindowedValue.of("v1", new Instant(1), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
-        WindowedValue.of("v2", new Instant(2), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
-        WindowedValue.of("v3", new Instant(13), Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING));
-
-    assertThat(result.size(), equalTo(2));
-
-    WindowedValue<KV<String, Iterable<String>>> item0 = result.get(0);
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getWindows(), contains(window(0, 10)));
-    assertThat(item0.getTimestamp(), equalTo(new Instant(2)));
-
-    WindowedValue<KV<String, Iterable<String>>> item1 = result.get(1);
-    assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getWindows(), contains(window(10, 20)));
-    assertThat(item1.getTimestamp(), equalTo(new Instant(14)));
-  }
-
-  /**
-   * Tests that for a simple sequence of elements on the same key, the given GABW implementation
-   * correctly groups them according to fixed windows and also sets the output timestamp
    * according to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}.
    */
   public static void groupsElementsIntoFixedWindowsWithLatestTimestamp(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
index d9c786d..4ac6164 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
@@ -64,7 +64,7 @@ public class GroupAlsoByWindowsViaOutputBufferDoFnTest {
 
   @Test
   public void testGroupsElementsIntoSlidingWindows() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsIntoSlidingWindows(
+    GroupAlsoByWindowsProperties.groupsElementsIntoSlidingWindowsWithMinTimestamp(
         new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
   }
 
@@ -93,12 +93,6 @@ public class GroupAlsoByWindowsViaOutputBufferDoFnTest {
   }
 
   @Test
-  public void testGroupsElementsIntoFixedWindowsWithCustomTimestamp() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithCustomTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
   public void testGroupsElementsIntoSessionsWithEndOfWindowTimestamp() throws Exception {
     GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithEndOfWindowTimestamp(
         new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
index f2036eb..41c1710 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
@@ -258,6 +259,7 @@ public class ReduceFnRunnerTest {
         .of(FixedWindows.of(Duration.millis(10)))
         .withTrigger(mockTrigger)
         .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+        .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
         .withAllowedLateness(Duration.millis(100));
 
     TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
@@ -504,6 +506,7 @@ public class ReduceFnRunnerTest {
                 AfterWatermark.pastEndOfWindow())))
             .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100))
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
             .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
 
     tester.advanceInputWatermark(new Instant(0));
@@ -556,6 +559,7 @@ public class ReduceFnRunnerTest {
                 AfterWatermark.pastEndOfWindow())))
             .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100))
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
             .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY));
 
     tester.advanceInputWatermark(new Instant(0));
@@ -582,6 +586,7 @@ public class ReduceFnRunnerTest {
                 AfterWatermark.pastEndOfWindow())))
             .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100))
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
             .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
 
     tester.advanceInputWatermark(new Instant(0));
@@ -610,6 +615,7 @@ public class ReduceFnRunnerTest {
                 AfterWatermark.pastEndOfWindow())))
             .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100))
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
             .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
 
     tester.advanceInputWatermark(new Instant(0));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
index f296d65..9916c5c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.TriggerBuilder;
@@ -134,6 +135,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
           Duration allowedDataLateness, ClosingBehavior closingBehavior) throws Exception {
     WindowingStrategy<?, W> strategy =
         WindowingStrategy.of(windowFn)
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
             .withTrigger(trigger.buildTrigger())
             .withMode(mode)
             .withAllowedLateness(allowedDataLateness)
@@ -185,8 +187,11 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
           Duration allowedDataLateness) throws Exception {
 
     WindowingStrategy<?, W> strategy =
-        WindowingStrategy.of(windowFn).withTrigger(trigger).withMode(mode).withAllowedLateness(
-            allowedDataLateness);
+        WindowingStrategy.of(windowFn)
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withTrigger(trigger)
+            .withMode(mode)
+            .withAllowedLateness(allowedDataLateness);
 
     return combining(strategy, combineFn, outputCoder);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java
index f17f64c..b7388ee 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
@@ -220,9 +219,8 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
-    @SuppressWarnings("unchecked")
-    OutputTimeFn<BoundedWindow> outputTimeFn = (OutputTimeFn<BoundedWindow>)
-        TestPipeline.create().apply(Create.of("foo")).getWindowingStrategy().getOutputTimeFn();
+    OutputTimeFn<BoundedWindow> outputTimeFn =
+        OutputTimeFns.outputAtEarliestInputTimestamp();
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
     StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag =



[2/2] incubator-beam git commit: This closes #296

Posted by ke...@apache.org.
This closes #296


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7725a47a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7725a47a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7725a47a

Branch: refs/heads/master
Commit: 7725a47a7918018b834a535baf2c5a5049bb9c6e
Parents: 199ec2e 3755c55
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 9 10:11:34 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon May 9 10:11:34 2016 -0700

----------------------------------------------------------------------
 .../FlinkGroupAlsoByWindowWrapper.java          |  2 +-
 .../flink/streaming/GroupAlsoByWindowTest.java  |  7 +-
 .../beam/sdk/testing/WindowFnTestUtils.java     |  4 +-
 .../sdk/transforms/windowing/OutputTimeFn.java  |  1 -
 .../beam/sdk/transforms/windowing/Sessions.java |  8 --
 .../transforms/windowing/SlidingWindows.java    | 23 ++----
 .../beam/sdk/transforms/windowing/WindowFn.java | 82 ++++----------------
 .../apache/beam/sdk/util/WindowingStrategy.java | 55 ++++++++++++-
 .../sdk/transforms/join/CoGroupByKeyTest.java   |  7 +-
 .../sdk/transforms/windowing/WindowTest.java    | 31 +++++---
 .../sdk/transforms/windowing/WindowingTest.java |  9 ++-
 .../sdk/util/GroupAlsoByWindowsProperties.java  | 65 +++-------------
 ...oupAlsoByWindowsViaOutputBufferDoFnTest.java |  8 +-
 .../beam/sdk/util/ReduceFnRunnerTest.java       |  6 ++
 .../apache/beam/sdk/util/ReduceFnTester.java    |  9 ++-
 .../CopyOnAccessInMemoryStateInternalsTest.java |  6 +-
 16 files changed, 136 insertions(+), 187 deletions(-)
----------------------------------------------------------------------