You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/26 22:38:23 UTC

[3/4] beam git commit: Replace OutputTimeFn UDF with TimestampCombiner enum

Replace OutputTimeFn UDF with TimestampCombiner enum

This also removes the last dependency from SDK core to Runner API proto.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f38e4271
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f38e4271
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f38e4271

Branch: refs/heads/master
Commit: f38e4271334fced94e8dc1dc97f47b60fa810586
Parents: f23dd67
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jan 26 19:56:06 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Apr 26 15:08:29 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/complete/game/GameStats.java  |   4 +-
 .../translation/utils/ApexStateInternals.java   |  26 +-
 .../translation/GroupByKeyTranslatorTest.java   |  10 +-
 .../utils/ApexStateInternalsTest.java           |  33 +-
 .../core/construction/WindowingStrategies.java  |  52 ++-
 .../construction/WindowingStrategiesTest.java   |   6 +-
 .../runners/core/InMemoryStateInternals.java    |  32 +-
 .../beam/runners/core/ReduceFnRunner.java       |   4 +-
 .../beam/runners/core/SplittableParDo.java      |   8 +-
 .../apache/beam/runners/core/StateMerging.java  |  32 +-
 .../org/apache/beam/runners/core/StateTag.java  |  11 +-
 .../org/apache/beam/runners/core/StateTags.java |  16 +-
 .../core/TestInMemoryStateInternals.java        |   2 +-
 .../apache/beam/runners/core/WatermarkHold.java |  45 +--
 .../core/GroupAlsoByWindowsProperties.java      |  20 +-
 .../core/InMemoryStateInternalsTest.java        |  34 +-
 .../beam/runners/core/ReduceFnRunnerTest.java   |  38 +--
 .../beam/runners/core/ReduceFnTester.java       |  13 +-
 .../apache/beam/runners/core/StateTagTest.java  |  16 +-
 .../CopyOnAccessInMemoryStateInternals.java     |  24 +-
 .../direct/ParDoMultiOverrideFactory.java       |   6 +-
 .../CopyOnAccessInMemoryStateInternalsTest.java |  54 ++--
 .../functions/HashingFlinkCombineRunner.java    |  19 +-
 .../functions/SortingFlinkCombineRunner.java    |  30 +-
 .../state/FlinkBroadcastStateInternals.java     |   8 +-
 .../state/FlinkKeyGroupStateInternals.java      |   8 +-
 .../state/FlinkSplitStateInternals.java         |   8 +-
 .../streaming/state/FlinkStateInternals.java    |  34 +-
 .../streaming/FlinkStateInternalsTest.java      |  34 +-
 .../spark/stateful/SparkStateInternals.java     |  33 +-
 .../translation/SparkAbstractCombineFn.java     |   4 +-
 .../spark/translation/SparkGlobalCombineFn.java |  37 ++-
 .../spark/translation/SparkKeyedCombineFn.java  |  37 ++-
 sdks/java/core/pom.xml                          |   5 -
 .../beam/sdk/testing/WindowFnTestUtils.java     |  53 +++-
 .../apache/beam/sdk/transforms/GroupByKey.java  |   3 +-
 .../sdk/transforms/windowing/OutputTimeFn.java  | 314 -------------------
 .../sdk/transforms/windowing/OutputTimeFns.java | 212 -------------
 .../transforms/windowing/TimestampCombiner.java | 186 +++++++++++
 .../beam/sdk/transforms/windowing/Window.java   |  22 +-
 .../org/apache/beam/sdk/util/Reshuffle.java     |   7 +-
 .../apache/beam/sdk/util/WindowingStrategy.java | 176 +++--------
 .../apache/beam/sdk/util/state/StateBinder.java |  12 +-
 .../apache/beam/sdk/util/state/StateSpecs.java  |  23 +-
 .../beam/sdk/util/state/WatermarkHoldState.java |  19 +-
 .../org/apache/beam/SdkCoreApiSurfaceTest.java  |   1 -
 .../beam/sdk/transforms/GroupByKeyTest.java     |  10 +-
 .../sdk/transforms/join/CoGroupByKeyTest.java   |   6 +-
 .../transforms/windowing/OutputTimeFnsTest.java |  51 ---
 .../sdk/transforms/windowing/SessionsTest.java  |   6 +-
 .../sdk/transforms/windowing/WindowTest.java    |  23 +-
 .../sdk/transforms/windowing/WindowingTest.java |   2 +-
 .../org/apache/beam/GcpCoreApiSurfaceTest.java  |   1 -
 53 files changed, 740 insertions(+), 1130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index b6c05be..e0048b7 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -43,8 +43,8 @@ import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -313,7 +313,7 @@ public class GameStats extends LeaderBoard {
     userEvents
       .apply("WindowIntoSessions", Window.<KV<String, Integer>>into(
           Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
-          .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
+          .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
       // For this use, we care only about the existence of the session, not any particular
       // information aggregated over it, so the following is an efficient way to do that.
       .apply(Combine.perKey(x -> 0))

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index cfc57cd..ec8f666 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -45,7 +45,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.apache.beam.sdk.util.state.BagState;
@@ -150,10 +150,10 @@ public class ApexStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-        StateTag<? super K, WatermarkHoldState<W>> address,
-        OutputTimeFn<? super W> outputTimeFn) {
-      return new ApexWatermarkHoldState<>(namespace, address, outputTimeFn);
+    public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+        StateTag<? super K, WatermarkHoldState> address,
+        TimestampCombiner timestampCombiner) {
+      return new ApexWatermarkHoldState<>(namespace, address, timestampCombiner);
     }
 
     @Override
@@ -269,16 +269,16 @@ public class ApexStateInternals<K> implements StateInternals<K> {
   }
 
   private final class ApexWatermarkHoldState<W extends BoundedWindow>
-      extends AbstractState<Instant> implements WatermarkHoldState<W> {
+      extends AbstractState<Instant> implements WatermarkHoldState {
 
-    private final OutputTimeFn<? super W> outputTimeFn;
+    private final TimestampCombiner timestampCombiner;
 
     public ApexWatermarkHoldState(
         StateNamespace namespace,
-        StateTag<?, WatermarkHoldState<W>> address,
-        OutputTimeFn<? super W> outputTimeFn) {
+        StateTag<?, WatermarkHoldState> address,
+        TimestampCombiner timestampCombiner) {
       super(namespace, address, InstantCoder.of());
-      this.outputTimeFn = outputTimeFn;
+      this.timestampCombiner = timestampCombiner;
     }
 
     @Override
@@ -294,7 +294,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
     @Override
     public void add(Instant outputTime) {
       Instant combined = read();
-      combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime);
+      combined = (combined == null) ? outputTime : timestampCombiner.combine(combined, outputTime);
       writeValue(combined);
     }
 
@@ -313,8 +313,8 @@ public class ApexStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public OutputTimeFn<? super W> getOutputTimeFn() {
-      return outputTimeFn;
+    public TimestampCombiner getTimestampCombiner() {
+      return timestampCombiner;
     }
 
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
index 193de71..9c61b47 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
@@ -43,7 +43,7 @@ import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.joda.time.Duration;
@@ -83,12 +83,12 @@ public class GroupByKeyTranslatorTest {
         );
 
     p.apply(Read.from(new TestSource(data, new Instant(5000))))
-        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
-            .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()))
+        .apply(
+            Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
+                .withTimestampCombiner(TimestampCombiner.LATEST))
         .apply(Count.<String>perElement())
         .apply(ParDo.of(new KeyedByTimestamp<KV<String, Long>>()))
-        .apply(ParDo.of(new EmbeddedCollector()))
-        ;
+        .apply(ParDo.of(new EmbeddedCollector()));
 
     ApexRunnerResult result = (ApexRunnerResult) p.run();
     result.getApexDAG();

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 7160e45..225b654 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -36,7 +36,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
@@ -65,14 +65,13 @@ public class ApexStateInternalsTest {
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
   private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
-  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+  private static final StateTag<Object, WatermarkHoldState>
       WATERMARK_EARLIEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
-  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
-      WATERMARK_LATEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
-  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
-      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
+      StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
+  private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
+  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+      StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
 
   private ApexStateInternals<String> underTest;
 
@@ -227,7 +226,7 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testWatermarkEarliestState() throws Exception {
-    WatermarkHoldState<BoundedWindow> value =
+    WatermarkHoldState value =
         underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
 
     // State instances are cached, but depend on the namespace.
@@ -251,7 +250,7 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testWatermarkLatestState() throws Exception {
-    WatermarkHoldState<BoundedWindow> value =
+    WatermarkHoldState value =
         underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
 
     // State instances are cached, but depend on the namespace.
@@ -275,7 +274,7 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testWatermarkEndOfWindowState() throws Exception {
-    WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+    WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
 
     // State instances are cached, but depend on the namespace.
     assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
@@ -292,7 +291,7 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testWatermarkStateIsEmpty() throws Exception {
-    WatermarkHoldState<BoundedWindow> value =
+    WatermarkHoldState value =
         underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
 
     assertThat(value.isEmpty().read(), Matchers.is(true));
@@ -306,9 +305,9 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testMergeEarliestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState<BoundedWindow> value1 =
+    WatermarkHoldState value1 =
         underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-    WatermarkHoldState<BoundedWindow> value2 =
+    WatermarkHoldState value2 =
         underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
 
     value1.add(new Instant(3000));
@@ -325,11 +324,11 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testMergeLatestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState<BoundedWindow> value1 =
+    WatermarkHoldState value1 =
         underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState<BoundedWindow> value2 =
+    WatermarkHoldState value2 =
         underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState<BoundedWindow> value3 =
+    WatermarkHoldState value3 =
         underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
 
     value1.add(new Instant(3000));

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
index 3d7deef..0c400db 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
@@ -28,16 +28,15 @@ import java.io.Serializable;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.WindowingStrategy.CombineWindowFnOutputTimes;
 import org.joda.time.Duration;
 
 /** Utilities for working with {@link WindowingStrategy WindowingStrategies}. */
@@ -115,11 +114,42 @@ public class WindowingStrategies implements Serializable {
     }
   }
 
-  public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) {
-    if (outputTimeFn instanceof WindowingStrategy.CombineWindowFnOutputTimes) {
-      return toProto(((CombineWindowFnOutputTimes<?>) outputTimeFn).getOutputTimeFn());
-    } else {
-      return OutputTimeFns.toProto(outputTimeFn);
+  public static RunnerApi.OutputTime toProto(TimestampCombiner timestampCombiner) {
+    switch(timestampCombiner) {
+      case EARLIEST:
+        return OutputTime.EARLIEST_IN_PANE;
+      case END_OF_WINDOW:
+        return OutputTime.END_OF_WINDOW;
+      case LATEST:
+        return OutputTime.LATEST_IN_PANE;
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Unknown %s: %s",
+                TimestampCombiner.class.getSimpleName(),
+                timestampCombiner));
+    }
+  }
+
+  public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime proto) {
+    switch (proto) {
+      case EARLIEST_IN_PANE:
+        return TimestampCombiner.EARLIEST;
+      case END_OF_WINDOW:
+        return TimestampCombiner.END_OF_WINDOW;
+      case LATEST_IN_PANE:
+        return TimestampCombiner.LATEST;
+      case UNRECOGNIZED:
+      default:
+        // Whether proto itself cannot recognize the value (due to the version of the
+        // generated code we link to) or this switch has not been updated to handle it,
+        // the situation is the same: we don't know what this OutputTime means.
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot convert unknown %s to %s: %s",
+                RunnerApi.OutputTime.class.getCanonicalName(),
+                TimestampCombiner.class.getCanonicalName(),
+                proto));
     }
   }
 
@@ -177,7 +207,7 @@ public class WindowingStrategies implements Serializable {
 
     RunnerApi.WindowingStrategy.Builder windowingStrategyProto =
         RunnerApi.WindowingStrategy.newBuilder()
-            .setOutputTime(toProto(windowingStrategy.getOutputTimeFn()))
+            .setOutputTime(toProto(windowingStrategy.getTimestampCombiner()))
             .setAccumulationMode(toProto(windowingStrategy.getMode()))
             .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
             .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
@@ -229,7 +259,7 @@ public class WindowingStrategies implements Serializable {
             "WindowFn");
 
     WindowFn<?, ?> windowFn = (WindowFn<?, ?>) deserializedWindowFn;
-    OutputTimeFn<?> outputTimeFn = OutputTimeFns.fromProto(proto.getOutputTime());
+    TimestampCombiner timestampCombiner = timestampCombinerFromProto(proto.getOutputTime());
     AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode());
     Trigger trigger = Triggers.fromProto(proto.getTrigger());
     ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
@@ -239,7 +269,7 @@ public class WindowingStrategies implements Serializable {
         .withAllowedLateness(allowedLateness)
         .withMode(accumulationMode)
         .withTrigger(trigger)
-        .withOutputTimeFn(outputTimeFn)
+        .withTimestampCombiner(timestampCombiner)
         .withClosingBehavior(closingBehavior);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
index 62bba8e..78ac61c 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
@@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableList;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -68,14 +68,14 @@ public class WindowingStrategiesTest {
                 .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
                 .withTrigger(REPRESENTATIVE_TRIGGER)
                 .withAllowedLateness(Duration.millis(71))
-                .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())),
+                .withTimestampCombiner(TimestampCombiner.EARLIEST)),
         toProtoAndBackSpec(
             WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
                 .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)
                 .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
                 .withTrigger(REPRESENTATIVE_TRIGGER)
                 .withAllowedLateness(Duration.millis(93))
-                .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp())));
+                .withTimestampCombiner(TimestampCombiner.LATEST)));
   }
 
   @Parameter(0)

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
index 55b7fc2..9fb8e3f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
@@ -156,10 +156,10 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-        StateTag<? super K, WatermarkHoldState<W>> address,
-        OutputTimeFn<? super W> outputTimeFn) {
-      return new InMemoryWatermarkHold<W>(outputTimeFn);
+    public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+        StateTag<? super K, WatermarkHoldState> address,
+        TimestampCombiner timestampCombiner) {
+      return new InMemoryWatermarkHold<W>(timestampCombiner);
     }
 
     @Override
@@ -233,19 +233,19 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
    * An {@link InMemoryState} implementation of {@link WatermarkHoldState}.
    */
   public static final class InMemoryWatermarkHold<W extends BoundedWindow>
-      implements WatermarkHoldState<W>, InMemoryState<InMemoryWatermarkHold<W>> {
+      implements WatermarkHoldState, InMemoryState<InMemoryWatermarkHold<W>> {
 
-    private final OutputTimeFn<? super W> outputTimeFn;
+    private final TimestampCombiner timestampCombiner;
 
     @Nullable
     private Instant combinedHold = null;
 
-    public InMemoryWatermarkHold(OutputTimeFn<? super W> outputTimeFn) {
-      this.outputTimeFn = outputTimeFn;
+    public InMemoryWatermarkHold(TimestampCombiner timestampCombiner) {
+      this.timestampCombiner = timestampCombiner;
     }
 
     @Override
-    public InMemoryWatermarkHold<W> readLater() {
+    public InMemoryWatermarkHold readLater() {
       return this;
     }
 
@@ -263,8 +263,10 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
 
     @Override
     public void add(Instant outputTime) {
-      combinedHold = combinedHold == null ? outputTime
-          : outputTimeFn.combine(combinedHold, outputTime);
+      combinedHold =
+          combinedHold == null
+              ? outputTime
+              : timestampCombiner.combine(combinedHold, outputTime);
     }
 
     @Override
@@ -287,8 +289,8 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public OutputTimeFn<? super W> getOutputTimeFn() {
-      return outputTimeFn;
+    public TimestampCombiner getTimestampCombiner() {
+      return timestampCombiner;
     }
 
     @Override
@@ -299,7 +301,7 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
     @Override
     public InMemoryWatermarkHold<W> copy() {
       InMemoryWatermarkHold<W> that =
-          new InMemoryWatermarkHold<>(outputTimeFn);
+          new InMemoryWatermarkHold<>(timestampCombiner);
       that.combinedHold = this.combinedHold;
       return that;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 34db752..4c70c97 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -47,9 +47,9 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.SideInputReader;
@@ -171,7 +171,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
    * <ul>
    * <li>State: Bag of hold timestamps.
    * <li>State style: RENAMED
-   * <li>Merging: Depending on {@link OutputTimeFn}, may need to be recalculated on merging.
+   * <li>Merging: Depending on {@link TimestampCombiner}, may need to be recalculated on merging.
    * When a pane fires it may be necessary to add (back) an end-of-window or garbage collection
    * hold.
    * <li>Lifetime: Cleared when a pane fires or when the window is garbage collected.

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 31d89ee..5273e86 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -45,7 +45,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -355,10 +355,10 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
      * the input watermark when the first {@link DoFn.ProcessElement} call for this element
      * completes.
      */
-    private static final StateTag<Object, WatermarkHoldState<GlobalWindow>> watermarkHoldTag =
+    private static final StateTag<Object, WatermarkHoldState> watermarkHoldTag =
         StateTags.makeSystemTagInternal(
             StateTags.<GlobalWindow>watermarkStateInternal(
-                "hold", OutputTimeFns.outputAtLatestInputTimestamp()));
+                "hold", TimestampCombiner.LATEST));
 
     /**
      * The state cell containing a copy of the element. Written during the first {@link
@@ -480,7 +480,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
           stateInternals.state(stateNamespace, elementTag);
       ValueState<RestrictionT> restrictionState =
           stateInternals.state(stateNamespace, restrictionTag);
-      WatermarkHoldState<GlobalWindow> holdState =
+      WatermarkHoldState holdState =
           stateInternals.state(stateNamespace, watermarkHoldTag);
 
       ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction;

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
index 3410850..ce37fd3 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -218,24 +218,24 @@ public class StateMerging {
    */
   public static <K, W extends BoundedWindow> void prefetchWatermarks(
       MergingStateAccessor<K, W> context,
-      StateTag<? super K, WatermarkHoldState<W>> address) {
-    Map<W, WatermarkHoldState<W>> map = context.accessInEachMergingWindow(address);
-    WatermarkHoldState<W> result = context.access(address);
+      StateTag<? super K, WatermarkHoldState> address) {
+    Map<W, WatermarkHoldState> map = context.accessInEachMergingWindow(address);
+    WatermarkHoldState result = context.access(address);
     if (map.isEmpty()) {
       // Nothing to prefetch.
       return;
     }
     if (map.size() == 1 && map.values().contains(result)
-        && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) {
+        && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) {
       // Nothing to change.
       return;
     }
-    if (result.getOutputTimeFn().dependsOnlyOnWindow()) {
+    if (result.getTimestampCombiner().dependsOnlyOnWindow()) {
       // No need to read existing holds.
       return;
     }
     // Prefetch.
-    for (WatermarkHoldState<W> source : map.values()) {
+    for (WatermarkHoldState source : map.values()) {
       prefetchRead(source);
     }
   }
@@ -250,7 +250,7 @@ public class StateMerging {
    */
   public static <K, W extends BoundedWindow> void mergeWatermarks(
       MergingStateAccessor<K, W> context,
-      StateTag<? super K, WatermarkHoldState<W>> address,
+      StateTag<? super K, WatermarkHoldState> address,
       W mergeResult) {
     mergeWatermarks(
         context.accessInEachMergingWindow(address).values(), context.access(address), mergeResult);
@@ -261,31 +261,31 @@ public class StateMerging {
    * into {@code result}, where the final merge result window is {@code mergeResult}.
    */
   public static <W extends BoundedWindow> void mergeWatermarks(
-      Collection<WatermarkHoldState<W>> sources, WatermarkHoldState<W> result,
+      Collection<WatermarkHoldState> sources, WatermarkHoldState result,
       W resultWindow) {
     if (sources.isEmpty()) {
       // Nothing to merge.
       return;
     }
     if (sources.size() == 1 && sources.contains(result)
-        && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) {
+        && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) {
       // Nothing to merge.
       return;
     }
-    if (result.getOutputTimeFn().dependsOnlyOnWindow()) {
+    if (result.getTimestampCombiner().dependsOnlyOnWindow()) {
       // Clear sources.
-      for (WatermarkHoldState<W> source : sources) {
+      for (WatermarkHoldState source : sources) {
         source.clear();
       }
       // Update directly from window-derived hold.
-      Instant hold = result.getOutputTimeFn().assignOutputTime(
-          BoundedWindow.TIMESTAMP_MIN_VALUE, resultWindow);
+      Instant hold =
+          result.getTimestampCombiner().assign(resultWindow, BoundedWindow.TIMESTAMP_MIN_VALUE);
       checkState(hold.isAfter(BoundedWindow.TIMESTAMP_MIN_VALUE));
       result.add(hold);
     } else {
       // Prefetch.
       List<ReadableState<Instant>> futures = new ArrayList<>(sources.size());
-      for (WatermarkHoldState<W> source : sources) {
+      for (WatermarkHoldState source : sources) {
         futures.add(source);
       }
       // Read.
@@ -297,12 +297,12 @@ public class StateMerging {
         }
       }
       // Clear sources.
-      for (WatermarkHoldState<W> source : sources) {
+      for (WatermarkHoldState source : sources) {
         source.clear();
       }
       if (!outputTimesToMerge.isEmpty()) {
         // Merge and update.
-        result.add(result.getOutputTimeFn().merge(resultWindow, outputTimesToMerge));
+        result.add(result.getTimestampCombiner().merge(resultWindow, outputTimesToMerge));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
index 12c59ad..a5d262a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
@@ -115,11 +115,10 @@ public interface StateTag<K, StateT extends State> extends Serializable {
     /**
      * Bind to a watermark {@link StateSpec}.
      *
-     * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to
-     * the returned {@link WatermarkHoldState} are to be combined.
+     * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps
+     * added to the returned {@link WatermarkHoldState} are to be combined.
      */
-    <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-        StateTag<? super K, WatermarkHoldState<W>> spec,
-        OutputTimeFn<? super W> outputTimeFn);
+    <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+        StateTag<? super K, WatermarkHoldState> spec, TimestampCombiner timestampCombiner);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index 3a45569..2b3f4b8 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
@@ -110,11 +110,11 @@ public class StateTags {
       }
 
       @Override
-      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+      public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
           String id,
-          StateSpec<? super K, WatermarkHoldState<W>> spec,
-          OutputTimeFn<? super W> outputTimeFn) {
-        return binder.bindWatermark(tagForSpec(id, spec), outputTimeFn);
+          StateSpec<? super K, WatermarkHoldState> spec,
+          TimestampCombiner timestampCombiner) {
+        return binder.bindWatermark(tagForSpec(id, spec), timestampCombiner);
       }
     };
   }
@@ -228,10 +228,10 @@ public class StateTags {
   /**
    * Create a state tag for holding the watermark.
    */
-  public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState<W>>
-      watermarkStateInternal(String id, OutputTimeFn<? super W> outputTimeFn) {
+  public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState>
+      watermarkStateInternal(String id, TimestampCombiner timestampCombiner) {
     return new SimpleStateTag<>(
-        new StructuredId(id), StateSpecs.watermarkStateInternal(outputTimeFn));
+        new StructuredId(id), StateSpecs.watermarkStateInternal(timestampCombiner));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
index 0321a33..1dfb85f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
@@ -52,7 +52,7 @@ public class TestInMemoryStateInternals<K> extends InMemoryStateInternals<K> {
     Instant minimum = null;
     for (State storage : inMemoryState.values()) {
       if (storage instanceof WatermarkHoldState) {
-        Instant hold = ((WatermarkHoldState<?>) storage).read();
+        Instant hold = ((WatermarkHoldState) storage).read();
         if (minimum == null || (hold != null && hold.isBefore(minimum))) {
           minimum = hold;
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index d3c4bc7..9bb9c62 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -23,9 +23,8 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.Serializable;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -55,37 +54,38 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
    * used for elements.
    */
   public static <W extends BoundedWindow>
-      StateTag<Object, WatermarkHoldState<W>> watermarkHoldTagForOutputTimeFn(
-          OutputTimeFn<? super W> outputTimeFn) {
-    return StateTags.<Object, WatermarkHoldState<W>>makeSystemTagInternal(
-        StateTags.<W>watermarkStateInternal("hold", outputTimeFn));
+      StateTag<Object, WatermarkHoldState> watermarkHoldTagForTimestampCombiner(
+          TimestampCombiner timestampCombiner) {
+    return StateTags.<Object, WatermarkHoldState>makeSystemTagInternal(
+        StateTags.<W>watermarkStateInternal("hold", timestampCombiner));
   }
 
   /**
    * Tag for state containing end-of-window and garbage collection output watermark holds.
-   * (We can't piggy-back on the data hold state since the outputTimeFn may be
-   * {@link OutputTimeFns#outputAtLatestInputTimestamp()}, in which case every pane will
+   * (We can't piggy-back on the data hold state since the timestampCombiner may be
+   * {@link TimestampCombiner#LATEST}, in which case every pane
    * would take the end-of-window time as its element time.)
    */
   @VisibleForTesting
-  public static final StateTag<Object, WatermarkHoldState<BoundedWindow>> EXTRA_HOLD_TAG =
+  public static final StateTag<Object, WatermarkHoldState> EXTRA_HOLD_TAG =
       StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal(
-          "extra", OutputTimeFns.outputAtEarliestInputTimestamp()));
+          "extra", TimestampCombiner.EARLIEST));
 
   private final TimerInternals timerInternals;
   private final WindowingStrategy<?, W> windowingStrategy;
-  private final StateTag<Object, WatermarkHoldState<W>> elementHoldTag;
+  private final StateTag<Object, WatermarkHoldState> elementHoldTag;
 
   public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) {
     this.timerInternals = timerInternals;
     this.windowingStrategy = windowingStrategy;
-    this.elementHoldTag = watermarkHoldTagForOutputTimeFn(windowingStrategy.getOutputTimeFn());
+    this.elementHoldTag =
+        watermarkHoldTagForTimestampCombiner(windowingStrategy.getTimestampCombiner());
   }
 
   /**
    * Add a hold to prevent the output watermark progressing beyond the (possibly adjusted) timestamp
-   * of the element in {@code context}. We allow the actual hold time to be shifted later by
-   * {@link OutputTimeFn#assignOutputTime}, but no further than the end of the window. The hold will
+   * of the element in {@code context}. We allow the actual hold time to be shifted later by the
+   * {@link TimestampCombiner}, but no further than the end of the window. The hold will
    * remain until cleared by {@link #extractAndRelease}. Return the timestamp at which the hold
    * was placed, or {@literal null} if no hold was placed.
    *
@@ -199,15 +199,18 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
    * strategy's output time function.
    */
   private Instant shift(Instant timestamp, W window) {
-    Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window);
+    Instant shifted =
+        windowingStrategy
+            .getTimestampCombiner()
+            .assign(window, windowingStrategy.getWindowFn().getOutputTime(timestamp, window));
     checkState(!shifted.isBefore(timestamp),
-        "OutputTimeFn moved element from %s to earlier time %s for window %s",
+        "TimestampCombiner moved element from %s to earlier time %s for window %s",
         BoundedWindow.formatTimestamp(timestamp),
         BoundedWindow.formatTimestamp(shifted),
         window);
     checkState(timestamp.isAfter(window.maxTimestamp())
             || !shifted.isAfter(window.maxTimestamp()),
-        "OutputTimeFn moved element from %s to %s which is beyond end of "
+        "TimestampCombiner moved element from %s to %s which is beyond end of "
             + "window %s",
         timestamp, shifted, window);
 
@@ -217,7 +220,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
   /**
    * Attempt to add an 'element hold'. Return the {@link Instant} at which the hold was
    * added (ie the element timestamp plus any forward shift requested by the
-   * {@link WindowingStrategy#getOutputTimeFn}), or {@literal null} if no hold was added.
+   * {@link WindowingStrategy#getTimestampCombiner}), or {@literal null} if no hold was added.
    * The hold is only added if both:
    * <ol>
    * <li>The backend will be able to respect it. In other words the output watermark cannot
@@ -450,7 +453,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
    * Return (a future for) the earliest hold for {@code context}. Clear all the holds after
    * reading, but add/restore an end-of-window or garbage collection hold if required.
    *
-   * <p>The returned timestamp is the output timestamp according to the {@link OutputTimeFn}
+   * <p>The returned timestamp is the output timestamp according to the {@link TimestampCombiner}
    * from the windowing strategy of this {@link WatermarkHold}, combined across all the non-late
    * elements in the current pane. If there is no such value the timestamp is the end
    * of the window.
@@ -462,8 +465,8 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
             + "outputWatermark:{}",
         context.key(), context.window(), timerInternals.currentInputWatermarkTime(),
         timerInternals.currentOutputWatermarkTime());
-    final WatermarkHoldState<W> elementHoldState = context.state().access(elementHoldTag);
-    final WatermarkHoldState<BoundedWindow> extraHoldState = context.state().access(EXTRA_HOLD_TAG);
+    final WatermarkHoldState elementHoldState = context.state().access(elementHoldTag);
+    final WatermarkHoldState extraHoldState = context.state().access(EXTRA_HOLD_TAG);
     return new ReadableState<OldAndNewHolds>() {
       @Override
       public ReadableState<OldAndNewHolds> readLater() {

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index d0a8923..81ac5fa 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -43,10 +43,10 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -149,7 +149,7 @@ public class GroupAlsoByWindowsProperties {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
+            .withTimestampCombiner(TimestampCombiner.EARLIEST);
 
     List<WindowedValue<KV<String, Iterable<String>>>> result =
         runGABW(
@@ -200,7 +200,7 @@ public class GroupAlsoByWindowsProperties {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
+            .withTimestampCombiner(TimestampCombiner.EARLIEST);
 
     List<WindowedValue<KV<String, Long>>> result =
         runGABW(
@@ -348,7 +348,7 @@ public class GroupAlsoByWindowsProperties {
   /**
    * Tests that for a simple sequence of elements on the same key, the given GABW implementation
    * correctly groups them according to fixed windows and also sets the output timestamp according
-   * to the policy {@link OutputTimeFns#outputAtEndOfWindow()}.
+   * to the policy {@link TimestampCombiner#END_OF_WINDOW}.
    */
   public static void groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp(
       GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
@@ -356,7 +356,7 @@ public class GroupAlsoByWindowsProperties {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
+            .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW);
 
     List<WindowedValue<KV<String, Iterable<String>>>> result =
         runGABW(
@@ -386,7 +386,7 @@ public class GroupAlsoByWindowsProperties {
   /**
    * Tests that for a simple sequence of elements on the same key, the given GABW implementation
    * correctly groups them according to fixed windows and also sets the output timestamp according
-   * to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}.
+   * to the policy {@link TimestampCombiner#LATEST}.
    */
   public static void groupsElementsIntoFixedWindowsWithLatestTimestamp(
       GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
@@ -394,7 +394,7 @@ public class GroupAlsoByWindowsProperties {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
+            .withTimestampCombiner(TimestampCombiner.LATEST);
 
     List<WindowedValue<KV<String, Iterable<String>>>> result =
         runGABW(
@@ -431,7 +431,7 @@ public class GroupAlsoByWindowsProperties {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
+            .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW);
 
     List<WindowedValue<KV<String, Iterable<String>>>> result =
         runGABW(
@@ -468,7 +468,7 @@ public class GroupAlsoByWindowsProperties {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
+            .withTimestampCombiner(TimestampCombiner.LATEST);
 
     BoundedWindow unmergedWindow = window(15, 25);
     List<WindowedValue<KV<String, Iterable<String>>>> result =
@@ -508,7 +508,7 @@ public class GroupAlsoByWindowsProperties {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
+            .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW);
 
     BoundedWindow secondWindow = window(15, 25);
     List<WindowedValue<KV<String, Long>>> result =

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index 34ddae6..6248401 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
@@ -71,14 +71,12 @@ public class InMemoryStateInternalsTest {
       StateTags.set("stringSet", StringUtf8Coder.of());
   private static final StateTag<Object, MapState<String, Integer>> STRING_MAP_ADDR =
       StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of());
-  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
-      WATERMARK_EARLIEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
-  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
-      WATERMARK_LATEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
-  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
-      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
+  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
+  private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
+  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+      StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
 
   InMemoryStateInternals<String> underTest = InMemoryStateInternals.forKey("dummyKey");
 
@@ -442,7 +440,7 @@ public class InMemoryStateInternalsTest {
 
   @Test
   public void testWatermarkEarliestState() throws Exception {
-    WatermarkHoldState<BoundedWindow> value =
+    WatermarkHoldState value =
         underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
 
     // State instances are cached, but depend on the namespace.
@@ -466,7 +464,7 @@ public class InMemoryStateInternalsTest {
 
   @Test
   public void testWatermarkLatestState() throws Exception {
-    WatermarkHoldState<BoundedWindow> value =
+    WatermarkHoldState value =
         underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
 
     // State instances are cached, but depend on the namespace.
@@ -490,7 +488,7 @@ public class InMemoryStateInternalsTest {
 
   @Test
   public void testWatermarkEndOfWindowState() throws Exception {
-    WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+    WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
 
     // State instances are cached, but depend on the namespace.
     assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
@@ -507,7 +505,7 @@ public class InMemoryStateInternalsTest {
 
   @Test
   public void testWatermarkStateIsEmpty() throws Exception {
-    WatermarkHoldState<BoundedWindow> value =
+    WatermarkHoldState value =
         underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
 
     assertThat(value.isEmpty().read(), Matchers.is(true));
@@ -521,9 +519,9 @@ public class InMemoryStateInternalsTest {
 
   @Test
   public void testMergeEarliestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState<BoundedWindow> value1 =
+    WatermarkHoldState value1 =
         underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-    WatermarkHoldState<BoundedWindow> value2 =
+    WatermarkHoldState value2 =
         underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
 
     value1.add(new Instant(3000));
@@ -540,11 +538,11 @@ public class InMemoryStateInternalsTest {
 
   @Test
   public void testMergeLatestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState<BoundedWindow> value1 =
+    WatermarkHoldState value1 =
         underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState<BoundedWindow> value2 =
+    WatermarkHoldState value2 =
         underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState<BoundedWindow> value3 =
+    WatermarkHoldState value3 =
         underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
 
     value1.add(new Instant(3000));

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 0d4d992..44bc538 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -56,12 +56,12 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Never;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -210,7 +210,7 @@ public class ReduceFnRunnerTest {
 
     WindowingStrategy<?, IntervalWindow> strategy =
         WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
             .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100));
 
@@ -284,7 +284,7 @@ public class ReduceFnRunnerTest {
 
     WindowingStrategy<?, IntervalWindow> strategy =
         WindowingStrategy.of((WindowFn<?, IntervalWindow>) windowFn)
-            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
             .withTrigger(AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()))
             .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
             .withAllowedLateness(allowedLateness);
@@ -315,7 +315,7 @@ public class ReduceFnRunnerTest {
 
     WindowingStrategy<?, IntervalWindow> strategy =
         WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
             .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100));
 
@@ -615,7 +615,7 @@ public class ReduceFnRunnerTest {
                 AfterWatermark.pastEndOfWindow())))
             .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100))
-            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
             .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
 
     tester.advanceInputWatermark(new Instant(0));
@@ -668,7 +668,7 @@ public class ReduceFnRunnerTest {
                 AfterWatermark.pastEndOfWindow())))
             .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100))
-            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
             .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY));
 
     tester.advanceInputWatermark(new Instant(0));
@@ -695,7 +695,7 @@ public class ReduceFnRunnerTest {
                 AfterWatermark.pastEndOfWindow())))
             .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100))
-            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
             .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
 
     tester.advanceInputWatermark(new Instant(0));
@@ -724,7 +724,7 @@ public class ReduceFnRunnerTest {
                 AfterWatermark.pastEndOfWindow())))
             .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100))
-            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
             .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
 
     tester.advanceInputWatermark(new Instant(0));
@@ -1195,7 +1195,7 @@ public class ReduceFnRunnerTest {
 
     WindowingStrategy<?, IntervalWindow> strategy =
         WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
             .withTrigger(
                 AfterEach.<IntervalWindow>inOrder(
                     Repeatedly.forever(
@@ -1251,16 +1251,16 @@ public class ReduceFnRunnerTest {
 
     WindowingStrategy<?, IntervalWindow> strategy =
         WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
-            .withTrigger(AfterEach.<IntervalWindow>inOrder(
-                Repeatedly
-                    .forever(
-                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
-                            new Duration(5)))
-                    .orFinally(AfterWatermark.pastEndOfWindow()),
-                Repeatedly.forever(
-                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
-                        new Duration(25)))))
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withTrigger(
+                AfterEach.<IntervalWindow>inOrder(
+                    Repeatedly.forever(
+                            AfterProcessingTime.pastFirstElementInPane()
+                                .plusDelayOf(new Duration(5)))
+                        .orFinally(AfterWatermark.pastEndOfWindow()),
+                    Repeatedly.forever(
+                        AfterProcessingTime.pastFirstElementInPane()
+                            .plusDelayOf(new Duration(25)))))
             .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 549fd8a..b5b5492 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -58,8 +58,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -161,7 +161,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
           throws Exception {
     WindowingStrategy<?, W> strategy =
         WindowingStrategy.of(windowFn)
-            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
             .withMode(mode)
             .withAllowedLateness(allowedDataLateness)
             .withClosingBehavior(closingBehavior);
@@ -329,8 +329,10 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     assertHasOnlyGlobalAndAllowedTags(
         ImmutableSet.copyOf(expectedWindows),
         ImmutableSet.<StateTag<? super String, ?>>of(
-            TriggerStateMachineRunner.FINISHED_BITS_TAG, PaneInfoTracker.PANE_INFO_TAG,
-            WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()),
+            TriggerStateMachineRunner.FINISHED_BITS_TAG,
+            PaneInfoTracker.PANE_INFO_TAG,
+            WatermarkHold.watermarkHoldTagForTimestampCombiner(
+                objectStrategy.getTimestampCombiner()),
             WatermarkHold.EXTRA_HOLD_TAG));
   }
 
@@ -345,7 +347,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
         ImmutableSet.copyOf(expectedWindows),
         ImmutableSet.<StateTag<? super String, ?>>of(
             PaneInfoTracker.PANE_INFO_TAG,
-            WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()),
+            WatermarkHold.watermarkHoldTagForTimestampCombiner(
+                objectStrategy.getTimestampCombiner()),
             WatermarkHold.EXTRA_HOLD_TAG));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
index 5f5d92d..10dcb62 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -97,15 +97,11 @@ public class StateTagTest {
 
   @Test
   public void testWatermarkBagEquality() {
-    StateTag<?, ?> foo1 = StateTags.watermarkStateInternal(
-        "foo", OutputTimeFns.outputAtEarliestInputTimestamp());
-    StateTag<?, ?> foo2 = StateTags.watermarkStateInternal(
-        "foo", OutputTimeFns.outputAtEarliestInputTimestamp());
-    StateTag<?, ?> bar = StateTags.watermarkStateInternal(
-        "bar", OutputTimeFns.outputAtEarliestInputTimestamp());
-
-    StateTag<?, ?> bar2 = StateTags.watermarkStateInternal(
-        "bar", OutputTimeFns.outputAtLatestInputTimestamp());
+    StateTag<?, ?> foo1 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+    StateTag<?, ?> foo2 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+    StateTag<?, ?> bar = StateTags.watermarkStateInternal("bar", TimestampCombiner.EARLIEST);
+
+    StateTag<?, ?> bar2 = StateTags.watermarkStateInternal("bar", TimestampCombiner.LATEST);
 
     // Same id, same fn.
     assertEquals(foo1, foo2);

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
index 0665812..068b37f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
@@ -43,7 +43,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
@@ -213,7 +213,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
       Instant earliest = BoundedWindow.TIMESTAMP_MAX_VALUE;
       for (State existingState : this.values()) {
         if (existingState instanceof WatermarkHoldState) {
-          Instant hold = ((WatermarkHoldState<?>) existingState).read();
+          Instant hold = ((WatermarkHoldState) existingState).read();
           if (hold != null && hold.isBefore(earliest)) {
             earliest = hold;
           }
@@ -276,18 +276,18 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
       public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
         return new StateBinder<K>() {
           @Override
-          public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-              StateTag<? super K, WatermarkHoldState<W>> address,
-              OutputTimeFn<? super W> outputTimeFn) {
+          public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+              StateTag<? super K, WatermarkHoldState> address,
+              TimestampCombiner timestampCombiner) {
             if (containedInUnderlying(namespace, address)) {
               @SuppressWarnings("unchecked")
-              InMemoryState<? extends WatermarkHoldState<W>> existingState =
-                  (InMemoryState<? extends WatermarkHoldState<W>>)
+              InMemoryState<? extends WatermarkHoldState> existingState =
+                  (InMemoryState<? extends WatermarkHoldState>)
                   underlying.get().get(namespace, address, c);
               return existingState.copy();
             } else {
               return new InMemoryWatermarkHold<>(
-                  outputTimeFn);
+                  timestampCombiner);
             }
           }
 
@@ -419,7 +419,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
               State state =
                   readTo.get(namespace, existingState.getKey(), StateContexts.nullContext());
               if (state instanceof WatermarkHoldState) {
-                Instant hold = ((WatermarkHoldState<?>) state).read();
+                Instant hold = ((WatermarkHoldState) state).read();
                 if (hold != null && hold.isBefore(earliestHold)) {
                   earliestHold = hold;
                 }
@@ -434,9 +434,9 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
       public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
         return new StateBinder<K>() {
           @Override
-          public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-              StateTag<? super K, WatermarkHoldState<W>> address,
-              OutputTimeFn<? super W> outputTimeFn) {
+          public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+              StateTag<? super K, WatermarkHoldState> address,
+              TimestampCombiner timestampCombiner) {
             return underlying.get(namespace, address, c);
           }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index b08aa8e..322c995 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -40,8 +40,8 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -135,14 +135,14 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
               // to alter the flow of data. This entails:
               //  - trigger as fast as possible
               //  - maintain the full timestamps of elements
-              //  - ensure this GBK holds to the minimum of those timestamps (via OutputTimeFn)
+              //  - ensure this GBK holds to the minimum of those timestamps (via TimestampCombiner)
               //  - discard past panes as it is "just a stream" of elements
               .apply(
                   Window.<KV<K, WindowedValue<KV<K, InputT>>>>configure()
                       .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                       .discardingFiredPanes()
                       .withAllowedLateness(inputWindowingStrategy.getAllowedLateness())
-                      .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
+                      .withTimestampCombiner(TimestampCombiner.EARLIEST))
 
               // A full GBK to group by key _and_ window
               .apply("Group by key", GroupByKey.<K, WindowedValue<KV<K, InputT>>>create())

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 68c6613..f0aeece 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -43,8 +43,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
@@ -289,13 +288,12 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
-    OutputTimeFn<BoundedWindow> outputTimeFn =
-        OutputTimeFns.outputAtEarliestInputTimestamp();
+    TimestampCombiner timestampCombiner = TimestampCombiner.EARLIEST;
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag =
-        StateTags.watermarkStateInternal("wmstate", outputTimeFn);
-    WatermarkHoldState<?> underlyingValue = underlying.state(namespace, stateTag);
+    StateTag<Object, WatermarkHoldState> stateTag =
+        StateTags.watermarkStateInternal("wmstate", timestampCombiner);
+    WatermarkHoldState underlyingValue = underlying.state(namespace, stateTag);
     assertThat(underlyingValue.read(), nullValue());
 
     underlyingValue.add(new Instant(250L));
@@ -303,7 +301,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
     CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-    WatermarkHoldState<BoundedWindow> copyOnAccessState = internals.state(namespace, stateTag);
+    WatermarkHoldState copyOnAccessState = internals.state(namespace, stateTag);
     assertThat(copyOnAccessState.read(), equalTo(new Instant(250L)));
 
     copyOnAccessState.add(new Instant(100L));
@@ -313,7 +311,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     copyOnAccessState.add(new Instant(500L));
     assertThat(copyOnAccessState.read(), equalTo(new Instant(100L)));
 
-    WatermarkHoldState<BoundedWindow> reReadUnderlyingValue =
+    WatermarkHoldState reReadUnderlyingValue =
         underlying.state(namespace, stateTag);
     assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
   }
@@ -514,15 +512,15 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
 
-    StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
-        StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
-    WatermarkHoldState<BoundedWindow> firstHold =
+    StateTag<Object, WatermarkHoldState> firstHoldAddress =
+        StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+    WatermarkHoldState firstHold =
         internals.state(StateNamespaces.window(null, first), firstHoldAddress);
     firstHold.add(new Instant(22L));
 
-    StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
-        StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
-    WatermarkHoldState<BoundedWindow> secondHold =
+    StateTag<Object, WatermarkHoldState> secondHoldAddress =
+        StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+    WatermarkHoldState secondHold =
         internals.state(StateNamespaces.window(null, second), secondHoldAddress);
     secondHold.add(new Instant(2L));
 
@@ -546,18 +544,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     };
     CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
-    StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
-        StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
-    WatermarkHoldState<BoundedWindow> firstHold =
+    StateTag<Object, WatermarkHoldState> firstHoldAddress =
+        StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+    WatermarkHoldState firstHold =
         underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
     firstHold.add(new Instant(22L));
 
     CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
 
-    StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
-        StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
-    WatermarkHoldState<BoundedWindow> secondHold =
+    StateTag<Object, WatermarkHoldState> secondHoldAddress =
+        StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+    WatermarkHoldState secondHold =
         internals.state(StateNamespaces.window(null, second), secondHoldAddress);
     secondHold.add(new Instant(244L));
 
@@ -583,18 +581,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
         };
     CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
-    StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
-        StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
-    WatermarkHoldState<BoundedWindow> firstHold =
+    StateTag<Object, WatermarkHoldState> firstHoldAddress =
+        StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+    WatermarkHoldState firstHold =
         underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
     firstHold.add(new Instant(224L));
 
     CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
 
-    StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
-        StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
-    WatermarkHoldState<BoundedWindow> secondHold =
+    StateTag<Object, WatermarkHoldState> secondHoldAddress =
+        StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+    WatermarkHoldState secondHold =
         internals.state(StateNamespaces.window(null, second), secondHoldAddress);
     secondHold.add(new Instant(24L));
 
@@ -610,7 +608,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     internals
         .state(
             StateNamespaces.global(),
-            StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()))
+            StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST))
         .add(new Instant(1234L));
 
     thrown.expect(IllegalStateException.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
index b904bfe..7ee2f69 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
@@ -29,8 +29,8 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -60,8 +60,8 @@ public class HashingFlinkCombineRunner<
 
 
     @SuppressWarnings("unchecked")
-    OutputTimeFn<? super BoundedWindow> outputTimeFn =
-        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+    TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
+    WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn();
 
     // Flink Iterable can be iterated over only once.
     List<WindowedValue<KV<K, InputT>>> inputs = new ArrayList<>();
@@ -87,14 +87,21 @@ public class HashingFlinkCombineRunner<
           AccumT accumT = flinkCombiner.firstInput(key, currentValue.getValue().getValue(),
               options, sideInputReader, singletonW);
           Instant windowTimestamp =
-              outputTimeFn.assignOutputTime(currentValue.getTimestamp(), mergedWindow);
+              timestampCombiner.assign(
+                  mergedWindow, windowFn.getOutputTime(currentValue.getTimestamp(), mergedWindow));
           accumAndInstant = new Tuple2<>(accumT, windowTimestamp);
           mapState.put(mergedWindow, accumAndInstant);
         } else {
           accumAndInstant.f0 = flinkCombiner.addInput(key, accumAndInstant.f0,
               currentValue.getValue().getValue(), options, sideInputReader, singletonW);
-          accumAndInstant.f1 = outputTimeFn.combine(accumAndInstant.f1,
-              outputTimeFn.assignOutputTime(currentValue.getTimestamp(), mergedWindow));
+          accumAndInstant.f1 =
+              timestampCombiner.combine(
+                  accumAndInstant.f1,
+                  timestampCombiner.assign(
+                      mergedWindow,
+                      windowingStrategy
+                          .getWindowFn()
+                          .getOutputTime(currentValue.getTimestamp(), mergedWindow)));
         }
       }
       if (iterator.hasNext()) {