You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/26 22:38:22 UTC

[2/4] beam git commit: Replace OutputTimeFn UDF with TimestampCombiner enum

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
index 2967f2c..eac465c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
@@ -26,8 +26,9 @@ import java.util.List;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -53,8 +54,9 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou
       Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
 
     @SuppressWarnings("unchecked")
-    OutputTimeFn<? super BoundedWindow> outputTimeFn =
-        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+    TimestampCombiner timestampCombiner =
+        (TimestampCombiner) windowingStrategy.getTimestampCombiner();
+    WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn();
 
     // get all elements so that we can sort them, has to fit into
     // memory
@@ -88,18 +90,19 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou
     // create accumulator using the first elements key
     WindowedValue<KV<K, InputT>> currentValue = iterator.next();
     K key = currentValue.getValue().getKey();
-    BoundedWindow currentWindow = Iterables.getOnlyElement(currentValue.getWindows());
+    W currentWindow = (W) Iterables.getOnlyElement(currentValue.getWindows());
     InputT firstValue = currentValue.getValue().getValue();
     AccumT accumulator = flinkCombiner.firstInput(
         key, firstValue, options, sideInputReader, currentValue.getWindows());
 
-    // we use this to keep track of the timestamps assigned by the OutputTimeFn
+    // we use this to keep track of the timestamps assigned by the TimestampCombiner
     Instant windowTimestamp =
-        outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);
+        timestampCombiner.assign(
+            currentWindow, windowFn.getOutputTime(currentValue.getTimestamp(), currentWindow));
 
     while (iterator.hasNext()) {
       WindowedValue<KV<K, InputT>> nextValue = iterator.next();
-      BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
+      W nextWindow = (W) Iterables.getOnlyElement(nextValue.getWindows());
 
       if (currentWindow.equals(nextWindow)) {
         // continue accumulating and merge windows
@@ -108,9 +111,12 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou
         accumulator = flinkCombiner.addInput(key, accumulator, value,
             options, sideInputReader, currentValue.getWindows());
 
-        windowTimestamp = outputTimeFn.combine(
-            windowTimestamp,
-            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
+        windowTimestamp =
+            timestampCombiner.combine(
+                windowTimestamp,
+                timestampCombiner.assign(
+                    currentWindow,
+                    windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow)));
 
       } else {
         // emit the value that we currently have
@@ -127,7 +133,9 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou
         InputT value = nextValue.getValue().getValue();
         accumulator = flinkCombiner.firstInput(key, value,
             options, sideInputReader, currentValue.getWindows());
-        windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
+        windowTimestamp =
+            timestampCombiner.assign(
+                currentWindow, windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow));
       }
 
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index 3203446..d015c38 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.CombineContextFactory;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
@@ -176,9 +176,9 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
       }
 
       @Override
-      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-          StateTag<? super K, WatermarkHoldState<W>> address,
-          OutputTimeFn<? super W> outputTimeFn) {
+      public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+          StateTag<? super K, WatermarkHoldState> address,
+          TimestampCombiner timestampCombiner) {
          throw new UnsupportedOperationException(
              String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
index 24b340e..2dd7c96 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
@@ -38,7 +38,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
@@ -186,9 +186,9 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
       }
 
       @Override
-      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-          StateTag<? super K, WatermarkHoldState<W>> address,
-          OutputTimeFn<? super W> outputTimeFn) {
+      public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+          StateTag<? super K, WatermarkHoldState> address,
+          TimestampCombiner timestampCombiner) {
         throw new UnsupportedOperationException(
             String.format("%s is not supported", CombiningState.class.getSimpleName()));
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
index 2bf0bf1..17ea62a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
@@ -146,9 +146,9 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
       }
 
       @Override
-      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-          StateTag<? super K, WatermarkHoldState<W>> address,
-          OutputTimeFn<? super W> outputTimeFn) {
+      public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+          StateTag<? super K, WatermarkHoldState> address,
+          TimestampCombiner timestampCombiner) {
         throw new UnsupportedOperationException(
             String.format("%s is not supported", CombiningState.class.getSimpleName()));
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index 4f961e5..878c914 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.CombineContextFactory;
 import org.apache.beam.sdk.util.state.BagState;
@@ -185,12 +185,12 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
       }
 
       @Override
-      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-          StateTag<? super K, WatermarkHoldState<W>> address,
-          OutputTimeFn<? super W> outputTimeFn) {
+      public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+          StateTag<? super K, WatermarkHoldState> address,
+          TimestampCombiner timestampCombiner) {
 
         return new FlinkWatermarkHoldState<>(
-            flinkStateBackend, FlinkStateInternals.this, address, namespace, outputTimeFn);
+            flinkStateBackend, FlinkStateInternals.this, address, namespace, timestampCombiner);
       }
     });
   }
@@ -912,9 +912,9 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
   }
 
   private static class FlinkWatermarkHoldState<K, W extends BoundedWindow>
-      implements WatermarkHoldState<W> {
-    private final StateTag<? super K, WatermarkHoldState<W>> address;
-    private final OutputTimeFn<? super W> outputTimeFn;
+      implements WatermarkHoldState {
+    private final StateTag<? super K, WatermarkHoldState> address;
+    private final TimestampCombiner timestampCombiner;
     private final StateNamespace namespace;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
     private final FlinkStateInternals<K> flinkStateInternals;
@@ -923,11 +923,11 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
     public FlinkWatermarkHoldState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
         FlinkStateInternals<K> flinkStateInternals,
-        StateTag<? super K, WatermarkHoldState<W>> address,
+        StateTag<? super K, WatermarkHoldState> address,
         StateNamespace namespace,
-        OutputTimeFn<? super W> outputTimeFn) {
+        TimestampCombiner timestampCombiner) {
       this.address = address;
-      this.outputTimeFn = outputTimeFn;
+      this.timestampCombiner = timestampCombiner;
       this.namespace = namespace;
       this.flinkStateBackend = flinkStateBackend;
       this.flinkStateInternals = flinkStateInternals;
@@ -937,12 +937,12 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public OutputTimeFn<? super W> getOutputTimeFn() {
-      return outputTimeFn;
+    public TimestampCombiner getTimestampCombiner() {
+      return timestampCombiner;
     }
 
     @Override
-    public WatermarkHoldState<W> readLater() {
+    public WatermarkHoldState readLater() {
       return this;
     }
 
@@ -983,7 +983,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
           state.update(value);
           flinkStateInternals.watermarkHolds.put(namespace.stringKey(), value);
         } else {
-          Instant combined = outputTimeFn.combine(current, value);
+          Instant combined = timestampCombiner.combine(current, value);
           state.update(combined);
           flinkStateInternals.watermarkHolds.put(namespace.stringKey(), combined);
         }
@@ -1035,7 +1035,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
       if (!address.equals(that.address)) {
         return false;
       }
-      if (!outputTimeFn.equals(that.outputTimeFn)) {
+      if (!timestampCombiner.equals(that.timestampCombiner)) {
         return false;
       }
       return namespace.equals(that.namespace);
@@ -1045,7 +1045,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
     @Override
     public int hashCode() {
       int result = address.hashCode();
-      result = 31 * result + outputTimeFn.hashCode();
+      result = 31 * result + timestampCombiner.hashCode();
       result = 31 * result + namespace.hashCode();
       return result;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index d140271..17c43bf 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
@@ -77,14 +77,12 @@ public class FlinkStateInternalsTest {
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
   private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
-  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
-      WATERMARK_EARLIEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
-  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
-      WATERMARK_LATEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
-  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
-      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
+  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
+  private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
+  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+      StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
 
   FlinkStateInternals<String> underTest;
 
@@ -274,7 +272,7 @@ public class FlinkStateInternalsTest {
 
   @Test
   public void testWatermarkEarliestState() throws Exception {
-    WatermarkHoldState<BoundedWindow> value =
+    WatermarkHoldState value =
         underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
 
     // State instances are cached, but depend on the namespace.
@@ -298,7 +296,7 @@ public class FlinkStateInternalsTest {
 
   @Test
   public void testWatermarkLatestState() throws Exception {
-    WatermarkHoldState<BoundedWindow> value =
+    WatermarkHoldState value =
         underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
 
     // State instances are cached, but depend on the namespace.
@@ -322,7 +320,7 @@ public class FlinkStateInternalsTest {
 
   @Test
   public void testWatermarkEndOfWindowState() throws Exception {
-    WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+    WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
 
     // State instances are cached, but depend on the namespace.
     assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
@@ -339,7 +337,7 @@ public class FlinkStateInternalsTest {
 
   @Test
   public void testWatermarkStateIsEmpty() throws Exception {
-    WatermarkHoldState<BoundedWindow> value =
+    WatermarkHoldState value =
         underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
 
     assertThat(value.isEmpty().read(), Matchers.is(true));
@@ -353,9 +351,9 @@ public class FlinkStateInternalsTest {
 
   @Test
   public void testMergeEarliestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState<BoundedWindow> value1 =
+    WatermarkHoldState value1 =
         underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-    WatermarkHoldState<BoundedWindow> value2 =
+    WatermarkHoldState value2 =
         underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
 
     value1.add(new Instant(3000));
@@ -372,11 +370,11 @@ public class FlinkStateInternalsTest {
 
   @Test
   public void testMergeLatestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState<BoundedWindow> value1 =
+    WatermarkHoldState value1 =
         underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState<BoundedWindow> value2 =
+    WatermarkHoldState value2 =
         underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState<BoundedWindow> value3 =
+    WatermarkHoldState value3 =
         underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
 
     value1.add(new Instant(3000));

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
index 725e9d3..c967521 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
@@ -166,10 +166,10 @@ class SparkStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-        StateTag<? super K, WatermarkHoldState<W>> address,
-        OutputTimeFn<? super W> outputTimeFn) {
-      return new SparkWatermarkHoldState<>(namespace, address, outputTimeFn);
+    public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+        StateTag<? super K, WatermarkHoldState> address,
+        TimestampCombiner timestampCombiner) {
+      return new SparkWatermarkHoldState(namespace, address, timestampCombiner);
     }
   }
 
@@ -250,21 +250,21 @@ class SparkStateInternals<K> implements StateInternals<K> {
     }
   }
 
-  private class SparkWatermarkHoldState<W extends BoundedWindow>
-      extends AbstractState<Instant> implements WatermarkHoldState<W> {
+  private class SparkWatermarkHoldState extends AbstractState<Instant>
+      implements WatermarkHoldState {
 
-    private final OutputTimeFn<? super W> outputTimeFn;
+    private final TimestampCombiner timestampCombiner;
 
     public SparkWatermarkHoldState(
         StateNamespace namespace,
-        StateTag<?, WatermarkHoldState<W>> address,
-        OutputTimeFn<? super W> outputTimeFn) {
+        StateTag<?, WatermarkHoldState> address,
+        TimestampCombiner timestampCombiner) {
       super(namespace, address, InstantCoder.of());
-      this.outputTimeFn = outputTimeFn;
+      this.timestampCombiner = timestampCombiner;
     }
 
     @Override
-    public SparkWatermarkHoldState<W> readLater() {
+    public SparkWatermarkHoldState readLater() {
       return this;
     }
 
@@ -276,7 +276,10 @@ class SparkStateInternals<K> implements StateInternals<K> {
     @Override
     public void add(Instant outputTime) {
       Instant combined = read();
-      combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime);
+      combined =
+          (combined == null)
+              ? outputTime
+              : getTimestampCombiner().combine(combined, outputTime);
       writeValue(combined);
     }
 
@@ -295,8 +298,8 @@ class SparkStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public OutputTimeFn<? super W> getOutputTimeFn() {
-      return outputTimeFn;
+    public TimestampCombiner getTimestampCombiner() {
+      return timestampCombiner;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
index fa1c3fc..7d06d6b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
@@ -50,7 +50,7 @@ import org.apache.beam.sdk.values.TupleTag;
 public class SparkAbstractCombineFn implements Serializable {
   protected final SparkRuntimeContext runtimeContext;
   protected final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
-  protected final WindowingStrategy<?, ?> windowingStrategy;
+  protected final WindowingStrategy<?, BoundedWindow> windowingStrategy;
 
 
   public SparkAbstractCombineFn(
@@ -59,7 +59,7 @@ public class SparkAbstractCombineFn implements Serializable {
       WindowingStrategy<?, ?> windowingStrategy) {
     this.runtimeContext = runtimeContext;
     this.sideInputs = sideInputs;
-    this.windowingStrategy = windowingStrategy;
+    this.windowingStrategy = (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
   }
 
   // each Spark task should get it's own copy of this SparkKeyedCombineFn, and since Spark tasks

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
index 23f5d20..7d026c6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
@@ -29,8 +29,9 @@ import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -70,9 +71,8 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
     // sort exploded inputs.
     Iterable<WindowedValue<InputT>> sortedInputs = sortByWindows(input.explodeWindows());
 
-    @SuppressWarnings("unchecked")
-    OutputTimeFn<? super BoundedWindow> outputTimeFn =
-        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+    TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
+    WindowFn<?, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
 
     //--- inputs iterator, by window order.
     final Iterator<WindowedValue<InputT>> iterator = sortedInputs.iterator();
@@ -84,9 +84,13 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
     accumulator = combineFn.addInput(accumulator, currentInput.getValue(),
         ctxtForInput(currentInput));
 
-    // keep track of the timestamps assigned by the OutputTimeFn.
+    // keep track of the timestamps assigned by the TimestampCombiner.
     Instant windowTimestamp =
-        outputTimeFn.assignOutputTime(currentInput.getTimestamp(), currentWindow);
+        timestampCombiner.assign(
+            currentWindow,
+            windowingStrategy
+                .getWindowFn()
+                .getOutputTime(currentInput.getTimestamp(), currentWindow));
 
     // accumulate the next windows, or output.
     List<WindowedValue<AccumT>> output = Lists.newArrayList();
@@ -109,8 +113,13 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
         // keep accumulating and carry on ;-)
         accumulator = combineFn.addInput(accumulator, nextValue.getValue(),
             ctxtForInput(nextValue));
-        windowTimestamp = outputTimeFn.combine(windowTimestamp,
-            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
+        windowTimestamp =
+            timestampCombiner.merge(
+                currentWindow,
+                windowTimestamp,
+                windowingStrategy
+                    .getWindowFn()
+                    .getOutputTime(nextValue.getTimestamp(), currentWindow));
       } else {
         // moving to the next window, first add the current accumulation to output
         // and initialize the accumulator.
@@ -121,7 +130,8 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
         accumulator = combineFn.addInput(accumulator, nextValue.getValue(),
             ctxtForInput(nextValue));
         currentWindow = nextWindow;
-        windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
+        windowTimestamp = timestampCombiner.assign(currentWindow,
+            windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow));
       }
     }
 
@@ -162,8 +172,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
     Iterable<WindowedValue<AccumT>> sortedAccumulators = sortByWindows(accumulators);
 
     @SuppressWarnings("unchecked")
-    OutputTimeFn<? super BoundedWindow> outputTimeFn =
-        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+    TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
 
     //--- accumulators iterator, by window order.
     final Iterator<WindowedValue<AccumT>> iterator = sortedAccumulators.iterator();
@@ -174,7 +183,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
     List<AccumT> currentWindowAccumulators = Lists.newArrayList();
     currentWindowAccumulators.add(currentValue.getValue());
 
-    // keep track of the timestamps assigned by the OutputTimeFn,
+    // keep track of the timestamps assigned by the TimestampCombiner,
     // in createCombiner we already merge the timestamps assigned
     // to individual elements, here we will just merge them.
     List<Instant> windowTimestamps = Lists.newArrayList();
@@ -206,7 +215,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
         // add the current accumulation to the output and initialize the accumulation.
 
         // merge the timestamps of all accumulators to merge.
-        Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
+        Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
 
         // merge accumulators.
         // transforming a KV<K, Iterable<AccumT>> into a KV<K, Iterable<AccumT>>.
@@ -231,7 +240,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
     }
 
     // merge the last chunk of accumulators.
-    Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
+    Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
     Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators);
     WindowedValue<Iterable<AccumT>> preMergeWindowedValue = WindowedValue.of(
         accumsToMerge, mergedTimestamp, currentWindow, PaneInfo.NO_FIRING);

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
index b5d243f..66c03bc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
@@ -29,8 +29,9 @@ import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -72,9 +73,8 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
     // sort exploded inputs.
     Iterable<WindowedValue<KV<K, InputT>>> sortedInputs = sortByWindows(wkvi.explodeWindows());
 
-    @SuppressWarnings("unchecked")
-    OutputTimeFn<? super BoundedWindow> outputTimeFn =
-        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+    TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
+    WindowFn<?, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
 
     //--- inputs iterator, by window order.
     final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInputs.iterator();
@@ -87,9 +87,13 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
     accumulator = combineFn.addInput(key, accumulator, currentInput.getValue().getValue(),
         ctxtForInput(currentInput));
 
-    // keep track of the timestamps assigned by the OutputTimeFn.
+    // keep track of the timestamps assigned by the TimestampCombiner.
     Instant windowTimestamp =
-        outputTimeFn.assignOutputTime(currentInput.getTimestamp(), currentWindow);
+        timestampCombiner.assign(
+            currentWindow,
+            windowingStrategy
+                .getWindowFn()
+                .getOutputTime(currentInput.getTimestamp(), currentWindow));
 
     // accumulate the next windows, or output.
     List<WindowedValue<KV<K, AccumT>>> output = Lists.newArrayList();
@@ -112,8 +116,12 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
         // keep accumulating and carry on ;-)
         accumulator = combineFn.addInput(key, accumulator, nextValue.getValue().getValue(),
             ctxtForInput(nextValue));
-        windowTimestamp = outputTimeFn.combine(windowTimestamp,
-            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
+        windowTimestamp =
+            timestampCombiner.combine(
+                windowTimestamp,
+                timestampCombiner.assign(
+                    currentWindow,
+                    windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow)));
       } else {
         // moving to the next window, first add the current accumulation to output
         // and initialize the accumulator.
@@ -124,7 +132,9 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
         accumulator = combineFn.addInput(key, accumulator, nextValue.getValue().getValue(),
             ctxtForInput(nextValue));
         currentWindow = nextWindow;
-        windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
+        windowTimestamp =
+            timestampCombiner.assign(
+                currentWindow, windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow));
       }
     }
 
@@ -170,8 +180,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
     Iterable<WindowedValue<KV<K, AccumT>>> sortedAccumulators = sortByWindows(accumulators);
 
     @SuppressWarnings("unchecked")
-    OutputTimeFn<? super BoundedWindow> outputTimeFn =
-        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+    TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
 
     //--- accumulators iterator, by window order.
     final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedAccumulators.iterator();
@@ -183,7 +192,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
     List<AccumT> currentWindowAccumulators = Lists.newArrayList();
     currentWindowAccumulators.add(currentValue.getValue().getValue());
 
-    // keep track of the timestamps assigned by the OutputTimeFn,
+    // keep track of the timestamps assigned by the TimestampCombiner,
     // in createCombiner we already merge the timestamps assigned
     // to individual elements, here we will just merge them.
     List<Instant> windowTimestamps = Lists.newArrayList();
@@ -215,7 +224,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
         // add the current accumulation to the output and initialize the accumulation.
 
         // merge the timestamps of all accumulators to merge.
-        Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
+        Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
 
         // merge accumulators.
         // transforming a KV<K, Iterable<AccumT>> into a KV<K, Iterable<AccumT>>.
@@ -241,7 +250,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
     }
 
     // merge the last chunk of accumulators.
-    Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
+    Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
     Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators);
     WindowedValue<KV<K, Iterable<AccumT>>> preMergeWindowedValue = WindowedValue.of(
         KV.of(key, accumsToMerge), mergedTimestamp, currentWindow, PaneInfo.NO_FIRING);

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 6c46453..58b5a84 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -135,11 +135,6 @@
 
   <dependencies>
     <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-common-runner-api</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.http-client</groupId>
       <artifactId>google-http-client</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
index 63e7903..e8c2f8d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
@@ -17,11 +17,14 @@
  */
 package org.apache.beam.sdk.testing;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -35,8 +38,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.joda.time.Instant;
 import org.joda.time.ReadableInstant;
@@ -252,20 +254,19 @@ public class WindowFnTestUtils {
 
   /**
    * Verifies that later-ending merged windows from any of the timestamps hold up output of
-   * earlier-ending windows, using the provided {@link WindowFn} and {@link OutputTimeFn}.
+   * earlier-ending windows, using the provided {@link WindowFn} and {@link TimestampCombiner}.
    *
    * <p>Given a list of lists of timestamps, where each list is expected to merge into a single
    * window with end times in ascending order, assigns and merges windows for each list (as though
-   * each were a separate key/user session). Then maps each timestamp in the list according to
-   * {@link OutputTimeFn#assignOutputTime outputTimeFn.assignOutputTime()} and
-   * {@link OutputTimeFn#combine outputTimeFn.combine()}.
+   * each were a separate key/user session). Then combines each timestamp in the list according to
+   * the provided {@link TimestampCombiner}.
    *
    * <p>Verifies that a overlapping windows do not hold each other up via the watermark.
    */
   public static <T, W extends IntervalWindow>
   void validateGetOutputTimestamps(
       WindowFn<T, W> windowFn,
-      OutputTimeFn<? super W> outputTimeFn,
+      TimestampCombiner timestampCombiner,
       List<List<Long>> timestampsPerWindow) throws Exception {
 
     // Assign windows to each timestamp, then merge them, storing the merged windows in
@@ -300,10 +301,11 @@ public class WindowFnTestUtils {
 
       List<Instant> outputInstants = new ArrayList<>();
       for (long inputTimestamp : timestampsForWindow) {
-        outputInstants.add(outputTimeFn.assignOutputTime(new Instant(inputTimestamp), window));
+        outputInstants.add(
+            assignOutputTime(timestampCombiner, new Instant(inputTimestamp), window));
       }
 
-      combinedOutputTimestamps.add(OutputTimeFns.combineOutputTimes(outputTimeFn, outputInstants));
+      combinedOutputTimestamps.add(combineOutputTimes(timestampCombiner, outputInstants));
     }
 
     // Consider windows in increasing order of max timestamp; ensure the output timestamp is after
@@ -321,4 +323,37 @@ public class WindowFnTestUtils {
       earlierEndingWindow = window;
     }
   }
+
+  private static Instant assignOutputTime(
+      TimestampCombiner timestampCombiner, Instant inputTimestamp, BoundedWindow window) {
+    switch (timestampCombiner) {
+      case EARLIEST:
+      case LATEST:
+        return inputTimestamp;
+      case END_OF_WINDOW:
+        return window.maxTimestamp();
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unknown %s: %s", TimestampCombiner.class, timestampCombiner));
+    }
+  }
+
+  private static Instant combineOutputTimes(
+      TimestampCombiner timestampCombiner, Iterable<Instant> outputInstants) {
+    checkArgument(
+        !Iterables.isEmpty(outputInstants),
+        "Cannot combine zero instants with %s",
+        timestampCombiner);
+    switch(timestampCombiner) {
+      case EARLIEST:
+        return Ordering.natural().min(outputInstants);
+      case LATEST:
+        return Ordering.natural().max(outputInstants);
+      case END_OF_WINDOW:
+        return outputInstants.iterator().next();
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unknown %s: %s", TimestampCombiner.class, timestampCombiner));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index cc92102..d9c4c9f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -97,7 +98,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
  * for details on the estimation.
  *
  * <p>The timestamp for each emitted pane is determined by the
- * {@link Window#withOutputTimeFn windowing operation}.
+ * {@link Window#withTimestampCombiner(TimestampCombiner)} windowing operation}.
  * The output {@code PCollection} will have the same {@link WindowFn}
  * as the input.
  *

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
deleted file mode 100644
index 0efd278..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import com.google.common.collect.Ordering;
-import java.io.Serializable;
-import java.util.Objects;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.joda.time.Instant;
-
-/**
- * <b><i>(Experimental)</i></b> A function from timestamps of input values to the timestamp for a
- * computed value.
- *
- * <p>The function is represented via three components:
- * <ol>
- *   <li>{@link #assignOutputTime} calculates an output timestamp for any input
- *       value in a particular window.</li>
- *   <li>The output timestamps for all non-late input values within a window are combined
- *       according to {@link #combine combine()}, a commutative and associative operation on
- *       the output timestamps.</li>
- *   <li>The output timestamp when windows merge is provided by {@link #merge merge()}.</li>
- * </ol>
- *
- * <p>This abstract class cannot be subclassed directly, by design: it may grow
- * in consumer-compatible ways that require mutually-exclusive default implementations. To
- * create a concrete subclass, extend {@link OutputTimeFn.Defaults} or
- * {@link OutputTimeFn.DependsOnlyOnWindow}. Note that as long as this class remains
- * experimental, we may also choose to change it in arbitrary backwards-incompatible ways.
- *
- * @param <W> the type of window. Contravariant: methods accepting any subtype of
- * {@code OutputTimeFn<W>} should use the parameter type {@code OutputTimeFn<? super W>}.
- */
-@Experimental(Experimental.Kind.OUTPUT_TIME)
-public abstract class OutputTimeFn<W extends BoundedWindow> implements Serializable {
-
-  protected OutputTimeFn() { }
-
-  /**
-   * Returns the output timestamp to use for data depending on the given
-   * {@code inputTimestamp} in the specified {@code window}.
-   *
-   * <p>The result of this method must be between {@code inputTimestamp} and
-   * {@code window.maxTimestamp()} (inclusive on both sides).
-   *
-   * <p>This function must be monotonic across input timestamps. Specifically, if {@code A < B},
-   * then {@code assignOutputTime(A, window) <= assignOutputTime(B, window)}.
-   *
-   * <p>For a {@link WindowFn} that doesn't produce overlapping windows, this can (and typically
-   * should) just return {@code inputTimestamp}. In the presence of overlapping windows, it is
-   * suggested that the result in later overlapping windows is past the end of earlier windows
-   * so that the later windows don't prevent the watermark from
-   * progressing past the end of the earlier window.
-   *
-   * <p>See the overview of {@link OutputTimeFn} for the consistency properties required
-   * between {@link #assignOutputTime}, {@link #combine}, and {@link #merge}.
-   */
-  public abstract Instant assignOutputTime(Instant inputTimestamp, W window);
-
-  /**
-   * Combines the given output times, which must be from the same window, into an output time
-   * for a computed value.
-   *
-   * <ul>
-   *   <li>{@code combine} must be commutative: {@code combine(a, b).equals(combine(b, a))}.</li>
-   *   <li>{@code combine} must be associative:
-   *       {@code combine(a, combine(b, c)).equals(combine(combine(a, b), c))}.</li>
-   * </ul>
-   */
-  public abstract Instant combine(Instant outputTime, Instant otherOutputTime);
-
-  /**
-   * Merges the given output times, presumed to be combined output times for windows that
-   * are merging, into an output time for the {@code resultWindow}.
-   *
-   * <p>When windows {@code w1} and {@code w2} merge to become a new window {@code w1plus2},
-   * then {@link #merge} must be implemented such that the output time is the same as
-   * if all timestamps were assigned in {@code w1plus2}. Formally:
-   *
-   * <p>{@code fn.merge(w, fn.assignOutputTime(t1, w1), fn.assignOutputTime(t2, w2))}
-   *
-   * <p>must be equal to
-   *
-   * <p>{@code fn.combine(fn.assignOutputTime(t1, w1plus2), fn.assignOutputTime(t2, w1plus2))}
-   *
-   * <p>If the assigned time depends only on the window, the correct implementation of
-   * {@link #merge merge()} necessarily returns the result of
-   * {@link #assignOutputTime assignOutputTime(t1, w1plus2)}
-   * (which equals {@link #assignOutputTime assignOutputTime(t2, w1plus2)}.
-   * Defaults for this case are provided by {@link DependsOnlyOnWindow}.
-   *
-   * <p>For many other {@link OutputTimeFn} implementations, such as taking the earliest or latest
-   * timestamp, this will be the same as {@link #combine combine()}. Defaults for this
-   * case are provided by {@link Defaults}.
-   */
-  public abstract Instant merge(W intoWindow, Iterable<? extends Instant> mergingTimestamps);
-
-  /**
-   * Returns {@code true} if the result of combination of many output timestamps actually depends
-   * only on the earliest.
-   *
-   * <p>This may allow optimizations when it is very efficient to retrieve the earliest timestamp
-   * to be combined.
-   */
-  public abstract boolean dependsOnlyOnEarliestInputTimestamp();
-
-  /**
-   * Returns {@code true} if the result does not depend on what outputs were combined but only
-   * the window they are in. The canonical example is if all timestamps are sure to
-   * be the end of the window.
-   *
-   * <p>This may allow optimizations, since it is typically very efficient to retrieve the window
-   * and combining output timestamps is not necessary.
-   *
-   * <p>If the assigned output time for an implementation depends only on the window, consider
-   * extending {@link DependsOnlyOnWindow}, which returns {@code true} here and also provides
-   * a framework for easily implementing a correct {@link #merge}, {@link #combine} and
-   * {@link #assignOutputTime}.
-   */
-  public abstract boolean dependsOnlyOnWindow();
-
-  /**
-   * <b><i>(Experimental)</i></b> Default method implementations for {@link OutputTimeFn} where the
-   * output time depends on the input element timestamps and possibly the window.
-   *
-   * <p>To complete an implementation, override {@link #assignOutputTime}, at a minimum.
-   *
-   * <p>By default, {@link #combine} and {@link #merge} return the earliest timestamp of their
-   * inputs.
-   */
-  public abstract static class Defaults<W extends BoundedWindow> extends OutputTimeFn<W> {
-
-    protected Defaults() {
-      super();
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return the earlier of the two timestamps.
-     */
-    @Override
-    public Instant combine(Instant outputTimestamp, Instant otherOutputTimestamp) {
-      return Ordering.natural().min(outputTimestamp, otherOutputTimestamp);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return the result of {@link #combine combine(outputTimstamp, otherOutputTimestamp)},
-     * by default.
-     */
-    @Override
-    public Instant merge(W resultWindow, Iterable<? extends Instant> mergingTimestamps) {
-      return OutputTimeFns.combineOutputTimes(this, mergingTimestamps);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return {@code false} by default. An {@link OutputTimeFn} that is known to depend only on the
-     * window should extend {@link OutputTimeFn.DependsOnlyOnWindow}.
-     */
-    @Override
-    public boolean dependsOnlyOnWindow() {
-      return false;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return {@code true} by default.
-     */
-    @Override
-    public boolean dependsOnlyOnEarliestInputTimestamp() {
-      return false;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return {@code true} if the two {@link OutputTimeFn} instances have the same class, by
-     *         default.
-     */
-    @Override
-    public boolean equals(Object other) {
-      if (other == null) {
-        return false;
-      }
-
-      return this.getClass().equals(other.getClass());
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass());
-    }
-  }
-
-  /**
-   * <b><i>(Experimental)</i></b> Default method implementations for {@link OutputTimeFn} when the
-   * output time depends only on the window.
-   *
-   * <p>To complete an implementation, override {@link #assignOutputTime(BoundedWindow)}.
-   */
-  public abstract static class DependsOnlyOnWindow<W extends BoundedWindow>
-      extends OutputTimeFn<W> {
-
-    protected DependsOnlyOnWindow() {
-      super();
-    }
-
-    /**
-     * Returns the output timestamp to use for data in the specified {@code window}.
-     *
-     * <p>Note that the result of this method must be between the maximum possible input timestamp
-     * in {@code window} and {@code window.maxTimestamp()} (inclusive on both sides).
-     *
-     * <p>For example, using {@code Sessions.withGapDuration(gapDuration)}, we know that all input
-     * timestamps must lie at least {@code gapDuration} from the end of the session, so
-     * {@code window.maxTimestamp() - gapDuration} is an acceptable assigned timestamp.
-     *
-     * @see #assignOutputTime(Instant, BoundedWindow)
-     */
-    protected abstract Instant assignOutputTime(W window);
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return the result of {#link assignOutputTime(BoundedWindow) assignOutputTime(window)}.
-     */
-    @Override
-    public final Instant assignOutputTime(Instant timestamp, W window) {
-      return assignOutputTime(window);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return the same timestamp as both argument timestamps, which are necessarily equal.
-     */
-    @Override
-    public final Instant combine(Instant outputTimestamp, Instant otherOutputTimestamp) {
-      return outputTimestamp;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return the result of
-     * {@link #assignOutputTime(BoundedWindow) assignOutputTime(resultWindow)}.
-     */
-    @Override
-    public final Instant merge(W resultWindow, Iterable<? extends Instant> mergingTimestamps) {
-      return assignOutputTime(resultWindow);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return {@code true}.
-     */
-    @Override
-    public final boolean dependsOnlyOnWindow() {
-      return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return {@code true}. Since the output time depends only on the window, it can
-     * certainly be ascertained given a single input timestamp.
-     */
-    @Override
-    public final boolean dependsOnlyOnEarliestInputTimestamp() {
-      return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return {@code true} if the two {@link OutputTimeFn} instances have the same class, by
-     *         default.
-     */
-    @Override
-    public boolean equals(Object other) {
-      if (other == null) {
-        return false;
-      }
-
-      return this.getClass().equals(other.getClass());
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java
deleted file mode 100644
index b5d67fa..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.joda.time.Instant;
-
-/**
- * <b><i>(Experimental)</i></b> Static utility methods and provided implementations for
- * {@link OutputTimeFn}.
- */
-@Experimental(Experimental.Kind.OUTPUT_TIME)
-public class OutputTimeFns {
-  /**
-   * The policy of outputting at the earliest of the input timestamps for non-late input data
-   * that led to a computed value.
-   *
-   * <p>For example, suppose <i>v</i><sub>1</sub> through <i>v</i><sub>n</sub> are all on-time
-   * elements being aggregated via some function {@code f} into
-   * {@code f}(<i>v</i><sub>1</sub>, ..., <i>v</i><sub>n</sub>. When emitted, the output
-   * timestamp of the result will be the earliest of the event time timestamps
-   *
-   * <p>If data arrives late, it has no effect on the output timestamp.
-   */
-  public static OutputTimeFn<BoundedWindow> outputAtEarliestInputTimestamp() {
-    return new OutputAtEarliestInputTimestamp();
-  }
-
-  /**
-   * The policy of holding the watermark to the latest of the input timestamps
-   * for non-late input data that led to a computed value.
-   *
-   * <p>For example, suppose <i>v</i><sub>1</sub> through <i>v</i><sub>n</sub> are all on-time
-   * elements being aggregated via some function {@code f} into
-   * {@code f}(<i>v</i><sub>1</sub>, ..., <i>v</i><sub>n</sub>. When emitted, the output
-   * timestamp of the result will be the latest of the event time timestamps
-   *
-   * <p>If data arrives late, it has no effect on the output timestamp.
-   */
-  public static OutputTimeFn<BoundedWindow> outputAtLatestInputTimestamp() {
-    return new OutputAtLatestInputTimestamp();
-  }
-
-  /**
-   * The policy of outputting with timestamps at the end of the window.
-   *
-   * <p>Note that this output timestamp depends only on the window. See
-   * {#link dependsOnlyOnWindow()}.
-   *
-   * <p>When windows merge, instead of using {@link OutputTimeFn#combine} to obtain an output
-   * timestamp for the results in the new window, it is mandatory to obtain a new output
-   * timestamp from {@link OutputTimeFn#assignOutputTime} with the new window and an arbitrary
-   * timestamp (because it is guaranteed that the timestamp is irrelevant).
-   *
-   * <p>For non-merging window functions, this {@link OutputTimeFn} works transparently.
-   */
-  public static OutputTimeFn<BoundedWindow> outputAtEndOfWindow() {
-    return new OutputAtEndOfWindow();
-  }
-
-  /**
-   * Applies the given {@link OutputTimeFn} to the given output times, obtaining
-   * the output time for a value computed. See {@link OutputTimeFn#combine} for
-   * a full specification.
-   *
-   * @throws IllegalArgumentException if {@code outputTimes} is empty.
-   */
-  public static Instant combineOutputTimes(
-      OutputTimeFn<?> outputTimeFn, Iterable<? extends Instant> outputTimes) {
-    checkArgument(
-        !Iterables.isEmpty(outputTimes),
-        "Collection of output times must not be empty in %s.combineOutputTimes",
-        OutputTimeFns.class.getName());
-
-    @Nullable
-    Instant combinedOutputTime = null;
-    for (Instant outputTime : outputTimes) {
-      combinedOutputTime =
-          combinedOutputTime == null
-              ? outputTime : outputTimeFn.combine(combinedOutputTime, outputTime);
-    }
-    return combinedOutputTime;
-  }
-
-  /**
-   * See {@link #outputAtEarliestInputTimestamp}.
-   */
-  private static class OutputAtEarliestInputTimestamp extends OutputTimeFn.Defaults<BoundedWindow> {
-    @Override
-    public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) {
-      return inputTimestamp;
-    }
-
-    @Override
-    public Instant combine(Instant outputTime, Instant otherOutputTime) {
-      return Ordering.natural().min(outputTime, otherOutputTime);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return {@code true}. The result of any combine will be the earliest input timestamp.
-     */
-    @Override
-    public boolean dependsOnlyOnEarliestInputTimestamp() {
-      return true;
-    }
-  }
-
-  /**
-   * See {@link #outputAtLatestInputTimestamp}.
-   */
-  private static class OutputAtLatestInputTimestamp extends OutputTimeFn.Defaults<BoundedWindow> {
-    @Override
-    public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) {
-      return inputTimestamp;
-    }
-
-    @Override
-    public Instant combine(Instant outputTime, Instant otherOutputTime) {
-      return Ordering.natural().max(outputTime, otherOutputTime);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return {@code false}.
-     */
-    @Override
-    public boolean dependsOnlyOnEarliestInputTimestamp() {
-      return false;
-    }
-  }
-
-  private static class OutputAtEndOfWindow extends OutputTimeFn.DependsOnlyOnWindow<BoundedWindow> {
-
-    /**
-     *{@inheritDoc}
-     *
-     *@return {@code window.maxTimestamp()}.
-     */
-    @Override
-    protected Instant assignOutputTime(BoundedWindow window) {
-      return window.maxTimestamp();
-    }
-
-    @Override
-    public String toString() {
-      return getClass().getCanonicalName();
-    }
-  }
-
-  public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) {
-    if (outputTimeFn instanceof OutputAtEarliestInputTimestamp) {
-      return RunnerApi.OutputTime.EARLIEST_IN_PANE;
-    } else if (outputTimeFn instanceof OutputAtLatestInputTimestamp) {
-      return RunnerApi.OutputTime.LATEST_IN_PANE;
-    } else if (outputTimeFn instanceof OutputAtEndOfWindow) {
-      return RunnerApi.OutputTime.END_OF_WINDOW;
-    } else {
-      throw new IllegalArgumentException(
-          String.format(
-              "Cannot convert %s to %s: %s",
-              OutputTimeFn.class.getCanonicalName(),
-              RunnerApi.OutputTime.class.getCanonicalName(),
-              outputTimeFn));
-    }
-  }
-
-  public static OutputTimeFn<?> fromProto(RunnerApi.OutputTime proto) {
-    switch (proto) {
-      case EARLIEST_IN_PANE:
-        return OutputTimeFns.outputAtEarliestInputTimestamp();
-      case LATEST_IN_PANE:
-        return OutputTimeFns.outputAtLatestInputTimestamp();
-      case END_OF_WINDOW:
-        return OutputTimeFns.outputAtEndOfWindow();
-      case UNRECOGNIZED:
-      default:
-        // Whether or not it is proto that cannot recognize it (due to the version of the
-        // generated code we link to) or the switch hasn't been updated to handle it,
-        // the situation is the same: we don't know what this OutputTime means
-        throw new IllegalArgumentException(
-            String.format(
-                "Cannot convert unknown %s to %s: %s",
-                RunnerApi.OutputTime.class.getCanonicalName(),
-                OutputTimeFn.class.getCanonicalName(),
-                proto));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java
new file mode 100644
index 0000000..39fe8a9
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.joda.time.Instant;
+
+/**
+ * Policies for combining timestamps that occur within a window.
+ */
+@Experimental(Experimental.Kind.OUTPUT_TIME)
+public enum TimestampCombiner {
+  /**
+   * The policy of taking at the earliest of a set of timestamps.
+   *
+   * <p>When used in windowed aggregations, the timestamps of non-late inputs will be combined after
+   * they are shifted by the {@link WindowFn} (to allow downstream watermark progress).
+   *
+   * <p>If data arrives late, it has no effect on the output timestamp.
+   */
+  EARLIEST {
+    @Override
+    public Instant combine(Iterable<? extends Instant> timestamps) {
+      return Ordering.natural().min(timestamps);
+    }
+
+    @Override
+    public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
+      return combine(mergingTimestamps);
+    }
+
+    @Override
+    public boolean dependsOnlyOnEarliestTimestamp() {
+      return true;
+    }
+
+    @Override
+    public boolean dependsOnlyOnWindow() {
+      return false;
+    }
+  },
+
+  /**
+   * The policy of taking the latest of a set of timestamps.
+   *
+   * <p>When used in windowed aggregations, the timestamps of non-late inputs will be combined after
+   * they are shifted by the {@link WindowFn} (to allow downstream watermark progress).
+   *
+   * <p>If data arrives late, it has no effect on the output timestamp.
+   */
+  LATEST {
+    @Override
+    public Instant combine(Iterable<? extends Instant> timestamps) {
+      return Ordering.natural().max(timestamps);
+    }
+
+    @Override
+    public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
+      return combine(mergingTimestamps);
+    }
+
+    @Override
+    public boolean dependsOnlyOnEarliestTimestamp() {
+      return false;
+    }
+
+    @Override
+    public boolean dependsOnlyOnWindow() {
+      return false;
+    }
+  },
+
+  /**
+   * The policy of using the end of the window, regardless of input timestamps.
+   *
+   * <p>When used in windowed aggregations, the timestamps of non-late inputs will be combined after
+   * they are shifted by the {@link WindowFn} (to allow downstream watermark progress).
+   *
+   * <p>If data arrives late, it has no effect on the output timestamp.
+   */
+  END_OF_WINDOW {
+    @Override
+    public Instant combine(Iterable<? extends Instant> timestamps) {
+      checkArgument(Iterables.size(timestamps) > 0);
+      return Iterables.get(timestamps, 0);
+    }
+
+    @Override
+    public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
+      return intoWindow.maxTimestamp();
+    }
+
+    @Override
+    public boolean dependsOnlyOnEarliestTimestamp() {
+      return false;
+    }
+
+    @Override
+    public boolean dependsOnlyOnWindow() {
+      return true;
+    }
+  };
+
+  /**
+   * Combines the given times, which must be from the same window and must have been passed through
+   * {@link #merge}.
+   *
+   * <ul>
+   * <li>{@code combine} must be commutative: {@code combine(a, b).equals(combine(b, a))}.
+   * <li>{@code combine} must be associative: {@code combine(a, combine(b,
+   *     c)).equals(combine(combine(a, b), c))}.
+   * </ul>
+   */
+  public abstract Instant combine(Iterable<? extends Instant> timestamps);
+
+  /**
+   * Merges the given timestamps, which may have originated in separate windows, into the context of
+   * the result window.
+   */
+  public abstract Instant merge(
+      BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps);
+
+  /**
+   * Shorthand for {@link #merge} with just one element, to place it into the context of
+   * a window.
+   *
+   * <p>For example, the {@link #END_OF_WINDOW} policy moves the timestamp to the end of the window.
+   */
+  public final Instant assign(BoundedWindow intoWindow, Instant timestamp) {
+    return merge(intoWindow, Collections.singleton(timestamp));
+  }
+
+  /**
+   * Varargs variant of {@link #combine}.
+   */
+  public final Instant combine(Instant... timestamps) {
+    return combine(Arrays.asList(timestamps));
+  }
+
+  /**
+   * Varargs variant of {@link #merge}.
+   */
+  public final Instant merge(BoundedWindow intoWindow, Instant... timestamps) {
+    return merge(intoWindow, Arrays.asList(timestamps));
+  }
+
+  /**
+   * Returns {@code true} if the result of combination of many output timestamps actually depends
+   * only on the earliest.
+   *
+   * <p>This may allow optimizations when it is very efficient to retrieve the earliest timestamp
+   * to be combined.
+   */
+  public abstract boolean dependsOnlyOnEarliestTimestamp();
+
+  /**
+   * Returns {@code true} if the result does not depend on what outputs were combined but only
+   * the window they are in. The canonical example is if all timestamps are sure to
+   * be the end of the window.
+   *
+   * <p>This may allow optimizations, since it is typically very efficient to retrieve the window
+   * and combining output timestamps is not necessary.
+   */
+  public abstract boolean dependsOnlyOnWindow();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 1000ff7..cb7b430 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -193,7 +193,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
   @Nullable abstract AccumulationMode getAccumulationMode();
   @Nullable abstract Duration getAllowedLateness();
   @Nullable abstract ClosingBehavior getClosingBehavior();
-  @Nullable abstract OutputTimeFn<?> getOutputTimeFn();
+  @Nullable abstract TimestampCombiner getTimestampCombiner();
 
   abstract Builder<T> toBuilder();
 
@@ -204,7 +204,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
     abstract Builder<T> setAccumulationMode(AccumulationMode mode);
     abstract Builder<T> setAllowedLateness(Duration allowedLateness);
     abstract Builder<T> setClosingBehavior(ClosingBehavior closingBehavior);
-    abstract Builder<T> setOutputTimeFn(OutputTimeFn<?> outputTimeFn);
+    abstract Builder<T> setTimestampCombiner(TimestampCombiner timestampCombiner);
 
     abstract Window<T> build();
   }
@@ -273,12 +273,12 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
   }
 
   /**
-   * <b><i>(Experimental)</i></b> Override the default {@link OutputTimeFn}, to control
+   * <b><i>(Experimental)</i></b> Override the default {@link TimestampCombiner}, to control
    * the output timestamp of values output from a {@link GroupByKey} operation.
    */
   @Experimental(Kind.OUTPUT_TIME)
-  public Window<T> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) {
-    return toBuilder().setOutputTimeFn(outputTimeFn).build();
+  public Window<T> withTimestampCombiner(TimestampCombiner timestampCombiner) {
+    return toBuilder().setTimestampCombiner(timestampCombiner).build();
   }
 
   /**
@@ -300,8 +300,6 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
    * Get the output strategy of this {@link Window Window PTransform}. For internal use
    * only.
    */
-  // Rawtype cast of OutputTimeFn cannot be eliminated with intermediate variable, as it is
-  // casting between wildcards
   public WindowingStrategy<?, ?> getOutputStrategyInternal(
       WindowingStrategy<?, ?> inputStrategy) {
     WindowingStrategy<?, ?> result = inputStrategy;
@@ -320,8 +318,8 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
     if (getClosingBehavior() != null) {
       result = result.withClosingBehavior(getClosingBehavior());
     }
-    if (getOutputTimeFn() != null) {
-      result = result.withOutputTimeFn(getOutputTimeFn());
+    if (getTimestampCombiner() != null) {
+      result = result.withTimestampCombiner(getTimestampCombiner());
     }
     return result;
   }
@@ -411,9 +409,9 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
         .withLabel("Window Closing Behavior"));
     }
 
-    if (getOutputTimeFn() != null) {
-      builder.add(DisplayData.item("outputTimeFn", getOutputTimeFn().getClass())
-        .withLabel("Output Time Function"));
+    if (getTimestampCombiner() != null) {
+      builder.add(DisplayData.item("timestampCombiner", getTimestampCombiner().toString())
+        .withLabel("Timestamp Combiner"));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index 0c27c4f..706e039 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -22,7 +22,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -57,13 +57,14 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
     // If the input has already had its windows merged, then the GBK that performed the merge
     // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
     // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
-    // The OutputTimeFn is set to ensure the GroupByKey does not shift elements forwards in time.
+    // The TimestampCombiner is set to ensure the GroupByKey does not shift elements forwards in
+    // time.
     // Because this outputs as fast as possible, this should not hold the watermark.
     Window<KV<K, V>> rewindow =
         Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
             .triggering(new ReshuffleTrigger<>())
             .discardingFiredPanes()
-            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
             .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
 
     return input.apply(rewindow)