You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/27 03:32:53 UTC

[1/4] beam git commit: Revert "Replace OutputTimeFn UDF with TimestampCombiner enum"

Repository: beam
Updated Branches:
  refs/heads/master 3bcbba121 -> b82cd2446


http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
index 14f818a..268718a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
@@ -20,17 +20,20 @@ package org.apache.beam.sdk.util;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.Objects;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.joda.time.Duration;
+import org.joda.time.Instant;
 
 /**
  * A {@code WindowingStrategy} describes the windowing behavior for a specific collection of values.
@@ -55,22 +58,22 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
   private static final WindowingStrategy<Object, GlobalWindow> DEFAULT = of(new GlobalWindows());
 
   private final WindowFn<T, W> windowFn;
+  private final OutputTimeFn<? super W> outputTimeFn;
   private final Trigger trigger;
   private final AccumulationMode mode;
   private final Duration allowedLateness;
   private final ClosingBehavior closingBehavior;
-  private final TimestampCombiner timestampCombiner;
   private final boolean triggerSpecified;
   private final boolean modeSpecified;
   private final boolean allowedLatenessSpecified;
-  private final boolean timestampCombinerSpecified;
+  private final boolean outputTimeFnSpecified;
 
   private WindowingStrategy(
       WindowFn<T, W> windowFn,
       Trigger trigger, boolean triggerSpecified,
       AccumulationMode mode, boolean modeSpecified,
       Duration allowedLateness, boolean allowedLatenessSpecified,
-      TimestampCombiner timestampCombiner, boolean timestampCombinerSpecified,
+      OutputTimeFn<? super W> outputTimeFn, boolean outputTimeFnSpecified,
       ClosingBehavior closingBehavior) {
     this.windowFn = windowFn;
     this.trigger = trigger;
@@ -80,8 +83,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
     this.allowedLateness = allowedLateness;
     this.allowedLatenessSpecified = allowedLatenessSpecified;
     this.closingBehavior = closingBehavior;
-    this.timestampCombiner = timestampCombiner;
-    this.timestampCombinerSpecified = timestampCombinerSpecified;
+    this.outputTimeFn = outputTimeFn;
+    this.outputTimeFnSpecified = outputTimeFnSpecified;
   }
 
   /**
@@ -97,7 +100,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         DefaultTrigger.of(), false,
         AccumulationMode.DISCARDING_FIRED_PANES, false,
         DEFAULT_ALLOWED_LATENESS, false,
-        TimestampCombiner.END_OF_WINDOW, false,
+        new CombineWindowFnOutputTimes(OutputTimeFns.outputAtEndOfWindow(), windowFn), false,
         ClosingBehavior.FIRE_IF_NON_EMPTY);
   }
 
@@ -133,12 +136,12 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
     return closingBehavior;
   }
 
-  public TimestampCombiner getTimestampCombiner() {
-    return timestampCombiner;
+  public OutputTimeFn<? super W> getOutputTimeFn() {
+    return outputTimeFn;
   }
 
-  public boolean isTimestampCombinerSpecified() {
-    return timestampCombinerSpecified;
+  public boolean isOutputTimeFnSpecified() {
+    return outputTimeFnSpecified;
   }
 
   /**
@@ -151,7 +154,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         trigger, true,
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
-        timestampCombiner, timestampCombinerSpecified,
+        outputTimeFn, outputTimeFnSpecified,
         closingBehavior);
   }
 
@@ -165,7 +168,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         trigger, triggerSpecified,
         mode, true,
         allowedLateness, allowedLatenessSpecified,
-        timestampCombiner, timestampCombinerSpecified,
+        outputTimeFn, outputTimeFnSpecified,
         closingBehavior);
   }
 
@@ -177,12 +180,17 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
     @SuppressWarnings("unchecked")
     WindowFn<T, W> typedWindowFn = (WindowFn<T, W>) wildcardWindowFn;
 
+    // The onus of type correctness falls on the callee.
+    @SuppressWarnings("unchecked")
+    OutputTimeFn<? super W> newOutputTimeFn = (OutputTimeFn<? super W>)
+        new CombineWindowFnOutputTimes<W>(outputTimeFn, typedWindowFn);
+
     return new WindowingStrategy<T, W>(
         typedWindowFn,
         trigger, triggerSpecified,
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
-        timestampCombiner, timestampCombinerSpecified,
+        newOutputTimeFn, outputTimeFnSpecified,
         closingBehavior);
   }
 
@@ -196,7 +204,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         trigger, triggerSpecified,
         mode, modeSpecified,
         allowedLateness, true,
-        timestampCombiner, timestampCombinerSpecified,
+        outputTimeFn, outputTimeFnSpecified,
         closingBehavior);
   }
 
@@ -206,19 +214,40 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         trigger, triggerSpecified,
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
-        timestampCombiner, timestampCombinerSpecified,
+        outputTimeFn, outputTimeFnSpecified,
         closingBehavior);
   }
 
   @Experimental(Experimental.Kind.OUTPUT_TIME)
-  public WindowingStrategy<T, W> withTimestampCombiner(TimestampCombiner timestampCombiner) {
+  public WindowingStrategy<T, W> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) {
+
+    @SuppressWarnings("unchecked")
+    OutputTimeFn<? super W> typedOutputTimeFn = (OutputTimeFn<? super W>) outputTimeFn;
+
+    OutputTimeFn<? super W> newOutputTimeFn =
+        new CombineWindowFnOutputTimes<W>(typedOutputTimeFn, windowFn);
 
     return new WindowingStrategy<T, W>(
         windowFn,
         trigger, triggerSpecified,
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
-        timestampCombiner, true,
+        newOutputTimeFn, true,
+        closingBehavior);
+  }
+
+  /**
+   * Fixes all the defaults so that equals can be used to check that two strategies are the same,
+   * regardless of the state of "defaulted-ness".
+   */
+  @VisibleForTesting
+  public WindowingStrategy<T, W> fixDefaults() {
+    return new WindowingStrategy<>(
+        windowFn,
+        trigger, true,
+        mode, true,
+        allowedLateness, true,
+        outputTimeFn, true,
         closingBehavior);
   }
 
@@ -229,7 +258,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         .add("allowedLateness", allowedLateness)
         .add("trigger", trigger)
         .add("accumulationMode", mode)
-        .add("timestampCombiner", timestampCombiner)
+        .add("outputTimeFn", outputTimeFn)
         .toString();
   }
 
@@ -239,45 +268,104 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
       return false;
     }
     WindowingStrategy<?, ?> other = (WindowingStrategy<?, ?>) object;
-    return isTriggerSpecified() == other.isTriggerSpecified()
+    return
+        isTriggerSpecified() == other.isTriggerSpecified()
         && isAllowedLatenessSpecified() == other.isAllowedLatenessSpecified()
         && isModeSpecified() == other.isModeSpecified()
-        && isTimestampCombinerSpecified() == other.isTimestampCombinerSpecified()
         && getMode().equals(other.getMode())
         && getAllowedLateness().equals(other.getAllowedLateness())
         && getClosingBehavior().equals(other.getClosingBehavior())
         && getTrigger().equals(other.getTrigger())
-        && getTimestampCombiner().equals(other.getTimestampCombiner())
         && getWindowFn().equals(other.getWindowFn());
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(
-        triggerSpecified,
-        allowedLatenessSpecified,
-        modeSpecified,
-        timestampCombinerSpecified,
-        mode,
-        allowedLateness,
-        closingBehavior,
-        trigger,
-        timestampCombiner,
-        windowFn);
+    return Objects.hash(triggerSpecified, allowedLatenessSpecified, modeSpecified,
+        windowFn, trigger, mode, allowedLateness, closingBehavior);
   }
 
   /**
-   * Fixes all the defaults so that equals can be used to check that two strategies are the same,
-   * regardless of the state of "defaulted-ness".
+   * An {@link OutputTimeFn} that uses {@link WindowFn#getOutputTime} to assign initial timestamps
+   * but then combines and merges according to a given {@link OutputTimeFn}.
+   *
+   * <ul>
+   *   <li>The {@link WindowFn#getOutputTime} allows adjustments such as that whereby
+   *       {@link org.apache.beam.sdk.transforms.windowing.SlidingWindows#getOutputTime}
+   *       moves elements later in time to avoid holding up progress downstream.</li>
+   *   <li>Then, when multiple elements are buffered for output, the output timestamp of the
+   *       result is calculated using {@link OutputTimeFn#combine}.</li>
+   *   <li>In the case of a merging {@link WindowFn}, the output timestamp when windows merge
+   *       is calculated using {@link OutputTimeFn#merge}.</li>
+   * </ul>
    */
-  @VisibleForTesting
-  public WindowingStrategy<T, W> fixDefaults() {
-    return new WindowingStrategy<>(
-        windowFn,
-        trigger, true,
-        mode, true,
-        allowedLateness, true,
-        timestampCombiner, true,
-        closingBehavior);
+  public static class CombineWindowFnOutputTimes<W extends BoundedWindow>
+      extends OutputTimeFn<W> {
+
+    private final OutputTimeFn<? super W> outputTimeFn;
+    private final WindowFn<?, W> windowFn;
+
+    public CombineWindowFnOutputTimes(
+        OutputTimeFn<? super W> outputTimeFn, WindowFn<?, W> windowFn) {
+      this.outputTimeFn = outputTimeFn;
+      this.windowFn = windowFn;
+    }
+
+    public OutputTimeFn<? super W> getOutputTimeFn() {
+      return outputTimeFn;
+    }
+
+    @Override
+    public Instant assignOutputTime(Instant inputTimestamp, W window) {
+      return outputTimeFn.merge(
+          window, Collections.singleton(windowFn.getOutputTime(inputTimestamp, window)));
+    }
+
+    @Override
+    public Instant combine(Instant timestamp, Instant otherTimestamp) {
+      return outputTimeFn.combine(timestamp, otherTimestamp);
+    }
+
+    @Override
+    public Instant merge(W newWindow, Iterable<? extends Instant> timestamps) {
+      return outputTimeFn.merge(newWindow, timestamps);
+    }
+
+    @Override
+    public final boolean dependsOnlyOnWindow() {
+      return outputTimeFn.dependsOnlyOnWindow();
+    }
+
+    @Override
+    public boolean dependsOnlyOnEarliestInputTimestamp() {
+      return outputTimeFn.dependsOnlyOnEarliestInputTimestamp();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+
+      if (!(obj instanceof CombineWindowFnOutputTimes)) {
+        return false;
+      }
+
+      CombineWindowFnOutputTimes<?> that = (CombineWindowFnOutputTimes<?>) obj;
+      return outputTimeFn.equals(that.outputTimeFn) && windowFn.equals(that.windowFn);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(outputTimeFn, windowFn);
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("outputTimeFn", outputTimeFn)
+          .add("windowFn", windowFn)
+          .toString();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
index f9ab115..64841fb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -21,7 +21,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 
 /**
  * Visitor for binding a {@link StateSpec} and to the associated {@link State}.
@@ -63,11 +63,11 @@ public interface StateBinder<K> {
   /**
    * Bind to a watermark {@link StateSpec}.
    *
-   * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps added
-   * to the returned {@link WatermarkHoldState} are to be combined.
+   * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to
+   * the returned {@link WatermarkHoldState} are to be combined.
    */
-  <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+  <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
       String id,
-      StateSpec<? super K, WatermarkHoldState> spec,
-      TimestampCombiner timestampCombiner);
+      StateSpec<? super K, WatermarkHoldState<W>> spec,
+      OutputTimeFn<? super W> outputTimeFn);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index 8fa5bb0..dc647da 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 
 /**
  * Static utility methods for creating {@link StateSpec} instances.
@@ -208,9 +208,9 @@ public class StateSpecs {
 
   /** Create a state spec for holding the watermark. */
   public static <W extends BoundedWindow>
-      StateSpec<Object, WatermarkHoldState> watermarkStateInternal(
-          TimestampCombiner timestampCombiner) {
-    return new WatermarkStateSpecInternal<W>(timestampCombiner);
+      StateSpec<Object, WatermarkHoldState<W>> watermarkStateInternal(
+          OutputTimeFn<? super W> outputTimeFn) {
+    return new WatermarkStateSpecInternal<W>(outputTimeFn);
   }
 
   public static <K, InputT, AccumT, OutputT>
@@ -656,26 +656,26 @@ public class StateSpecs {
   /**
    * A specification for a state cell tracking a combined watermark hold.
    *
-   * <p>Includes the {@link TimestampCombiner} according to which the output times
+   * <p>Includes the {@link OutputTimeFn} according to which the output times
    * are combined.
    */
   private static class WatermarkStateSpecInternal<W extends BoundedWindow>
-      implements StateSpec<Object, WatermarkHoldState> {
+      implements StateSpec<Object, WatermarkHoldState<W>> {
 
     /**
      * When multiple output times are added to hold the watermark, this determines how they are
      * combined, and also the behavior when merging windows. Does not contribute to equality/hash
      * since we have at most one watermark hold spec per computation.
      */
-    private final TimestampCombiner timestampCombiner;
+    private final OutputTimeFn<? super W> outputTimeFn;
 
-    private WatermarkStateSpecInternal(TimestampCombiner timestampCombiner) {
-      this.timestampCombiner = timestampCombiner;
+    private WatermarkStateSpecInternal(OutputTimeFn<? super W> outputTimeFn) {
+      this.outputTimeFn = outputTimeFn;
     }
 
     @Override
-    public WatermarkHoldState bind(String id, StateBinder<?> visitor) {
-      return visitor.bindWatermark(id, this, timestampCombiner);
+    public WatermarkHoldState<W> bind(String id, StateBinder<?> visitor) {
+      return visitor.bindWatermark(id, this, outputTimeFn);
     }
 
     @Override
@@ -701,4 +701,5 @@ public class StateSpecs {
       return Objects.hash(getClass());
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
index ae9b700..20fa05f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
@@ -19,24 +19,25 @@ package org.apache.beam.sdk.util.state;
 
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.joda.time.Instant;
 
 /**
- * A {@link State} accepting and aggregating output timestamps, which determines the time to which
- * the output watermark must be held.
+ * A {@link State} accepting and aggregating output timestamps, which determines
+ * the time to which the output watermark must be held.
  *
  * <p><b><i>For internal use only. This API may change at any time.</i></b>
  */
 @Experimental(Kind.STATE)
-public interface WatermarkHoldState extends GroupingState<Instant, Instant> {
+public interface WatermarkHoldState<W extends BoundedWindow>
+    extends GroupingState<Instant, Instant> {
   /**
-   * Return the {@link TimestampCombiner} which will be used to determine a watermark hold time
-   * given an element timestamp, and to combine watermarks from windows which are about to be
-   * merged.
+   * Return the {@link OutputTimeFn} which will be used to determine a watermark hold time given
+   * an element timestamp, and to combine watermarks from windows which are about to be merged.
    */
-  TimestampCombiner getTimestampCombiner();
+  OutputTimeFn<? super W> getOutputTimeFn();
 
   @Override
-  WatermarkHoldState readLater();
+  WatermarkHoldState<W> readLater();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
index 26dd9f9..153bd84 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
@@ -39,6 +39,7 @@ public class SdkCoreApiSurfaceTest {
         ImmutableSet.of(
             "org.apache.beam",
             "com.google.api.client",
+            "com.google.protobuf",
             "com.fasterxml.jackson.annotation",
             "com.fasterxml.jackson.core",
             "com.fasterxml.jackson.databind",

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 0556199..939261f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -51,8 +51,8 @@ import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -318,14 +318,14 @@ public class GroupByKeyTest {
    */
   @Test
   @Category(ValidatesRunner.class)
-  public void testTimestampCombinerEarliest() {
+  public void testOutputTimeFnEarliest() {
 
     p.apply(
         Create.timestamped(
             TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
             TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
         .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST))
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
         .apply(GroupByKey.<Integer, String>create())
         .apply(ParDo.of(new AssertTimestamp(new Instant(0))));
 
@@ -339,13 +339,13 @@ public class GroupByKeyTest {
    */
   @Test
   @Category(ValidatesRunner.class)
-  public void testTimestampCombinerLatest() {
+  public void testOutputTimeFnLatest() {
     p.apply(
         Create.timestamped(
             TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
             TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
         .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
-            .withTimestampCombiner(TimestampCombiner.LATEST))
+            .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()))
         .apply(GroupByKey.<Integer, String>create())
         .apply(ParDo.of(new AssertTimestamp(new Instant(10))));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
index 9a17bc7..4e61f4e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
@@ -41,7 +41,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -241,7 +241,7 @@ public class CoGroupByKeyTest implements Serializable {
             Arrays.asList(0L, 2L, 4L, 6L, 8L))
         .apply("WindowClicks", Window.<KV<Integer, String>>into(
             FixedWindows.of(new Duration(4)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST));
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()));
 
     PCollection<KV<Integer, String>> purchasesTable =
         createInput("CreatePurchases",
@@ -250,7 +250,7 @@ public class CoGroupByKeyTest implements Serializable {
             Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L))
         .apply("WindowPurchases", Window.<KV<Integer, String>>into(
             FixedWindows.of(new Duration(4)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST));
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()));
 
     PCollection<KV<Integer, CoGbkResult>> coGbkResults =
         KeyedPCollectionTuple.of(clicksTag, clicksTable)

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java
new file mode 100644
index 0000000..78d7a2f
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Tests for {@link OutputTimeFns}. */
+@RunWith(Parameterized.class)
+public class OutputTimeFnsTest {
+
+  @Parameters(name = "{index}: {0}")
+  public static Iterable<OutputTimeFn<BoundedWindow>> data() {
+    return ImmutableList.of(
+        OutputTimeFns.outputAtEarliestInputTimestamp(),
+        OutputTimeFns.outputAtLatestInputTimestamp(),
+        OutputTimeFns.outputAtEndOfWindow());
+  }
+
+  @Parameter(0)
+  public OutputTimeFn<?> outputTimeFn;
+
+  @Test
+  public void testToProtoAndBack() throws Exception {
+    OutputTimeFn<?> result = OutputTimeFns.fromProto(OutputTimeFns.toProto(outputTimeFn));
+
+    assertThat(result, equalTo((OutputTimeFn) outputTimeFn));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
index 9d94928..b131688 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
@@ -118,7 +118,7 @@ public class SessionsTest {
   }
 
   /**
-   * Test to confirm that {@link Sessions} with the default {@link TimestampCombiner} holds up the
+   * Test to confirm that {@link Sessions} with the default {@link OutputTimeFn} holds up the
    * watermark potentially indefinitely.
    */
   @Test
@@ -126,7 +126,7 @@ public class SessionsTest {
     try {
       WindowFnTestUtils.<Object, IntervalWindow>validateGetOutputTimestamps(
           Sessions.withGapDuration(Duration.millis(10)),
-          TimestampCombiner.EARLIEST,
+          OutputTimeFns.outputAtEarliestInputTimestamp(),
           ImmutableList.of(
               (List<Long>) ImmutableList.of(1L, 3L),
               (List<Long>) ImmutableList.of(0L, 5L, 10L, 15L, 20L)));
@@ -148,7 +148,7 @@ public class SessionsTest {
   public void testValidOutputAtEndTimes() throws Exception {
     WindowFnTestUtils.<Object, IntervalWindow>validateGetOutputTimestamps(
         Sessions.withGapDuration(Duration.millis(10)),
-        TimestampCombiner.END_OF_WINDOW,
+        OutputTimeFns.outputAtEndOfWindow(),
           ImmutableList.of(
               (List<Long>) ImmutableList.of(1L, 3L),
               (List<Long>) ImmutableList.of(0L, 5L, 10L, 15L, 20L)));

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 534e230..e1ed66a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -366,7 +366,7 @@ public class WindowTest implements Serializable {
    */
   @Test
   @Category(ValidatesRunner.class)
-  public void testTimestampCombinerDefault() {
+  public void testOutputTimeFnDefault() {
     pipeline.enableAbandonedNodeEnforcement(true);
 
     pipeline
@@ -400,7 +400,7 @@ public class WindowTest implements Serializable {
    */
   @Test
   @Category(ValidatesRunner.class)
-  public void testTimestampCombinerEndOfWindow() {
+  public void testOutputTimeFnEndOfWindow() {
     pipeline.enableAbandonedNodeEnforcement(true);
 
     pipeline.apply(
@@ -408,7 +408,7 @@ public class WindowTest implements Serializable {
             TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
             TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
         .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
-            .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
+            .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
         .apply(GroupByKey.<Integer, String>create())
         .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() {
           @ProcessElement
@@ -426,14 +426,14 @@ public class WindowTest implements Serializable {
     AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow();
     Duration allowedLateness = Duration.standardMinutes(10);
     Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY;
-    TimestampCombiner timestampCombiner = TimestampCombiner.END_OF_WINDOW;
+    OutputTimeFn<BoundedWindow> outputTimeFn = OutputTimeFns.outputAtEndOfWindow();
 
     Window<?> window = Window
         .into(windowFn)
         .triggering(triggerBuilder)
         .accumulatingFiredPanes()
         .withAllowedLateness(allowedLateness, closingBehavior)
-        .withTimestampCombiner(timestampCombiner);
+        .withOutputTimeFn(outputTimeFn);
 
     DisplayData displayData = DisplayData.from(window);
 
@@ -446,7 +446,7 @@ public class WindowTest implements Serializable {
     assertThat(displayData,
         hasDisplayItem("allowedLateness", allowedLateness));
     assertThat(displayData, hasDisplayItem("closingBehavior", closingBehavior.toString()));
-    assertThat(displayData, hasDisplayItem("timestampCombiner", timestampCombiner.toString()));
+    assertThat(displayData, hasDisplayItem("outputTimeFn", outputTimeFn.getClass()));
   }
 
   @Test
@@ -456,14 +456,14 @@ public class WindowTest implements Serializable {
     AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow();
     Duration allowedLateness = Duration.standardMinutes(10);
     Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY;
-    TimestampCombiner timestampCombiner = TimestampCombiner.END_OF_WINDOW;
+    OutputTimeFn<BoundedWindow> outputTimeFn = OutputTimeFns.outputAtEndOfWindow();
 
     Window<?> window = Window
         .into(windowFn)
         .triggering(triggerBuilder)
         .accumulatingFiredPanes()
         .withAllowedLateness(allowedLateness, closingBehavior)
-        .withTimestampCombiner(timestampCombiner);
+        .withOutputTimeFn(outputTimeFn);
 
     DisplayData primitiveDisplayData =
         Iterables.getOnlyElement(
@@ -478,8 +478,7 @@ public class WindowTest implements Serializable {
     assertThat(primitiveDisplayData,
         hasDisplayItem("allowedLateness", allowedLateness));
     assertThat(primitiveDisplayData, hasDisplayItem("closingBehavior", closingBehavior.toString()));
-    assertThat(
-        primitiveDisplayData, hasDisplayItem("timestampCombiner", timestampCombiner.toString()));
+    assertThat(primitiveDisplayData, hasDisplayItem("outputTimeFn", outputTimeFn.getClass()));
   }
 
   @Test
@@ -498,7 +497,7 @@ public class WindowTest implements Serializable {
     assertThat(displayData, not(hasDisplayItem("accumulationMode")));
     assertThat(displayData, not(hasDisplayItem("allowedLateness")));
     assertThat(displayData, not(hasDisplayItem("closingBehavior")));
-    assertThat(displayData, not(hasDisplayItem("timestampCombiner")));
+    assertThat(displayData, not(hasDisplayItem("outputTimeFn")));
   }
 
   @Test
@@ -507,7 +506,7 @@ public class WindowTest implements Serializable {
     assertThat(DisplayData.from(onlyHasAccumulationMode), not(hasDisplayItem(hasKey(isOneOf(
         "windowFn",
         "trigger",
-        "timestampCombiner",
+        "outputTimeFn",
         "allowedLateness",
         "closingBehavior")))));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index 30b0311..a3f5352 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -76,7 +76,7 @@ public class WindowingTest implements Serializable {
     public PCollection<String> expand(PCollection<String> in) {
       return in.apply("Window",
               Window.<String>into(windowFn)
-                  .withTimestampCombiner(TimestampCombiner.EARLIEST))
+                  .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
           .apply(Count.<String>perElement())
           .apply("FormatCounts", ParDo.of(new FormatCountsDoFn()))
           .setCoder(StringUtf8Coder.of());

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
index 215b0f4..50edd83 100644
--- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
@@ -42,6 +42,7 @@ public class GcpCoreApiSurfaceTest {
             "com.google.api.services.cloudresourcemanager",
             "com.google.api.services.storage",
             "com.google.auth",
+            "com.google.protobuf",
             "com.fasterxml.jackson.annotation",
             "com.fasterxml.jackson.core",
             "com.fasterxml.jackson.databind",


[2/4] beam git commit: Revert "Replace OutputTimeFn UDF with TimestampCombiner enum"

Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
index eac465c..2967f2c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
@@ -26,9 +26,8 @@ import java.util.List;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -54,9 +53,8 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou
       Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
 
     @SuppressWarnings("unchecked")
-    TimestampCombiner timestampCombiner =
-        (TimestampCombiner) windowingStrategy.getTimestampCombiner();
-    WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn();
+    OutputTimeFn<? super BoundedWindow> outputTimeFn =
+        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
 
     // get all elements so that we can sort them, has to fit into
     // memory
@@ -90,19 +88,18 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou
     // create accumulator using the first elements key
     WindowedValue<KV<K, InputT>> currentValue = iterator.next();
     K key = currentValue.getValue().getKey();
-    W currentWindow = (W) Iterables.getOnlyElement(currentValue.getWindows());
+    BoundedWindow currentWindow = Iterables.getOnlyElement(currentValue.getWindows());
     InputT firstValue = currentValue.getValue().getValue();
     AccumT accumulator = flinkCombiner.firstInput(
         key, firstValue, options, sideInputReader, currentValue.getWindows());
 
-    // we use this to keep track of the timestamps assigned by the TimestampCombiner
+    // we use this to keep track of the timestamps assigned by the OutputTimeFn
     Instant windowTimestamp =
-        timestampCombiner.assign(
-            currentWindow, windowFn.getOutputTime(currentValue.getTimestamp(), currentWindow));
+        outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);
 
     while (iterator.hasNext()) {
       WindowedValue<KV<K, InputT>> nextValue = iterator.next();
-      W nextWindow = (W) Iterables.getOnlyElement(nextValue.getWindows());
+      BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
 
       if (currentWindow.equals(nextWindow)) {
         // continue accumulating and merge windows
@@ -111,12 +108,9 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou
         accumulator = flinkCombiner.addInput(key, accumulator, value,
             options, sideInputReader, currentValue.getWindows());
 
-        windowTimestamp =
-            timestampCombiner.combine(
-                windowTimestamp,
-                timestampCombiner.assign(
-                    currentWindow,
-                    windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow)));
+        windowTimestamp = outputTimeFn.combine(
+            windowTimestamp,
+            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
 
       } else {
         // emit the value that we currently have
@@ -133,9 +127,7 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou
         InputT value = nextValue.getValue().getValue();
         accumulator = flinkCombiner.firstInput(key, value,
             options, sideInputReader, currentValue.getWindows());
-        windowTimestamp =
-            timestampCombiner.assign(
-                currentWindow, windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow));
+        windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
       }
 
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index d015c38..3203446 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CombineContextFactory;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
@@ -176,9 +176,9 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
       }
 
       @Override
-      public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-          StateTag<? super K, WatermarkHoldState> address,
-          TimestampCombiner timestampCombiner) {
+      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+          StateTag<? super K, WatermarkHoldState<W>> address,
+          OutputTimeFn<? super W> outputTimeFn) {
          throw new UnsupportedOperationException(
              String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
index 2dd7c96..24b340e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
@@ -38,7 +38,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
@@ -186,9 +186,9 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
       }
 
       @Override
-      public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-          StateTag<? super K, WatermarkHoldState> address,
-          TimestampCombiner timestampCombiner) {
+      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+          StateTag<? super K, WatermarkHoldState<W>> address,
+          OutputTimeFn<? super W> outputTimeFn) {
         throw new UnsupportedOperationException(
             String.format("%s is not supported", CombiningState.class.getSimpleName()));
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
index 17ea62a..2bf0bf1 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
@@ -146,9 +146,9 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
       }
 
       @Override
-      public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-          StateTag<? super K, WatermarkHoldState> address,
-          TimestampCombiner timestampCombiner) {
+      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+          StateTag<? super K, WatermarkHoldState<W>> address,
+          OutputTimeFn<? super W> outputTimeFn) {
         throw new UnsupportedOperationException(
             String.format("%s is not supported", CombiningState.class.getSimpleName()));
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index 878c914..4f961e5 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.CombineContextFactory;
 import org.apache.beam.sdk.util.state.BagState;
@@ -185,12 +185,12 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
       }
 
       @Override
-      public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-          StateTag<? super K, WatermarkHoldState> address,
-          TimestampCombiner timestampCombiner) {
+      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+          StateTag<? super K, WatermarkHoldState<W>> address,
+          OutputTimeFn<? super W> outputTimeFn) {
 
         return new FlinkWatermarkHoldState<>(
-            flinkStateBackend, FlinkStateInternals.this, address, namespace, timestampCombiner);
+            flinkStateBackend, FlinkStateInternals.this, address, namespace, outputTimeFn);
       }
     });
   }
@@ -912,9 +912,9 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
   }
 
   private static class FlinkWatermarkHoldState<K, W extends BoundedWindow>
-      implements WatermarkHoldState {
-    private final StateTag<? super K, WatermarkHoldState> address;
-    private final TimestampCombiner timestampCombiner;
+      implements WatermarkHoldState<W> {
+    private final StateTag<? super K, WatermarkHoldState<W>> address;
+    private final OutputTimeFn<? super W> outputTimeFn;
     private final StateNamespace namespace;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
     private final FlinkStateInternals<K> flinkStateInternals;
@@ -923,11 +923,11 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
     public FlinkWatermarkHoldState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
         FlinkStateInternals<K> flinkStateInternals,
-        StateTag<? super K, WatermarkHoldState> address,
+        StateTag<? super K, WatermarkHoldState<W>> address,
         StateNamespace namespace,
-        TimestampCombiner timestampCombiner) {
+        OutputTimeFn<? super W> outputTimeFn) {
       this.address = address;
-      this.timestampCombiner = timestampCombiner;
+      this.outputTimeFn = outputTimeFn;
       this.namespace = namespace;
       this.flinkStateBackend = flinkStateBackend;
       this.flinkStateInternals = flinkStateInternals;
@@ -937,12 +937,12 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public TimestampCombiner getTimestampCombiner() {
-      return timestampCombiner;
+    public OutputTimeFn<? super W> getOutputTimeFn() {
+      return outputTimeFn;
     }
 
     @Override
-    public WatermarkHoldState readLater() {
+    public WatermarkHoldState<W> readLater() {
       return this;
     }
 
@@ -983,7 +983,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
           state.update(value);
           flinkStateInternals.watermarkHolds.put(namespace.stringKey(), value);
         } else {
-          Instant combined = timestampCombiner.combine(current, value);
+          Instant combined = outputTimeFn.combine(current, value);
           state.update(combined);
           flinkStateInternals.watermarkHolds.put(namespace.stringKey(), combined);
         }
@@ -1035,7 +1035,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
       if (!address.equals(that.address)) {
         return false;
       }
-      if (!timestampCombiner.equals(that.timestampCombiner)) {
+      if (!outputTimeFn.equals(that.outputTimeFn)) {
         return false;
       }
       return namespace.equals(that.namespace);
@@ -1045,7 +1045,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
     @Override
     public int hashCode() {
       int result = address.hashCode();
-      result = 31 * result + timestampCombiner.hashCode();
+      result = 31 * result + outputTimeFn.hashCode();
       result = 31 * result + namespace.hashCode();
       return result;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 17c43bf..d140271 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
@@ -77,12 +77,14 @@ public class FlinkStateInternalsTest {
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
   private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
-      StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+      WATERMARK_EARLIEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+      WATERMARK_LATEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
+      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
 
   FlinkStateInternals<String> underTest;
 
@@ -272,7 +274,7 @@ public class FlinkStateInternalsTest {
 
   @Test
   public void testWatermarkEarliestState() throws Exception {
-    WatermarkHoldState value =
+    WatermarkHoldState<BoundedWindow> value =
         underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
 
     // State instances are cached, but depend on the namespace.
@@ -296,7 +298,7 @@ public class FlinkStateInternalsTest {
 
   @Test
   public void testWatermarkLatestState() throws Exception {
-    WatermarkHoldState value =
+    WatermarkHoldState<BoundedWindow> value =
         underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
 
     // State instances are cached, but depend on the namespace.
@@ -320,7 +322,7 @@ public class FlinkStateInternalsTest {
 
   @Test
   public void testWatermarkEndOfWindowState() throws Exception {
-    WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+    WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
 
     // State instances are cached, but depend on the namespace.
     assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
@@ -337,7 +339,7 @@ public class FlinkStateInternalsTest {
 
   @Test
   public void testWatermarkStateIsEmpty() throws Exception {
-    WatermarkHoldState value =
+    WatermarkHoldState<BoundedWindow> value =
         underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
 
     assertThat(value.isEmpty().read(), Matchers.is(true));
@@ -351,9 +353,9 @@ public class FlinkStateInternalsTest {
 
   @Test
   public void testMergeEarliestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState value1 =
+    WatermarkHoldState<BoundedWindow> value1 =
         underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-    WatermarkHoldState value2 =
+    WatermarkHoldState<BoundedWindow> value2 =
         underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
 
     value1.add(new Instant(3000));
@@ -370,11 +372,11 @@ public class FlinkStateInternalsTest {
 
   @Test
   public void testMergeLatestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState value1 =
+    WatermarkHoldState<BoundedWindow> value1 =
         underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState value2 =
+    WatermarkHoldState<BoundedWindow> value2 =
         underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState value3 =
+    WatermarkHoldState<BoundedWindow> value3 =
         underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
 
     value1.add(new Instant(3000));

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
index c967521..725e9d3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
@@ -166,10 +166,10 @@ class SparkStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-        StateTag<? super K, WatermarkHoldState> address,
-        TimestampCombiner timestampCombiner) {
-      return new SparkWatermarkHoldState(namespace, address, timestampCombiner);
+    public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+        StateTag<? super K, WatermarkHoldState<W>> address,
+        OutputTimeFn<? super W> outputTimeFn) {
+      return new SparkWatermarkHoldState<>(namespace, address, outputTimeFn);
     }
   }
 
@@ -250,21 +250,21 @@ class SparkStateInternals<K> implements StateInternals<K> {
     }
   }
 
-  private class SparkWatermarkHoldState extends AbstractState<Instant>
-      implements WatermarkHoldState {
+  private class SparkWatermarkHoldState<W extends BoundedWindow>
+      extends AbstractState<Instant> implements WatermarkHoldState<W> {
 
-    private final TimestampCombiner timestampCombiner;
+    private final OutputTimeFn<? super W> outputTimeFn;
 
     public SparkWatermarkHoldState(
         StateNamespace namespace,
-        StateTag<?, WatermarkHoldState> address,
-        TimestampCombiner timestampCombiner) {
+        StateTag<?, WatermarkHoldState<W>> address,
+        OutputTimeFn<? super W> outputTimeFn) {
       super(namespace, address, InstantCoder.of());
-      this.timestampCombiner = timestampCombiner;
+      this.outputTimeFn = outputTimeFn;
     }
 
     @Override
-    public SparkWatermarkHoldState readLater() {
+    public SparkWatermarkHoldState<W> readLater() {
       return this;
     }
 
@@ -276,10 +276,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
     @Override
     public void add(Instant outputTime) {
       Instant combined = read();
-      combined =
-          (combined == null)
-              ? outputTime
-              : getTimestampCombiner().combine(combined, outputTime);
+      combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime);
       writeValue(combined);
     }
 
@@ -298,8 +295,8 @@ class SparkStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public TimestampCombiner getTimestampCombiner() {
-      return timestampCombiner;
+    public OutputTimeFn<? super W> getOutputTimeFn() {
+      return outputTimeFn;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
index 7d06d6b..fa1c3fc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
@@ -50,7 +50,7 @@ import org.apache.beam.sdk.values.TupleTag;
 public class SparkAbstractCombineFn implements Serializable {
   protected final SparkRuntimeContext runtimeContext;
   protected final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
-  protected final WindowingStrategy<?, BoundedWindow> windowingStrategy;
+  protected final WindowingStrategy<?, ?> windowingStrategy;
 
 
   public SparkAbstractCombineFn(
@@ -59,7 +59,7 @@ public class SparkAbstractCombineFn implements Serializable {
       WindowingStrategy<?, ?> windowingStrategy) {
     this.runtimeContext = runtimeContext;
     this.sideInputs = sideInputs;
-    this.windowingStrategy = (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
+    this.windowingStrategy = windowingStrategy;
   }
 
   // each Spark task should get it's own copy of this SparkKeyedCombineFn, and since Spark tasks

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
index 7d026c6..23f5d20 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
@@ -29,9 +29,8 @@ import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -71,8 +70,9 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
     // sort exploded inputs.
     Iterable<WindowedValue<InputT>> sortedInputs = sortByWindows(input.explodeWindows());
 
-    TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
-    WindowFn<?, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+    @SuppressWarnings("unchecked")
+    OutputTimeFn<? super BoundedWindow> outputTimeFn =
+        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
 
     //--- inputs iterator, by window order.
     final Iterator<WindowedValue<InputT>> iterator = sortedInputs.iterator();
@@ -84,13 +84,9 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
     accumulator = combineFn.addInput(accumulator, currentInput.getValue(),
         ctxtForInput(currentInput));
 
-    // keep track of the timestamps assigned by the TimestampCombiner.
+    // keep track of the timestamps assigned by the OutputTimeFn.
     Instant windowTimestamp =
-        timestampCombiner.assign(
-            currentWindow,
-            windowingStrategy
-                .getWindowFn()
-                .getOutputTime(currentInput.getTimestamp(), currentWindow));
+        outputTimeFn.assignOutputTime(currentInput.getTimestamp(), currentWindow);
 
     // accumulate the next windows, or output.
     List<WindowedValue<AccumT>> output = Lists.newArrayList();
@@ -113,13 +109,8 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
         // keep accumulating and carry on ;-)
         accumulator = combineFn.addInput(accumulator, nextValue.getValue(),
             ctxtForInput(nextValue));
-        windowTimestamp =
-            timestampCombiner.merge(
-                currentWindow,
-                windowTimestamp,
-                windowingStrategy
-                    .getWindowFn()
-                    .getOutputTime(nextValue.getTimestamp(), currentWindow));
+        windowTimestamp = outputTimeFn.combine(windowTimestamp,
+            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
       } else {
         // moving to the next window, first add the current accumulation to output
         // and initialize the accumulator.
@@ -130,8 +121,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
         accumulator = combineFn.addInput(accumulator, nextValue.getValue(),
             ctxtForInput(nextValue));
         currentWindow = nextWindow;
-        windowTimestamp = timestampCombiner.assign(currentWindow,
-            windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow));
+        windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
       }
     }
 
@@ -172,7 +162,8 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
     Iterable<WindowedValue<AccumT>> sortedAccumulators = sortByWindows(accumulators);
 
     @SuppressWarnings("unchecked")
-    TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
+    OutputTimeFn<? super BoundedWindow> outputTimeFn =
+        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
 
     //--- accumulators iterator, by window order.
     final Iterator<WindowedValue<AccumT>> iterator = sortedAccumulators.iterator();
@@ -183,7 +174,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
     List<AccumT> currentWindowAccumulators = Lists.newArrayList();
     currentWindowAccumulators.add(currentValue.getValue());
 
-    // keep track of the timestamps assigned by the TimestampCombiner,
+    // keep track of the timestamps assigned by the OutputTimeFn,
     // in createCombiner we already merge the timestamps assigned
     // to individual elements, here we will just merge them.
     List<Instant> windowTimestamps = Lists.newArrayList();
@@ -215,7 +206,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
         // add the current accumulation to the output and initialize the accumulation.
 
         // merge the timestamps of all accumulators to merge.
-        Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
+        Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
 
         // merge accumulators.
         // transforming a KV<K, Iterable<AccumT>> into a KV<K, Iterable<AccumT>>.
@@ -240,7 +231,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
     }
 
     // merge the last chunk of accumulators.
-    Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
+    Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
     Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators);
     WindowedValue<Iterable<AccumT>> preMergeWindowedValue = WindowedValue.of(
         accumsToMerge, mergedTimestamp, currentWindow, PaneInfo.NO_FIRING);

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
index 66c03bc..b5d243f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
@@ -29,9 +29,8 @@ import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -73,8 +72,9 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
     // sort exploded inputs.
     Iterable<WindowedValue<KV<K, InputT>>> sortedInputs = sortByWindows(wkvi.explodeWindows());
 
-    TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
-    WindowFn<?, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+    @SuppressWarnings("unchecked")
+    OutputTimeFn<? super BoundedWindow> outputTimeFn =
+        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
 
     //--- inputs iterator, by window order.
     final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInputs.iterator();
@@ -87,13 +87,9 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
     accumulator = combineFn.addInput(key, accumulator, currentInput.getValue().getValue(),
         ctxtForInput(currentInput));
 
-    // keep track of the timestamps assigned by the TimestampCombiner.
+    // keep track of the timestamps assigned by the OutputTimeFn.
     Instant windowTimestamp =
-        timestampCombiner.assign(
-            currentWindow,
-            windowingStrategy
-                .getWindowFn()
-                .getOutputTime(currentInput.getTimestamp(), currentWindow));
+        outputTimeFn.assignOutputTime(currentInput.getTimestamp(), currentWindow);
 
     // accumulate the next windows, or output.
     List<WindowedValue<KV<K, AccumT>>> output = Lists.newArrayList();
@@ -116,12 +112,8 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
         // keep accumulating and carry on ;-)
         accumulator = combineFn.addInput(key, accumulator, nextValue.getValue().getValue(),
             ctxtForInput(nextValue));
-        windowTimestamp =
-            timestampCombiner.combine(
-                windowTimestamp,
-                timestampCombiner.assign(
-                    currentWindow,
-                    windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow)));
+        windowTimestamp = outputTimeFn.combine(windowTimestamp,
+            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
       } else {
         // moving to the next window, first add the current accumulation to output
         // and initialize the accumulator.
@@ -132,9 +124,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
         accumulator = combineFn.addInput(key, accumulator, nextValue.getValue().getValue(),
             ctxtForInput(nextValue));
         currentWindow = nextWindow;
-        windowTimestamp =
-            timestampCombiner.assign(
-                currentWindow, windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow));
+        windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
       }
     }
 
@@ -180,7 +170,8 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
     Iterable<WindowedValue<KV<K, AccumT>>> sortedAccumulators = sortByWindows(accumulators);
 
     @SuppressWarnings("unchecked")
-    TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
+    OutputTimeFn<? super BoundedWindow> outputTimeFn =
+        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
 
     //--- accumulators iterator, by window order.
     final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedAccumulators.iterator();
@@ -192,7 +183,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
     List<AccumT> currentWindowAccumulators = Lists.newArrayList();
     currentWindowAccumulators.add(currentValue.getValue().getValue());
 
-    // keep track of the timestamps assigned by the TimestampCombiner,
+    // keep track of the timestamps assigned by the OutputTimeFn,
     // in createCombiner we already merge the timestamps assigned
     // to individual elements, here we will just merge them.
     List<Instant> windowTimestamps = Lists.newArrayList();
@@ -224,7 +215,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
         // add the current accumulation to the output and initialize the accumulation.
 
         // merge the timestamps of all accumulators to merge.
-        Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
+        Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
 
         // merge accumulators.
         // transforming a KV<K, Iterable<AccumT>> into a KV<K, Iterable<AccumT>>.
@@ -250,7 +241,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
     }
 
     // merge the last chunk of accumulators.
-    Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
+    Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
     Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators);
     WindowedValue<KV<K, Iterable<AccumT>>> preMergeWindowedValue = WindowedValue.of(
         KV.of(key, accumsToMerge), mergedTimestamp, currentWindow, PaneInfo.NO_FIRING);

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 58b5a84..6c46453 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -135,6 +135,11 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-common-runner-api</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.http-client</groupId>
       <artifactId>google-http-client</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
index e8c2f8d..63e7903 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
@@ -17,14 +17,11 @@
  */
 package org.apache.beam.sdk.testing;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -38,7 +35,8 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.joda.time.Instant;
 import org.joda.time.ReadableInstant;
@@ -254,19 +252,20 @@ public class WindowFnTestUtils {
 
   /**
    * Verifies that later-ending merged windows from any of the timestamps hold up output of
-   * earlier-ending windows, using the provided {@link WindowFn} and {@link TimestampCombiner}.
+   * earlier-ending windows, using the provided {@link WindowFn} and {@link OutputTimeFn}.
    *
    * <p>Given a list of lists of timestamps, where each list is expected to merge into a single
    * window with end times in ascending order, assigns and merges windows for each list (as though
-   * each were a separate key/user session). Then combines each timestamp in the list according to
-   * the provided {@link TimestampCombiner}.
+   * each were a separate key/user session). Then maps each timestamp in the list according to
+   * {@link OutputTimeFn#assignOutputTime outputTimeFn.assignOutputTime()} and
+   * {@link OutputTimeFn#combine outputTimeFn.combine()}.
    *
    * <p>Verifies that a overlapping windows do not hold each other up via the watermark.
    */
   public static <T, W extends IntervalWindow>
   void validateGetOutputTimestamps(
       WindowFn<T, W> windowFn,
-      TimestampCombiner timestampCombiner,
+      OutputTimeFn<? super W> outputTimeFn,
       List<List<Long>> timestampsPerWindow) throws Exception {
 
     // Assign windows to each timestamp, then merge them, storing the merged windows in
@@ -301,11 +300,10 @@ public class WindowFnTestUtils {
 
       List<Instant> outputInstants = new ArrayList<>();
       for (long inputTimestamp : timestampsForWindow) {
-        outputInstants.add(
-            assignOutputTime(timestampCombiner, new Instant(inputTimestamp), window));
+        outputInstants.add(outputTimeFn.assignOutputTime(new Instant(inputTimestamp), window));
       }
 
-      combinedOutputTimestamps.add(combineOutputTimes(timestampCombiner, outputInstants));
+      combinedOutputTimestamps.add(OutputTimeFns.combineOutputTimes(outputTimeFn, outputInstants));
     }
 
     // Consider windows in increasing order of max timestamp; ensure the output timestamp is after
@@ -323,37 +321,4 @@ public class WindowFnTestUtils {
       earlierEndingWindow = window;
     }
   }
-
-  private static Instant assignOutputTime(
-      TimestampCombiner timestampCombiner, Instant inputTimestamp, BoundedWindow window) {
-    switch (timestampCombiner) {
-      case EARLIEST:
-      case LATEST:
-        return inputTimestamp;
-      case END_OF_WINDOW:
-        return window.maxTimestamp();
-      default:
-        throw new IllegalArgumentException(
-            String.format("Unknown %s: %s", TimestampCombiner.class, timestampCombiner));
-    }
-  }
-
-  private static Instant combineOutputTimes(
-      TimestampCombiner timestampCombiner, Iterable<Instant> outputInstants) {
-    checkArgument(
-        !Iterables.isEmpty(outputInstants),
-        "Cannot combine zero instants with %s",
-        timestampCombiner);
-    switch(timestampCombiner) {
-      case EARLIEST:
-        return Ordering.natural().min(outputInstants);
-      case LATEST:
-        return Ordering.natural().max(outputInstants);
-      case END_OF_WINDOW:
-        return outputInstants.iterator().next();
-      default:
-        throw new IllegalArgumentException(
-            String.format("Unknown %s: %s", TimestampCombiner.class, timestampCombiner));
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index d9c4c9f..cc92102 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -25,7 +25,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -98,7 +97,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
  * for details on the estimation.
  *
  * <p>The timestamp for each emitted pane is determined by the
- * {@link Window#withTimestampCombiner(TimestampCombiner)} windowing operation}.
+ * {@link Window#withOutputTimeFn windowing operation}.
  * The output {@code PCollection} will have the same {@link WindowFn}
  * as the input.
  *

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
new file mode 100644
index 0000000..0efd278
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import com.google.common.collect.Ordering;
+import java.io.Serializable;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.joda.time.Instant;
+
+/**
+ * <b><i>(Experimental)</i></b> A function from timestamps of input values to the timestamp for a
+ * computed value.
+ *
+ * <p>The function is represented via three components:
+ * <ol>
+ *   <li>{@link #assignOutputTime} calculates an output timestamp for any input
+ *       value in a particular window.</li>
+ *   <li>The output timestamps for all non-late input values within a window are combined
+ *       according to {@link #combine combine()}, a commutative and associative operation on
+ *       the output timestamps.</li>
+ *   <li>The output timestamp when windows merge is provided by {@link #merge merge()}.</li>
+ * </ol>
+ *
+ * <p>This abstract class cannot be subclassed directly, by design: it may grow
+ * in consumer-compatible ways that require mutually-exclusive default implementations. To
+ * create a concrete subclass, extend {@link OutputTimeFn.Defaults} or
+ * {@link OutputTimeFn.DependsOnlyOnWindow}. Note that as long as this class remains
+ * experimental, we may also choose to change it in arbitrary backwards-incompatible ways.
+ *
+ * @param <W> the type of window. Contravariant: methods accepting any subtype of
+ * {@code OutputTimeFn<W>} should use the parameter type {@code OutputTimeFn<? super W>}.
+ */
+@Experimental(Experimental.Kind.OUTPUT_TIME)
+public abstract class OutputTimeFn<W extends BoundedWindow> implements Serializable {
+
+  protected OutputTimeFn() { }
+
+  /**
+   * Returns the output timestamp to use for data depending on the given
+   * {@code inputTimestamp} in the specified {@code window}.
+   *
+   * <p>The result of this method must be between {@code inputTimestamp} and
+   * {@code window.maxTimestamp()} (inclusive on both sides).
+   *
+   * <p>This function must be monotonic across input timestamps. Specifically, if {@code A < B},
+   * then {@code assignOutputTime(A, window) <= assignOutputTime(B, window)}.
+   *
+   * <p>For a {@link WindowFn} that doesn't produce overlapping windows, this can (and typically
+   * should) just return {@code inputTimestamp}. In the presence of overlapping windows, it is
+   * suggested that the result in later overlapping windows is past the end of earlier windows
+   * so that the later windows don't prevent the watermark from
+   * progressing past the end of the earlier window.
+   *
+   * <p>See the overview of {@link OutputTimeFn} for the consistency properties required
+   * between {@link #assignOutputTime}, {@link #combine}, and {@link #merge}.
+   */
+  public abstract Instant assignOutputTime(Instant inputTimestamp, W window);
+
+  /**
+   * Combines the given output times, which must be from the same window, into an output time
+   * for a computed value.
+   *
+   * <ul>
+   *   <li>{@code combine} must be commutative: {@code combine(a, b).equals(combine(b, a))}.</li>
+   *   <li>{@code combine} must be associative:
+   *       {@code combine(a, combine(b, c)).equals(combine(combine(a, b), c))}.</li>
+   * </ul>
+   */
+  public abstract Instant combine(Instant outputTime, Instant otherOutputTime);
+
+  /**
+   * Merges the given output times, presumed to be combined output times for windows that
+   * are merging, into an output time for the {@code resultWindow}.
+   *
+   * <p>When windows {@code w1} and {@code w2} merge to become a new window {@code w1plus2},
+   * then {@link #merge} must be implemented such that the output time is the same as
+   * if all timestamps were assigned in {@code w1plus2}. Formally:
+   *
+   * <p>{@code fn.merge(w, fn.assignOutputTime(t1, w1), fn.assignOutputTime(t2, w2))}
+   *
+   * <p>must be equal to
+   *
+   * <p>{@code fn.combine(fn.assignOutputTime(t1, w1plus2), fn.assignOutputTime(t2, w1plus2))}
+   *
+   * <p>If the assigned time depends only on the window, the correct implementation of
+   * {@link #merge merge()} necessarily returns the result of
+   * {@link #assignOutputTime assignOutputTime(t1, w1plus2)}
+   * (which equals {@link #assignOutputTime assignOutputTime(t2, w1plus2)}.
+   * Defaults for this case are provided by {@link DependsOnlyOnWindow}.
+   *
+   * <p>For many other {@link OutputTimeFn} implementations, such as taking the earliest or latest
+   * timestamp, this will be the same as {@link #combine combine()}. Defaults for this
+   * case are provided by {@link Defaults}.
+   */
+  public abstract Instant merge(W intoWindow, Iterable<? extends Instant> mergingTimestamps);
+
+  /**
+   * Returns {@code true} if the result of combination of many output timestamps actually depends
+   * only on the earliest.
+   *
+   * <p>This may allow optimizations when it is very efficient to retrieve the earliest timestamp
+   * to be combined.
+   */
+  public abstract boolean dependsOnlyOnEarliestInputTimestamp();
+
+  /**
+   * Returns {@code true} if the result does not depend on what outputs were combined but only
+   * the window they are in. The canonical example is if all timestamps are sure to
+   * be the end of the window.
+   *
+   * <p>This may allow optimizations, since it is typically very efficient to retrieve the window
+   * and combining output timestamps is not necessary.
+   *
+   * <p>If the assigned output time for an implementation depends only on the window, consider
+   * extending {@link DependsOnlyOnWindow}, which returns {@code true} here and also provides
+   * a framework for easily implementing a correct {@link #merge}, {@link #combine} and
+   * {@link #assignOutputTime}.
+   */
+  public abstract boolean dependsOnlyOnWindow();
+
+  /**
+   * <b><i>(Experimental)</i></b> Default method implementations for {@link OutputTimeFn} where the
+   * output time depends on the input element timestamps and possibly the window.
+   *
+   * <p>To complete an implementation, override {@link #assignOutputTime}, at a minimum.
+   *
+   * <p>By default, {@link #combine} and {@link #merge} return the earliest timestamp of their
+   * inputs.
+   */
+  public abstract static class Defaults<W extends BoundedWindow> extends OutputTimeFn<W> {
+
+    protected Defaults() {
+      super();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return the earlier of the two timestamps.
+     */
+    @Override
+    public Instant combine(Instant outputTimestamp, Instant otherOutputTimestamp) {
+      return Ordering.natural().min(outputTimestamp, otherOutputTimestamp);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return the result of {@link #combine combine(outputTimstamp, otherOutputTimestamp)},
+     * by default.
+     */
+    @Override
+    public Instant merge(W resultWindow, Iterable<? extends Instant> mergingTimestamps) {
+      return OutputTimeFns.combineOutputTimes(this, mergingTimestamps);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return {@code false} by default. An {@link OutputTimeFn} that is known to depend only on the
+     * window should extend {@link OutputTimeFn.DependsOnlyOnWindow}.
+     */
+    @Override
+    public boolean dependsOnlyOnWindow() {
+      return false;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return {@code true} by default.
+     */
+    @Override
+    public boolean dependsOnlyOnEarliestInputTimestamp() {
+      return false;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return {@code true} if the two {@link OutputTimeFn} instances have the same class, by
+     *         default.
+     */
+    @Override
+    public boolean equals(Object other) {
+      if (other == null) {
+        return false;
+      }
+
+      return this.getClass().equals(other.getClass());
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getClass());
+    }
+  }
+
+  /**
+   * <b><i>(Experimental)</i></b> Default method implementations for {@link OutputTimeFn} when the
+   * output time depends only on the window.
+   *
+   * <p>To complete an implementation, override {@link #assignOutputTime(BoundedWindow)}.
+   */
+  public abstract static class DependsOnlyOnWindow<W extends BoundedWindow>
+      extends OutputTimeFn<W> {
+
+    protected DependsOnlyOnWindow() {
+      super();
+    }
+
+    /**
+     * Returns the output timestamp to use for data in the specified {@code window}.
+     *
+     * <p>Note that the result of this method must be between the maximum possible input timestamp
+     * in {@code window} and {@code window.maxTimestamp()} (inclusive on both sides).
+     *
+     * <p>For example, using {@code Sessions.withGapDuration(gapDuration)}, we know that all input
+     * timestamps must lie at least {@code gapDuration} from the end of the session, so
+     * {@code window.maxTimestamp() - gapDuration} is an acceptable assigned timestamp.
+     *
+     * @see #assignOutputTime(Instant, BoundedWindow)
+     */
+    protected abstract Instant assignOutputTime(W window);
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return the result of {#link assignOutputTime(BoundedWindow) assignOutputTime(window)}.
+     */
+    @Override
+    public final Instant assignOutputTime(Instant timestamp, W window) {
+      return assignOutputTime(window);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return the same timestamp as both argument timestamps, which are necessarily equal.
+     */
+    @Override
+    public final Instant combine(Instant outputTimestamp, Instant otherOutputTimestamp) {
+      return outputTimestamp;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return the result of
+     * {@link #assignOutputTime(BoundedWindow) assignOutputTime(resultWindow)}.
+     */
+    @Override
+    public final Instant merge(W resultWindow, Iterable<? extends Instant> mergingTimestamps) {
+      return assignOutputTime(resultWindow);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return {@code true}.
+     */
+    @Override
+    public final boolean dependsOnlyOnWindow() {
+      return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return {@code true}. Since the output time depends only on the window, it can
+     * certainly be ascertained given a single input timestamp.
+     */
+    @Override
+    public final boolean dependsOnlyOnEarliestInputTimestamp() {
+      return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return {@code true} if the two {@link OutputTimeFn} instances have the same class, by
+     *         default.
+     */
+    @Override
+    public boolean equals(Object other) {
+      if (other == null) {
+        return false;
+      }
+
+      return this.getClass().equals(other.getClass());
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getClass());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java
new file mode 100644
index 0000000..b5d67fa
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.joda.time.Instant;
+
+/**
+ * <b><i>(Experimental)</i></b> Static utility methods and provided implementations for
+ * {@link OutputTimeFn}.
+ */
+@Experimental(Experimental.Kind.OUTPUT_TIME)
+public class OutputTimeFns {
+  /**
+   * The policy of outputting at the earliest of the input timestamps for non-late input data
+   * that led to a computed value.
+   *
+   * <p>For example, suppose <i>v</i><sub>1</sub> through <i>v</i><sub>n</sub> are all on-time
+   * elements being aggregated via some function {@code f} into
+   * {@code f}(<i>v</i><sub>1</sub>, ..., <i>v</i><sub>n</sub>. When emitted, the output
+   * timestamp of the result will be the earliest of the event time timestamps
+   *
+   * <p>If data arrives late, it has no effect on the output timestamp.
+   */
+  public static OutputTimeFn<BoundedWindow> outputAtEarliestInputTimestamp() {
+    return new OutputAtEarliestInputTimestamp();
+  }
+
+  /**
+   * The policy of holding the watermark to the latest of the input timestamps
+   * for non-late input data that led to a computed value.
+   *
+   * <p>For example, suppose <i>v</i><sub>1</sub> through <i>v</i><sub>n</sub> are all on-time
+   * elements being aggregated via some function {@code f} into
+   * {@code f}(<i>v</i><sub>1</sub>, ..., <i>v</i><sub>n</sub>. When emitted, the output
+   * timestamp of the result will be the latest of the event time timestamps
+   *
+   * <p>If data arrives late, it has no effect on the output timestamp.
+   */
+  public static OutputTimeFn<BoundedWindow> outputAtLatestInputTimestamp() {
+    return new OutputAtLatestInputTimestamp();
+  }
+
+  /**
+   * The policy of outputting with timestamps at the end of the window.
+   *
+   * <p>Note that this output timestamp depends only on the window. See
+   * {#link dependsOnlyOnWindow()}.
+   *
+   * <p>When windows merge, instead of using {@link OutputTimeFn#combine} to obtain an output
+   * timestamp for the results in the new window, it is mandatory to obtain a new output
+   * timestamp from {@link OutputTimeFn#assignOutputTime} with the new window and an arbitrary
+   * timestamp (because it is guaranteed that the timestamp is irrelevant).
+   *
+   * <p>For non-merging window functions, this {@link OutputTimeFn} works transparently.
+   */
+  public static OutputTimeFn<BoundedWindow> outputAtEndOfWindow() {
+    return new OutputAtEndOfWindow();
+  }
+
+  /**
+   * Applies the given {@link OutputTimeFn} to the given output times, obtaining
+   * the output time for a value computed. See {@link OutputTimeFn#combine} for
+   * a full specification.
+   *
+   * @throws IllegalArgumentException if {@code outputTimes} is empty.
+   */
+  public static Instant combineOutputTimes(
+      OutputTimeFn<?> outputTimeFn, Iterable<? extends Instant> outputTimes) {
+    checkArgument(
+        !Iterables.isEmpty(outputTimes),
+        "Collection of output times must not be empty in %s.combineOutputTimes",
+        OutputTimeFns.class.getName());
+
+    @Nullable
+    Instant combinedOutputTime = null;
+    for (Instant outputTime : outputTimes) {
+      combinedOutputTime =
+          combinedOutputTime == null
+              ? outputTime : outputTimeFn.combine(combinedOutputTime, outputTime);
+    }
+    return combinedOutputTime;
+  }
+
+  /**
+   * See {@link #outputAtEarliestInputTimestamp}.
+   */
+  private static class OutputAtEarliestInputTimestamp extends OutputTimeFn.Defaults<BoundedWindow> {
+    @Override
+    public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) {
+      return inputTimestamp;
+    }
+
+    @Override
+    public Instant combine(Instant outputTime, Instant otherOutputTime) {
+      return Ordering.natural().min(outputTime, otherOutputTime);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return {@code true}. The result of any combine will be the earliest input timestamp.
+     */
+    @Override
+    public boolean dependsOnlyOnEarliestInputTimestamp() {
+      return true;
+    }
+  }
+
+  /**
+   * See {@link #outputAtLatestInputTimestamp}.
+   */
+  private static class OutputAtLatestInputTimestamp extends OutputTimeFn.Defaults<BoundedWindow> {
+    @Override
+    public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) {
+      return inputTimestamp;
+    }
+
+    @Override
+    public Instant combine(Instant outputTime, Instant otherOutputTime) {
+      return Ordering.natural().max(outputTime, otherOutputTime);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return {@code false}.
+     */
+    @Override
+    public boolean dependsOnlyOnEarliestInputTimestamp() {
+      return false;
+    }
+  }
+
+  private static class OutputAtEndOfWindow extends OutputTimeFn.DependsOnlyOnWindow<BoundedWindow> {
+
+    /**
+     *{@inheritDoc}
+     *
+     *@return {@code window.maxTimestamp()}.
+     */
+    @Override
+    protected Instant assignOutputTime(BoundedWindow window) {
+      return window.maxTimestamp();
+    }
+
+    @Override
+    public String toString() {
+      return getClass().getCanonicalName();
+    }
+  }
+
+  public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) {
+    if (outputTimeFn instanceof OutputAtEarliestInputTimestamp) {
+      return RunnerApi.OutputTime.EARLIEST_IN_PANE;
+    } else if (outputTimeFn instanceof OutputAtLatestInputTimestamp) {
+      return RunnerApi.OutputTime.LATEST_IN_PANE;
+    } else if (outputTimeFn instanceof OutputAtEndOfWindow) {
+      return RunnerApi.OutputTime.END_OF_WINDOW;
+    } else {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot convert %s to %s: %s",
+              OutputTimeFn.class.getCanonicalName(),
+              RunnerApi.OutputTime.class.getCanonicalName(),
+              outputTimeFn));
+    }
+  }
+
+  public static OutputTimeFn<?> fromProto(RunnerApi.OutputTime proto) {
+    switch (proto) {
+      case EARLIEST_IN_PANE:
+        return OutputTimeFns.outputAtEarliestInputTimestamp();
+      case LATEST_IN_PANE:
+        return OutputTimeFns.outputAtLatestInputTimestamp();
+      case END_OF_WINDOW:
+        return OutputTimeFns.outputAtEndOfWindow();
+      case UNRECOGNIZED:
+      default:
+        // Whether or not it is proto that cannot recognize it (due to the version of the
+        // generated code we link to) or the switch hasn't been updated to handle it,
+        // the situation is the same: we don't know what this OutputTime means
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot convert unknown %s to %s: %s",
+                RunnerApi.OutputTime.class.getCanonicalName(),
+                OutputTimeFn.class.getCanonicalName(),
+                proto));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java
deleted file mode 100644
index 39fe8a9..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-import java.util.Arrays;
-import java.util.Collections;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.joda.time.Instant;
-
-/**
- * Policies for combining timestamps that occur within a window.
- */
-@Experimental(Experimental.Kind.OUTPUT_TIME)
-public enum TimestampCombiner {
-  /**
-   * The policy of taking at the earliest of a set of timestamps.
-   *
-   * <p>When used in windowed aggregations, the timestamps of non-late inputs will be combined after
-   * they are shifted by the {@link WindowFn} (to allow downstream watermark progress).
-   *
-   * <p>If data arrives late, it has no effect on the output timestamp.
-   */
-  EARLIEST {
-    @Override
-    public Instant combine(Iterable<? extends Instant> timestamps) {
-      return Ordering.natural().min(timestamps);
-    }
-
-    @Override
-    public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
-      return combine(mergingTimestamps);
-    }
-
-    @Override
-    public boolean dependsOnlyOnEarliestTimestamp() {
-      return true;
-    }
-
-    @Override
-    public boolean dependsOnlyOnWindow() {
-      return false;
-    }
-  },
-
-  /**
-   * The policy of taking the latest of a set of timestamps.
-   *
-   * <p>When used in windowed aggregations, the timestamps of non-late inputs will be combined after
-   * they are shifted by the {@link WindowFn} (to allow downstream watermark progress).
-   *
-   * <p>If data arrives late, it has no effect on the output timestamp.
-   */
-  LATEST {
-    @Override
-    public Instant combine(Iterable<? extends Instant> timestamps) {
-      return Ordering.natural().max(timestamps);
-    }
-
-    @Override
-    public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
-      return combine(mergingTimestamps);
-    }
-
-    @Override
-    public boolean dependsOnlyOnEarliestTimestamp() {
-      return false;
-    }
-
-    @Override
-    public boolean dependsOnlyOnWindow() {
-      return false;
-    }
-  },
-
-  /**
-   * The policy of using the end of the window, regardless of input timestamps.
-   *
-   * <p>When used in windowed aggregations, the timestamps of non-late inputs will be combined after
-   * they are shifted by the {@link WindowFn} (to allow downstream watermark progress).
-   *
-   * <p>If data arrives late, it has no effect on the output timestamp.
-   */
-  END_OF_WINDOW {
-    @Override
-    public Instant combine(Iterable<? extends Instant> timestamps) {
-      checkArgument(Iterables.size(timestamps) > 0);
-      return Iterables.get(timestamps, 0);
-    }
-
-    @Override
-    public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
-      return intoWindow.maxTimestamp();
-    }
-
-    @Override
-    public boolean dependsOnlyOnEarliestTimestamp() {
-      return false;
-    }
-
-    @Override
-    public boolean dependsOnlyOnWindow() {
-      return true;
-    }
-  };
-
-  /**
-   * Combines the given times, which must be from the same window and must have been passed through
-   * {@link #merge}.
-   *
-   * <ul>
-   * <li>{@code combine} must be commutative: {@code combine(a, b).equals(combine(b, a))}.
-   * <li>{@code combine} must be associative: {@code combine(a, combine(b,
-   *     c)).equals(combine(combine(a, b), c))}.
-   * </ul>
-   */
-  public abstract Instant combine(Iterable<? extends Instant> timestamps);
-
-  /**
-   * Merges the given timestamps, which may have originated in separate windows, into the context of
-   * the result window.
-   */
-  public abstract Instant merge(
-      BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps);
-
-  /**
-   * Shorthand for {@link #merge} with just one element, to place it into the context of
-   * a window.
-   *
-   * <p>For example, the {@link #END_OF_WINDOW} policy moves the timestamp to the end of the window.
-   */
-  public final Instant assign(BoundedWindow intoWindow, Instant timestamp) {
-    return merge(intoWindow, Collections.singleton(timestamp));
-  }
-
-  /**
-   * Varargs variant of {@link #combine}.
-   */
-  public final Instant combine(Instant... timestamps) {
-    return combine(Arrays.asList(timestamps));
-  }
-
-  /**
-   * Varargs variant of {@link #merge}.
-   */
-  public final Instant merge(BoundedWindow intoWindow, Instant... timestamps) {
-    return merge(intoWindow, Arrays.asList(timestamps));
-  }
-
-  /**
-   * Returns {@code true} if the result of combination of many output timestamps actually depends
-   * only on the earliest.
-   *
-   * <p>This may allow optimizations when it is very efficient to retrieve the earliest timestamp
-   * to be combined.
-   */
-  public abstract boolean dependsOnlyOnEarliestTimestamp();
-
-  /**
-   * Returns {@code true} if the result does not depend on what outputs were combined but only
-   * the window they are in. The canonical example is if all timestamps are sure to
-   * be the end of the window.
-   *
-   * <p>This may allow optimizations, since it is typically very efficient to retrieve the window
-   * and combining output timestamps is not necessary.
-   */
-  public abstract boolean dependsOnlyOnWindow();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index cb7b430..1000ff7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -193,7 +193,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
   @Nullable abstract AccumulationMode getAccumulationMode();
   @Nullable abstract Duration getAllowedLateness();
   @Nullable abstract ClosingBehavior getClosingBehavior();
-  @Nullable abstract TimestampCombiner getTimestampCombiner();
+  @Nullable abstract OutputTimeFn<?> getOutputTimeFn();
 
   abstract Builder<T> toBuilder();
 
@@ -204,7 +204,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
     abstract Builder<T> setAccumulationMode(AccumulationMode mode);
     abstract Builder<T> setAllowedLateness(Duration allowedLateness);
     abstract Builder<T> setClosingBehavior(ClosingBehavior closingBehavior);
-    abstract Builder<T> setTimestampCombiner(TimestampCombiner timestampCombiner);
+    abstract Builder<T> setOutputTimeFn(OutputTimeFn<?> outputTimeFn);
 
     abstract Window<T> build();
   }
@@ -273,12 +273,12 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
   }
 
   /**
-   * <b><i>(Experimental)</i></b> Override the default {@link TimestampCombiner}, to control
+   * <b><i>(Experimental)</i></b> Override the default {@link OutputTimeFn}, to control
    * the output timestamp of values output from a {@link GroupByKey} operation.
    */
   @Experimental(Kind.OUTPUT_TIME)
-  public Window<T> withTimestampCombiner(TimestampCombiner timestampCombiner) {
-    return toBuilder().setTimestampCombiner(timestampCombiner).build();
+  public Window<T> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) {
+    return toBuilder().setOutputTimeFn(outputTimeFn).build();
   }
 
   /**
@@ -300,6 +300,8 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
    * Get the output strategy of this {@link Window Window PTransform}. For internal use
    * only.
    */
+  // Rawtype cast of OutputTimeFn cannot be eliminated with intermediate variable, as it is
+  // casting between wildcards
   public WindowingStrategy<?, ?> getOutputStrategyInternal(
       WindowingStrategy<?, ?> inputStrategy) {
     WindowingStrategy<?, ?> result = inputStrategy;
@@ -318,8 +320,8 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
     if (getClosingBehavior() != null) {
       result = result.withClosingBehavior(getClosingBehavior());
     }
-    if (getTimestampCombiner() != null) {
-      result = result.withTimestampCombiner(getTimestampCombiner());
+    if (getOutputTimeFn() != null) {
+      result = result.withOutputTimeFn(getOutputTimeFn());
     }
     return result;
   }
@@ -409,9 +411,9 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
         .withLabel("Window Closing Behavior"));
     }
 
-    if (getTimestampCombiner() != null) {
-      builder.add(DisplayData.item("timestampCombiner", getTimestampCombiner().toString())
-        .withLabel("Timestamp Combiner"));
+    if (getOutputTimeFn() != null) {
+      builder.add(DisplayData.item("outputTimeFn", getOutputTimeFn().getClass())
+        .withLabel("Output Time Function"));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index 706e039..0c27c4f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -22,7 +22,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -57,14 +57,13 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
     // If the input has already had its windows merged, then the GBK that performed the merge
     // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
     // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
-    // The TimestampCombiner is set to ensure the GroupByKey does not shift elements forwards in
-    // time.
+    // The OutputTimeFn is set to ensure the GroupByKey does not shift elements forwards in time.
     // Because this outputs as fast as possible, this should not hold the watermark.
     Window<KV<K, V>> rewindow =
         Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
             .triggering(new ReshuffleTrigger<>())
             .discardingFiredPanes()
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
             .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
 
     return input.apply(rewindow)


[4/4] beam git commit: This closes #2725: Revert "Replace OutputTimeFn UDF with TimestampCombiner enum"

Posted by ke...@apache.org.
This closes #2725: Revert "Replace OutputTimeFn UDF with TimestampCombiner enum"


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b82cd244
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b82cd244
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b82cd244

Branch: refs/heads/master
Commit: b82cd2446fa681576f83bae0fc2bdd1f03be5e7e
Parents: 3bcbba1 83d41fc
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Apr 26 20:32:09 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Apr 26 20:32:09 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/complete/game/GameStats.java  |   4 +-
 .../translation/utils/ApexStateInternals.java   |  26 +-
 .../translation/GroupByKeyTranslatorTest.java   |  10 +-
 .../utils/ApexStateInternalsTest.java           |  33 +-
 .../core/construction/WindowingStrategies.java  |  52 +--
 .../construction/WindowingStrategiesTest.java   |   6 +-
 .../runners/core/InMemoryStateInternals.java    |  32 +-
 .../beam/runners/core/ReduceFnRunner.java       |   4 +-
 .../beam/runners/core/SplittableParDo.java      |   8 +-
 .../apache/beam/runners/core/StateMerging.java  |  32 +-
 .../org/apache/beam/runners/core/StateTag.java  |  11 +-
 .../org/apache/beam/runners/core/StateTags.java |  16 +-
 .../core/TestInMemoryStateInternals.java        |   2 +-
 .../apache/beam/runners/core/WatermarkHold.java |  45 ++-
 .../core/GroupAlsoByWindowsProperties.java      |  20 +-
 .../core/InMemoryStateInternalsTest.java        |  34 +-
 .../beam/runners/core/ReduceFnRunnerTest.java   |  38 +--
 .../beam/runners/core/ReduceFnTester.java       |  13 +-
 .../apache/beam/runners/core/StateTagTest.java  |  16 +-
 .../CopyOnAccessInMemoryStateInternals.java     |  24 +-
 .../direct/ParDoMultiOverrideFactory.java       |   6 +-
 .../CopyOnAccessInMemoryStateInternalsTest.java |  54 ++--
 .../functions/HashingFlinkCombineRunner.java    |  19 +-
 .../functions/SortingFlinkCombineRunner.java    |  30 +-
 .../state/FlinkBroadcastStateInternals.java     |   8 +-
 .../state/FlinkKeyGroupStateInternals.java      |   8 +-
 .../state/FlinkSplitStateInternals.java         |   8 +-
 .../streaming/state/FlinkStateInternals.java    |  34 +-
 .../streaming/FlinkStateInternalsTest.java      |  34 +-
 .../spark/stateful/SparkStateInternals.java     |  33 +-
 .../translation/SparkAbstractCombineFn.java     |   4 +-
 .../spark/translation/SparkGlobalCombineFn.java |  37 +--
 .../spark/translation/SparkKeyedCombineFn.java  |  37 +--
 sdks/java/core/pom.xml                          |   5 +
 .../beam/sdk/testing/WindowFnTestUtils.java     |  53 +---
 .../apache/beam/sdk/transforms/GroupByKey.java  |   3 +-
 .../sdk/transforms/windowing/OutputTimeFn.java  | 314 +++++++++++++++++++
 .../sdk/transforms/windowing/OutputTimeFns.java | 212 +++++++++++++
 .../transforms/windowing/TimestampCombiner.java | 186 -----------
 .../beam/sdk/transforms/windowing/Window.java   |  22 +-
 .../org/apache/beam/sdk/util/Reshuffle.java     |   7 +-
 .../apache/beam/sdk/util/WindowingStrategy.java | 176 ++++++++---
 .../apache/beam/sdk/util/state/StateBinder.java |  12 +-
 .../apache/beam/sdk/util/state/StateSpecs.java  |  23 +-
 .../beam/sdk/util/state/WatermarkHoldState.java |  19 +-
 .../org/apache/beam/SdkCoreApiSurfaceTest.java  |   1 +
 .../beam/sdk/transforms/GroupByKeyTest.java     |  10 +-
 .../sdk/transforms/join/CoGroupByKeyTest.java   |   6 +-
 .../transforms/windowing/OutputTimeFnsTest.java |  51 +++
 .../sdk/transforms/windowing/SessionsTest.java  |   6 +-
 .../sdk/transforms/windowing/WindowTest.java    |  23 +-
 .../sdk/transforms/windowing/WindowingTest.java |   2 +-
 .../org/apache/beam/GcpCoreApiSurfaceTest.java  |   1 +
 53 files changed, 1130 insertions(+), 740 deletions(-)
----------------------------------------------------------------------



[3/4] beam git commit: Revert "Replace OutputTimeFn UDF with TimestampCombiner enum"

Posted by ke...@apache.org.
Revert "Replace OutputTimeFn UDF with TimestampCombiner enum"

This reverts commit f38e4271334fced94e8dc1dc97f47b60fa810586.

It will require a synchronous Dataflow worker container bump.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/83d41fcc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/83d41fcc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/83d41fcc

Branch: refs/heads/master
Commit: 83d41fcce0c7b123459e5d26ab9938de49f48dab
Parents: bb12a56
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Apr 26 19:33:55 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Apr 26 19:33:55 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/complete/game/GameStats.java  |   4 +-
 .../translation/utils/ApexStateInternals.java   |  26 +-
 .../translation/GroupByKeyTranslatorTest.java   |  10 +-
 .../utils/ApexStateInternalsTest.java           |  33 +-
 .../core/construction/WindowingStrategies.java  |  52 +--
 .../construction/WindowingStrategiesTest.java   |   6 +-
 .../runners/core/InMemoryStateInternals.java    |  32 +-
 .../beam/runners/core/ReduceFnRunner.java       |   4 +-
 .../beam/runners/core/SplittableParDo.java      |   8 +-
 .../apache/beam/runners/core/StateMerging.java  |  32 +-
 .../org/apache/beam/runners/core/StateTag.java  |  11 +-
 .../org/apache/beam/runners/core/StateTags.java |  16 +-
 .../core/TestInMemoryStateInternals.java        |   2 +-
 .../apache/beam/runners/core/WatermarkHold.java |  45 ++-
 .../core/GroupAlsoByWindowsProperties.java      |  20 +-
 .../core/InMemoryStateInternalsTest.java        |  34 +-
 .../beam/runners/core/ReduceFnRunnerTest.java   |  38 +--
 .../beam/runners/core/ReduceFnTester.java       |  13 +-
 .../apache/beam/runners/core/StateTagTest.java  |  16 +-
 .../CopyOnAccessInMemoryStateInternals.java     |  24 +-
 .../direct/ParDoMultiOverrideFactory.java       |   6 +-
 .../CopyOnAccessInMemoryStateInternalsTest.java |  54 ++--
 .../functions/HashingFlinkCombineRunner.java    |  19 +-
 .../functions/SortingFlinkCombineRunner.java    |  30 +-
 .../state/FlinkBroadcastStateInternals.java     |   8 +-
 .../state/FlinkKeyGroupStateInternals.java      |   8 +-
 .../state/FlinkSplitStateInternals.java         |   8 +-
 .../streaming/state/FlinkStateInternals.java    |  34 +-
 .../streaming/FlinkStateInternalsTest.java      |  34 +-
 .../spark/stateful/SparkStateInternals.java     |  33 +-
 .../translation/SparkAbstractCombineFn.java     |   4 +-
 .../spark/translation/SparkGlobalCombineFn.java |  37 +--
 .../spark/translation/SparkKeyedCombineFn.java  |  37 +--
 sdks/java/core/pom.xml                          |   5 +
 .../beam/sdk/testing/WindowFnTestUtils.java     |  53 +---
 .../apache/beam/sdk/transforms/GroupByKey.java  |   3 +-
 .../sdk/transforms/windowing/OutputTimeFn.java  | 314 +++++++++++++++++++
 .../sdk/transforms/windowing/OutputTimeFns.java | 212 +++++++++++++
 .../transforms/windowing/TimestampCombiner.java | 186 -----------
 .../beam/sdk/transforms/windowing/Window.java   |  22 +-
 .../org/apache/beam/sdk/util/Reshuffle.java     |   7 +-
 .../apache/beam/sdk/util/WindowingStrategy.java | 176 ++++++++---
 .../apache/beam/sdk/util/state/StateBinder.java |  12 +-
 .../apache/beam/sdk/util/state/StateSpecs.java  |  23 +-
 .../beam/sdk/util/state/WatermarkHoldState.java |  19 +-
 .../org/apache/beam/SdkCoreApiSurfaceTest.java  |   1 +
 .../beam/sdk/transforms/GroupByKeyTest.java     |  10 +-
 .../sdk/transforms/join/CoGroupByKeyTest.java   |   6 +-
 .../transforms/windowing/OutputTimeFnsTest.java |  51 +++
 .../sdk/transforms/windowing/SessionsTest.java  |   6 +-
 .../sdk/transforms/windowing/WindowTest.java    |  23 +-
 .../sdk/transforms/windowing/WindowingTest.java |   2 +-
 .../org/apache/beam/GcpCoreApiSurfaceTest.java  |   1 +
 53 files changed, 1130 insertions(+), 740 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index e0048b7..b6c05be 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -43,8 +43,8 @@ import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -313,7 +313,7 @@ public class GameStats extends LeaderBoard {
     userEvents
       .apply("WindowIntoSessions", Window.<KV<String, Integer>>into(
           Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
-          .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
+          .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
       // For this use, we care only about the existence of the session, not any particular
       // information aggregated over it, so the following is an efficient way to do that.
       .apply(Combine.perKey(x -> 0))

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index ec8f666..cfc57cd 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -45,7 +45,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.apache.beam.sdk.util.state.BagState;
@@ -150,10 +150,10 @@ public class ApexStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-        StateTag<? super K, WatermarkHoldState> address,
-        TimestampCombiner timestampCombiner) {
-      return new ApexWatermarkHoldState<>(namespace, address, timestampCombiner);
+    public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+        StateTag<? super K, WatermarkHoldState<W>> address,
+        OutputTimeFn<? super W> outputTimeFn) {
+      return new ApexWatermarkHoldState<>(namespace, address, outputTimeFn);
     }
 
     @Override
@@ -269,16 +269,16 @@ public class ApexStateInternals<K> implements StateInternals<K> {
   }
 
   private final class ApexWatermarkHoldState<W extends BoundedWindow>
-      extends AbstractState<Instant> implements WatermarkHoldState {
+      extends AbstractState<Instant> implements WatermarkHoldState<W> {
 
-    private final TimestampCombiner timestampCombiner;
+    private final OutputTimeFn<? super W> outputTimeFn;
 
     public ApexWatermarkHoldState(
         StateNamespace namespace,
-        StateTag<?, WatermarkHoldState> address,
-        TimestampCombiner timestampCombiner) {
+        StateTag<?, WatermarkHoldState<W>> address,
+        OutputTimeFn<? super W> outputTimeFn) {
       super(namespace, address, InstantCoder.of());
-      this.timestampCombiner = timestampCombiner;
+      this.outputTimeFn = outputTimeFn;
     }
 
     @Override
@@ -294,7 +294,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
     @Override
     public void add(Instant outputTime) {
       Instant combined = read();
-      combined = (combined == null) ? outputTime : timestampCombiner.combine(combined, outputTime);
+      combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime);
       writeValue(combined);
     }
 
@@ -313,8 +313,8 @@ public class ApexStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public TimestampCombiner getTimestampCombiner() {
-      return timestampCombiner;
+    public OutputTimeFn<? super W> getOutputTimeFn() {
+      return outputTimeFn;
     }
 
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
index 9c61b47..193de71 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
@@ -43,7 +43,7 @@ import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.joda.time.Duration;
@@ -83,12 +83,12 @@ public class GroupByKeyTranslatorTest {
         );
 
     p.apply(Read.from(new TestSource(data, new Instant(5000))))
-        .apply(
-            Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
-                .withTimestampCombiner(TimestampCombiner.LATEST))
+        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
+            .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()))
         .apply(Count.<String>perElement())
         .apply(ParDo.of(new KeyedByTimestamp<KV<String, Long>>()))
-        .apply(ParDo.of(new EmbeddedCollector()));
+        .apply(ParDo.of(new EmbeddedCollector()))
+        ;
 
     ApexRunnerResult result = (ApexRunnerResult) p.run();
     result.getApexDAG();

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 225b654..7160e45 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -36,7 +36,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
@@ -65,13 +65,14 @@ public class ApexStateInternalsTest {
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
   private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
-  private static final StateTag<Object, WatermarkHoldState>
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
       WATERMARK_EARLIEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
-      StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
+      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+      WATERMARK_LATEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
+      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
 
   private ApexStateInternals<String> underTest;
 
@@ -226,7 +227,7 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testWatermarkEarliestState() throws Exception {
-    WatermarkHoldState value =
+    WatermarkHoldState<BoundedWindow> value =
         underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
 
     // State instances are cached, but depend on the namespace.
@@ -250,7 +251,7 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testWatermarkLatestState() throws Exception {
-    WatermarkHoldState value =
+    WatermarkHoldState<BoundedWindow> value =
         underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
 
     // State instances are cached, but depend on the namespace.
@@ -274,7 +275,7 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testWatermarkEndOfWindowState() throws Exception {
-    WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+    WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
 
     // State instances are cached, but depend on the namespace.
     assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
@@ -291,7 +292,7 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testWatermarkStateIsEmpty() throws Exception {
-    WatermarkHoldState value =
+    WatermarkHoldState<BoundedWindow> value =
         underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
 
     assertThat(value.isEmpty().read(), Matchers.is(true));
@@ -305,9 +306,9 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testMergeEarliestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState value1 =
+    WatermarkHoldState<BoundedWindow> value1 =
         underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-    WatermarkHoldState value2 =
+    WatermarkHoldState<BoundedWindow> value2 =
         underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
 
     value1.add(new Instant(3000));
@@ -324,11 +325,11 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testMergeLatestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState value1 =
+    WatermarkHoldState<BoundedWindow> value1 =
         underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState value2 =
+    WatermarkHoldState<BoundedWindow> value2 =
         underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState value3 =
+    WatermarkHoldState<BoundedWindow> value3 =
         underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
 
     value1.add(new Instant(3000));

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
index 0c400db..3d7deef 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
@@ -28,15 +28,16 @@ import java.io.Serializable;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.WindowingStrategy.CombineWindowFnOutputTimes;
 import org.joda.time.Duration;
 
 /** Utilities for working with {@link WindowingStrategy WindowingStrategies}. */
@@ -114,42 +115,11 @@ public class WindowingStrategies implements Serializable {
     }
   }
 
-  public static RunnerApi.OutputTime toProto(TimestampCombiner timestampCombiner) {
-    switch(timestampCombiner) {
-      case EARLIEST:
-        return OutputTime.EARLIEST_IN_PANE;
-      case END_OF_WINDOW:
-        return OutputTime.END_OF_WINDOW;
-      case LATEST:
-        return OutputTime.LATEST_IN_PANE;
-      default:
-        throw new IllegalArgumentException(
-            String.format(
-                "Unknown %s: %s",
-                TimestampCombiner.class.getSimpleName(),
-                timestampCombiner));
-    }
-  }
-
-  public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime proto) {
-    switch (proto) {
-      case EARLIEST_IN_PANE:
-        return TimestampCombiner.EARLIEST;
-      case END_OF_WINDOW:
-        return TimestampCombiner.END_OF_WINDOW;
-      case LATEST_IN_PANE:
-        return TimestampCombiner.LATEST;
-      case UNRECOGNIZED:
-      default:
-        // Whether or not it is proto that cannot recognize it (due to the version of the
-        // generated code we link to) or the switch hasn't been updated to handle it,
-        // the situation is the same: we don't know what this OutputTime means
-        throw new IllegalArgumentException(
-            String.format(
-                "Cannot convert unknown %s to %s: %s",
-                RunnerApi.OutputTime.class.getCanonicalName(),
-                OutputTime.class.getCanonicalName(),
-                proto));
+  public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) {
+    if (outputTimeFn instanceof WindowingStrategy.CombineWindowFnOutputTimes) {
+      return toProto(((CombineWindowFnOutputTimes<?>) outputTimeFn).getOutputTimeFn());
+    } else {
+      return OutputTimeFns.toProto(outputTimeFn);
     }
   }
 
@@ -207,7 +177,7 @@ public class WindowingStrategies implements Serializable {
 
     RunnerApi.WindowingStrategy.Builder windowingStrategyProto =
         RunnerApi.WindowingStrategy.newBuilder()
-            .setOutputTime(toProto(windowingStrategy.getTimestampCombiner()))
+            .setOutputTime(toProto(windowingStrategy.getOutputTimeFn()))
             .setAccumulationMode(toProto(windowingStrategy.getMode()))
             .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
             .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
@@ -259,7 +229,7 @@ public class WindowingStrategies implements Serializable {
             "WindowFn");
 
     WindowFn<?, ?> windowFn = (WindowFn<?, ?>) deserializedWindowFn;
-    TimestampCombiner timestampCombiner = timestampCombinerFromProto(proto.getOutputTime());
+    OutputTimeFn<?> outputTimeFn = OutputTimeFns.fromProto(proto.getOutputTime());
     AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode());
     Trigger trigger = Triggers.fromProto(proto.getTrigger());
     ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
@@ -269,7 +239,7 @@ public class WindowingStrategies implements Serializable {
         .withAllowedLateness(allowedLateness)
         .withMode(accumulationMode)
         .withTrigger(trigger)
-        .withTimestampCombiner(timestampCombiner)
+        .withOutputTimeFn(outputTimeFn)
         .withClosingBehavior(closingBehavior);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
index 78ac61c..62bba8e 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
@@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableList;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -68,14 +68,14 @@ public class WindowingStrategiesTest {
                 .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
                 .withTrigger(REPRESENTATIVE_TRIGGER)
                 .withAllowedLateness(Duration.millis(71))
-                .withTimestampCombiner(TimestampCombiner.EARLIEST)),
+                .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())),
         toProtoAndBackSpec(
             WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
                 .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)
                 .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
                 .withTrigger(REPRESENTATIVE_TRIGGER)
                 .withAllowedLateness(Duration.millis(93))
-                .withTimestampCombiner(TimestampCombiner.LATEST)));
+                .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp())));
   }
 
   @Parameter(0)

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
index 9fb8e3f..55b7fc2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
@@ -156,10 +156,10 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-        StateTag<? super K, WatermarkHoldState> address,
-        TimestampCombiner timestampCombiner) {
-      return new InMemoryWatermarkHold<W>(timestampCombiner);
+    public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+        StateTag<? super K, WatermarkHoldState<W>> address,
+        OutputTimeFn<? super W> outputTimeFn) {
+      return new InMemoryWatermarkHold<W>(outputTimeFn);
     }
 
     @Override
@@ -233,19 +233,19 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
    * An {@link InMemoryState} implementation of {@link WatermarkHoldState}.
    */
   public static final class InMemoryWatermarkHold<W extends BoundedWindow>
-      implements WatermarkHoldState, InMemoryState<InMemoryWatermarkHold<W>> {
+      implements WatermarkHoldState<W>, InMemoryState<InMemoryWatermarkHold<W>> {
 
-    private final TimestampCombiner timestampCombiner;
+    private final OutputTimeFn<? super W> outputTimeFn;
 
     @Nullable
     private Instant combinedHold = null;
 
-    public InMemoryWatermarkHold(TimestampCombiner timestampCombiner) {
-      this.timestampCombiner = timestampCombiner;
+    public InMemoryWatermarkHold(OutputTimeFn<? super W> outputTimeFn) {
+      this.outputTimeFn = outputTimeFn;
     }
 
     @Override
-    public InMemoryWatermarkHold readLater() {
+    public InMemoryWatermarkHold<W> readLater() {
       return this;
     }
 
@@ -263,10 +263,8 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
 
     @Override
     public void add(Instant outputTime) {
-      combinedHold =
-          combinedHold == null
-              ? outputTime
-              : timestampCombiner.combine(combinedHold, outputTime);
+      combinedHold = combinedHold == null ? outputTime
+          : outputTimeFn.combine(combinedHold, outputTime);
     }
 
     @Override
@@ -289,8 +287,8 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public TimestampCombiner getTimestampCombiner() {
-      return timestampCombiner;
+    public OutputTimeFn<? super W> getOutputTimeFn() {
+      return outputTimeFn;
     }
 
     @Override
@@ -301,7 +299,7 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
     @Override
     public InMemoryWatermarkHold<W> copy() {
       InMemoryWatermarkHold<W> that =
-          new InMemoryWatermarkHold<>(timestampCombiner);
+          new InMemoryWatermarkHold<>(outputTimeFn);
       that.combinedHold = this.combinedHold;
       return that;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 4c70c97..34db752 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -47,9 +47,9 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.SideInputReader;
@@ -171,7 +171,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
    * <ul>
    * <li>State: Bag of hold timestamps.
    * <li>State style: RENAMED
-   * <li>Merging: Depending on {@link TimestampCombiner}, may need to be recalculated on merging.
+   * <li>Merging: Depending on {@link OutputTimeFn}, may need to be recalculated on merging.
    * When a pane fires it may be necessary to add (back) an end-of-window or garbage collection
    * hold.
    * <li>Lifetime: Cleared when a pane fires or when the window is garbage collected.

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 5273e86..31d89ee 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -45,7 +45,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -355,10 +355,10 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
      * the input watermark when the first {@link DoFn.ProcessElement} call for this element
      * completes.
      */
-    private static final StateTag<Object, WatermarkHoldState> watermarkHoldTag =
+    private static final StateTag<Object, WatermarkHoldState<GlobalWindow>> watermarkHoldTag =
         StateTags.makeSystemTagInternal(
             StateTags.<GlobalWindow>watermarkStateInternal(
-                "hold", TimestampCombiner.LATEST));
+                "hold", OutputTimeFns.outputAtLatestInputTimestamp()));
 
     /**
      * The state cell containing a copy of the element. Written during the first {@link
@@ -480,7 +480,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
           stateInternals.state(stateNamespace, elementTag);
       ValueState<RestrictionT> restrictionState =
           stateInternals.state(stateNamespace, restrictionTag);
-      WatermarkHoldState holdState =
+      WatermarkHoldState<GlobalWindow> holdState =
           stateInternals.state(stateNamespace, watermarkHoldTag);
 
       ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction;

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
index ce37fd3..3410850 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -218,24 +218,24 @@ public class StateMerging {
    */
   public static <K, W extends BoundedWindow> void prefetchWatermarks(
       MergingStateAccessor<K, W> context,
-      StateTag<? super K, WatermarkHoldState> address) {
-    Map<W, WatermarkHoldState> map = context.accessInEachMergingWindow(address);
-    WatermarkHoldState result = context.access(address);
+      StateTag<? super K, WatermarkHoldState<W>> address) {
+    Map<W, WatermarkHoldState<W>> map = context.accessInEachMergingWindow(address);
+    WatermarkHoldState<W> result = context.access(address);
     if (map.isEmpty()) {
       // Nothing to prefetch.
       return;
     }
     if (map.size() == 1 && map.values().contains(result)
-        && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) {
+        && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) {
       // Nothing to change.
       return;
     }
-    if (result.getTimestampCombiner().dependsOnlyOnWindow()) {
+    if (result.getOutputTimeFn().dependsOnlyOnWindow()) {
       // No need to read existing holds.
       return;
     }
     // Prefetch.
-    for (WatermarkHoldState source : map.values()) {
+    for (WatermarkHoldState<W> source : map.values()) {
       prefetchRead(source);
     }
   }
@@ -250,7 +250,7 @@ public class StateMerging {
    */
   public static <K, W extends BoundedWindow> void mergeWatermarks(
       MergingStateAccessor<K, W> context,
-      StateTag<? super K, WatermarkHoldState> address,
+      StateTag<? super K, WatermarkHoldState<W>> address,
       W mergeResult) {
     mergeWatermarks(
         context.accessInEachMergingWindow(address).values(), context.access(address), mergeResult);
@@ -261,31 +261,31 @@ public class StateMerging {
    * into {@code result}, where the final merge result window is {@code mergeResult}.
    */
   public static <W extends BoundedWindow> void mergeWatermarks(
-      Collection<WatermarkHoldState> sources, WatermarkHoldState result,
+      Collection<WatermarkHoldState<W>> sources, WatermarkHoldState<W> result,
       W resultWindow) {
     if (sources.isEmpty()) {
       // Nothing to merge.
       return;
     }
     if (sources.size() == 1 && sources.contains(result)
-        && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) {
+        && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) {
       // Nothing to merge.
       return;
     }
-    if (result.getTimestampCombiner().dependsOnlyOnWindow()) {
+    if (result.getOutputTimeFn().dependsOnlyOnWindow()) {
       // Clear sources.
-      for (WatermarkHoldState source : sources) {
+      for (WatermarkHoldState<W> source : sources) {
         source.clear();
       }
       // Update directly from window-derived hold.
-      Instant hold =
-          result.getTimestampCombiner().assign(resultWindow, BoundedWindow.TIMESTAMP_MIN_VALUE);
+      Instant hold = result.getOutputTimeFn().assignOutputTime(
+          BoundedWindow.TIMESTAMP_MIN_VALUE, resultWindow);
       checkState(hold.isAfter(BoundedWindow.TIMESTAMP_MIN_VALUE));
       result.add(hold);
     } else {
       // Prefetch.
       List<ReadableState<Instant>> futures = new ArrayList<>(sources.size());
-      for (WatermarkHoldState source : sources) {
+      for (WatermarkHoldState<W> source : sources) {
         futures.add(source);
       }
       // Read.
@@ -297,12 +297,12 @@ public class StateMerging {
         }
       }
       // Clear sources.
-      for (WatermarkHoldState source : sources) {
+      for (WatermarkHoldState<W> source : sources) {
         source.clear();
       }
       if (!outputTimesToMerge.isEmpty()) {
         // Merge and update.
-        result.add(result.getTimestampCombiner().merge(resultWindow, outputTimesToMerge));
+        result.add(result.getOutputTimeFn().merge(resultWindow, outputTimesToMerge));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
index a5d262a..12c59ad 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
@@ -115,10 +115,11 @@ public interface StateTag<K, StateT extends State> extends Serializable {
     /**
      * Bind to a watermark {@link StateSpec}.
      *
-     * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps
-     * added to the returned {@link WatermarkHoldState} are to be combined.
+     * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to
+     * the returned {@link WatermarkHoldState} are to be combined.
      */
-    <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-        StateTag<? super K, WatermarkHoldState> spec, TimestampCombiner timestampCombiner);
+    <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+        StateTag<? super K, WatermarkHoldState<W>> spec,
+        OutputTimeFn<? super W> outputTimeFn);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index 2b3f4b8..3a45569 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
@@ -110,11 +110,11 @@ public class StateTags {
       }
 
       @Override
-      public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
           String id,
-          StateSpec<? super K, WatermarkHoldState> spec,
-          TimestampCombiner timestampCombiner) {
-        return binder.bindWatermark(tagForSpec(id, spec), timestampCombiner);
+          StateSpec<? super K, WatermarkHoldState<W>> spec,
+          OutputTimeFn<? super W> outputTimeFn) {
+        return binder.bindWatermark(tagForSpec(id, spec), outputTimeFn);
       }
     };
   }
@@ -228,10 +228,10 @@ public class StateTags {
   /**
    * Create a state tag for holding the watermark.
    */
-  public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState>
-      watermarkStateInternal(String id, TimestampCombiner timestampCombiner) {
+  public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState<W>>
+      watermarkStateInternal(String id, OutputTimeFn<? super W> outputTimeFn) {
     return new SimpleStateTag<>(
-        new StructuredId(id), StateSpecs.watermarkStateInternal(timestampCombiner));
+        new StructuredId(id), StateSpecs.watermarkStateInternal(outputTimeFn));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
index 1dfb85f..0321a33 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
@@ -52,7 +52,7 @@ public class TestInMemoryStateInternals<K> extends InMemoryStateInternals<K> {
     Instant minimum = null;
     for (State storage : inMemoryState.values()) {
       if (storage instanceof WatermarkHoldState) {
-        Instant hold = ((WatermarkHoldState) storage).read();
+        Instant hold = ((WatermarkHoldState<?>) storage).read();
         if (minimum == null || (hold != null && hold.isBefore(minimum))) {
           minimum = hold;
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 9bb9c62..d3c4bc7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -23,8 +23,9 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.Serializable;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -54,38 +55,37 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
    * used for elements.
    */
   public static <W extends BoundedWindow>
-      StateTag<Object, WatermarkHoldState> watermarkHoldTagForTimestampCombiner(
-          TimestampCombiner timestampCombiner) {
-    return StateTags.<Object, WatermarkHoldState>makeSystemTagInternal(
-        StateTags.<W>watermarkStateInternal("hold", timestampCombiner));
+      StateTag<Object, WatermarkHoldState<W>> watermarkHoldTagForOutputTimeFn(
+          OutputTimeFn<? super W> outputTimeFn) {
+    return StateTags.<Object, WatermarkHoldState<W>>makeSystemTagInternal(
+        StateTags.<W>watermarkStateInternal("hold", outputTimeFn));
   }
 
   /**
    * Tag for state containing end-of-window and garbage collection output watermark holds.
-   * (We can't piggy-back on the data hold state since the timestampCombiner may be
-   * {@link TimestampCombiner#EARLIEST}, in which case every pane will
+   * (We can't piggy-back on the data hold state since the outputTimeFn may be
+   * {@link OutputTimeFns#outputAtLatestInputTimestamp()}, in which case every pane will
    * would take the end-of-window time as its element time.)
    */
   @VisibleForTesting
-  public static final StateTag<Object, WatermarkHoldState> EXTRA_HOLD_TAG =
+  public static final StateTag<Object, WatermarkHoldState<BoundedWindow>> EXTRA_HOLD_TAG =
       StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal(
-          "extra", TimestampCombiner.EARLIEST));
+          "extra", OutputTimeFns.outputAtEarliestInputTimestamp()));
 
   private final TimerInternals timerInternals;
   private final WindowingStrategy<?, W> windowingStrategy;
-  private final StateTag<Object, WatermarkHoldState> elementHoldTag;
+  private final StateTag<Object, WatermarkHoldState<W>> elementHoldTag;
 
   public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) {
     this.timerInternals = timerInternals;
     this.windowingStrategy = windowingStrategy;
-    this.elementHoldTag =
-        watermarkHoldTagForTimestampCombiner(windowingStrategy.getTimestampCombiner());
+    this.elementHoldTag = watermarkHoldTagForOutputTimeFn(windowingStrategy.getOutputTimeFn());
   }
 
   /**
    * Add a hold to prevent the output watermark progressing beyond the (possibly adjusted) timestamp
-   * of the element in {@code context}. We allow the actual hold time to be shifted later by the
-   * {@link TimestampCombiner}, but no further than the end of the window. The hold will
+   * of the element in {@code context}. We allow the actual hold time to be shifted later by
+   * {@link OutputTimeFn#assignOutputTime}, but no further than the end of the window. The hold will
    * remain until cleared by {@link #extractAndRelease}. Return the timestamp at which the hold
    * was placed, or {@literal null} if no hold was placed.
    *
@@ -199,18 +199,15 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
    * strategy's output time function.
    */
   private Instant shift(Instant timestamp, W window) {
-    Instant shifted =
-        windowingStrategy
-            .getTimestampCombiner()
-            .assign(window, windowingStrategy.getWindowFn().getOutputTime(timestamp, window));
+    Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window);
     checkState(!shifted.isBefore(timestamp),
-        "TimestampCombiner moved element from %s to earlier time %s for window %s",
+        "OutputTimeFn moved element from %s to earlier time %s for window %s",
         BoundedWindow.formatTimestamp(timestamp),
         BoundedWindow.formatTimestamp(shifted),
         window);
     checkState(timestamp.isAfter(window.maxTimestamp())
             || !shifted.isAfter(window.maxTimestamp()),
-        "TimestampCombiner moved element from %s to %s which is beyond end of "
+        "OutputTimeFn moved element from %s to %s which is beyond end of "
             + "window %s",
         timestamp, shifted, window);
 
@@ -220,7 +217,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
   /**
    * Attempt to add an 'element hold'. Return the {@link Instant} at which the hold was
    * added (ie the element timestamp plus any forward shift requested by the
-   * {@link WindowingStrategy#getTimestampCombiner}), or {@literal null} if no hold was added.
+   * {@link WindowingStrategy#getOutputTimeFn}), or {@literal null} if no hold was added.
    * The hold is only added if both:
    * <ol>
    * <li>The backend will be able to respect it. In other words the output watermark cannot
@@ -453,7 +450,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
    * Return (a future for) the earliest hold for {@code context}. Clear all the holds after
    * reading, but add/restore an end-of-window or garbage collection hold if required.
    *
-   * <p>The returned timestamp is the output timestamp according to the {@link TimestampCombiner}
+   * <p>The returned timestamp is the output timestamp according to the {@link OutputTimeFn}
    * from the windowing strategy of this {@link WatermarkHold}, combined across all the non-late
    * elements in the current pane. If there is no such value the timestamp is the end
    * of the window.
@@ -465,8 +462,8 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
             + "outputWatermark:{}",
         context.key(), context.window(), timerInternals.currentInputWatermarkTime(),
         timerInternals.currentOutputWatermarkTime());
-    final WatermarkHoldState elementHoldState = context.state().access(elementHoldTag);
-    final WatermarkHoldState extraHoldState = context.state().access(EXTRA_HOLD_TAG);
+    final WatermarkHoldState<W> elementHoldState = context.state().access(elementHoldTag);
+    final WatermarkHoldState<BoundedWindow> extraHoldState = context.state().access(EXTRA_HOLD_TAG);
     return new ReadableState<OldAndNewHolds>() {
       @Override
       public ReadableState<OldAndNewHolds> readLater() {

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index 81ac5fa..d0a8923 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -43,10 +43,10 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -149,7 +149,7 @@ public class GroupAlsoByWindowsProperties {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST);
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
 
     List<WindowedValue<KV<String, Iterable<String>>>> result =
         runGABW(
@@ -200,7 +200,7 @@ public class GroupAlsoByWindowsProperties {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST);
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
 
     List<WindowedValue<KV<String, Long>>> result =
         runGABW(
@@ -348,7 +348,7 @@ public class GroupAlsoByWindowsProperties {
   /**
    * Tests that for a simple sequence of elements on the same key, the given GABW implementation
    * correctly groups them according to fixed windows and also sets the output timestamp according
-   * to the policy {@link TimestampCombiner#END_OF_WINDOW}.
+   * to the policy {@link OutputTimeFns#outputAtEndOfWindow()}.
    */
   public static void groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp(
       GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
@@ -356,7 +356,7 @@ public class GroupAlsoByWindowsProperties {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW);
+            .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
 
     List<WindowedValue<KV<String, Iterable<String>>>> result =
         runGABW(
@@ -386,7 +386,7 @@ public class GroupAlsoByWindowsProperties {
   /**
    * Tests that for a simple sequence of elements on the same key, the given GABW implementation
    * correctly groups them according to fixed windows and also sets the output timestamp according
-   * to the policy {@link TimestampCombiner#LATEST}.
+   * to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}.
    */
   public static void groupsElementsIntoFixedWindowsWithLatestTimestamp(
       GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
@@ -394,7 +394,7 @@ public class GroupAlsoByWindowsProperties {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.LATEST);
+            .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
 
     List<WindowedValue<KV<String, Iterable<String>>>> result =
         runGABW(
@@ -431,7 +431,7 @@ public class GroupAlsoByWindowsProperties {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW);
+            .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
 
     List<WindowedValue<KV<String, Iterable<String>>>> result =
         runGABW(
@@ -468,7 +468,7 @@ public class GroupAlsoByWindowsProperties {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.LATEST);
+            .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
 
     BoundedWindow unmergedWindow = window(15, 25);
     List<WindowedValue<KV<String, Iterable<String>>>> result =
@@ -508,7 +508,7 @@ public class GroupAlsoByWindowsProperties {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW);
+            .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
 
     BoundedWindow secondWindow = window(15, 25);
     List<WindowedValue<KV<String, Long>>> result =

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index 6248401..34ddae6 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
@@ -71,12 +71,14 @@ public class InMemoryStateInternalsTest {
       StateTags.set("stringSet", StringUtf8Coder.of());
   private static final StateTag<Object, MapState<String, Integer>> STRING_MAP_ADDR =
       StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of());
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
-      StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+      WATERMARK_EARLIEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+      WATERMARK_LATEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
+      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
 
   InMemoryStateInternals<String> underTest = InMemoryStateInternals.forKey("dummyKey");
 
@@ -440,7 +442,7 @@ public class InMemoryStateInternalsTest {
 
   @Test
   public void testWatermarkEarliestState() throws Exception {
-    WatermarkHoldState value =
+    WatermarkHoldState<BoundedWindow> value =
         underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
 
     // State instances are cached, but depend on the namespace.
@@ -464,7 +466,7 @@ public class InMemoryStateInternalsTest {
 
   @Test
   public void testWatermarkLatestState() throws Exception {
-    WatermarkHoldState value =
+    WatermarkHoldState<BoundedWindow> value =
         underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
 
     // State instances are cached, but depend on the namespace.
@@ -488,7 +490,7 @@ public class InMemoryStateInternalsTest {
 
   @Test
   public void testWatermarkEndOfWindowState() throws Exception {
-    WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+    WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
 
     // State instances are cached, but depend on the namespace.
     assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
@@ -505,7 +507,7 @@ public class InMemoryStateInternalsTest {
 
   @Test
   public void testWatermarkStateIsEmpty() throws Exception {
-    WatermarkHoldState value =
+    WatermarkHoldState<BoundedWindow> value =
         underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
 
     assertThat(value.isEmpty().read(), Matchers.is(true));
@@ -519,9 +521,9 @@ public class InMemoryStateInternalsTest {
 
   @Test
   public void testMergeEarliestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState value1 =
+    WatermarkHoldState<BoundedWindow> value1 =
         underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-    WatermarkHoldState value2 =
+    WatermarkHoldState<BoundedWindow> value2 =
         underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
 
     value1.add(new Instant(3000));
@@ -538,11 +540,11 @@ public class InMemoryStateInternalsTest {
 
   @Test
   public void testMergeLatestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState value1 =
+    WatermarkHoldState<BoundedWindow> value1 =
         underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState value2 =
+    WatermarkHoldState<BoundedWindow> value2 =
         underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState value3 =
+    WatermarkHoldState<BoundedWindow> value3 =
         underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
 
     value1.add(new Instant(3000));

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 44bc538..0d4d992 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -56,12 +56,12 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -210,7 +210,7 @@ public class ReduceFnRunnerTest {
 
     WindowingStrategy<?, IntervalWindow> strategy =
         WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
             .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100));
 
@@ -284,7 +284,7 @@ public class ReduceFnRunnerTest {
 
     WindowingStrategy<?, IntervalWindow> strategy =
         WindowingStrategy.of((WindowFn<?, IntervalWindow>) windowFn)
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
             .withTrigger(AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()))
             .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
             .withAllowedLateness(allowedLateness);
@@ -315,7 +315,7 @@ public class ReduceFnRunnerTest {
 
     WindowingStrategy<?, IntervalWindow> strategy =
         WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
             .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100));
 
@@ -615,7 +615,7 @@ public class ReduceFnRunnerTest {
                 AfterWatermark.pastEndOfWindow())))
             .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
             .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
 
     tester.advanceInputWatermark(new Instant(0));
@@ -668,7 +668,7 @@ public class ReduceFnRunnerTest {
                 AfterWatermark.pastEndOfWindow())))
             .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
             .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY));
 
     tester.advanceInputWatermark(new Instant(0));
@@ -695,7 +695,7 @@ public class ReduceFnRunnerTest {
                 AfterWatermark.pastEndOfWindow())))
             .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
             .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
 
     tester.advanceInputWatermark(new Instant(0));
@@ -724,7 +724,7 @@ public class ReduceFnRunnerTest {
                 AfterWatermark.pastEndOfWindow())))
             .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
             .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
 
     tester.advanceInputWatermark(new Instant(0));
@@ -1195,7 +1195,7 @@ public class ReduceFnRunnerTest {
 
     WindowingStrategy<?, IntervalWindow> strategy =
         WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
             .withTrigger(
                 AfterEach.<IntervalWindow>inOrder(
                     Repeatedly.forever(
@@ -1251,16 +1251,16 @@ public class ReduceFnRunnerTest {
 
     WindowingStrategy<?, IntervalWindow> strategy =
         WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
-            .withTrigger(
-                AfterEach.<IntervalWindow>inOrder(
-                    Repeatedly.forever(
-                            AfterProcessingTime.pastFirstElementInPane()
-                                .plusDelayOf(new Duration(5)))
-                        .orFinally(AfterWatermark.pastEndOfWindow()),
-                    Repeatedly.forever(
-                        AfterProcessingTime.pastFirstElementInPane()
-                            .plusDelayOf(new Duration(25)))))
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withTrigger(AfterEach.<IntervalWindow>inOrder(
+                Repeatedly
+                    .forever(
+                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
+                            new Duration(5)))
+                    .orFinally(AfterWatermark.pastEndOfWindow()),
+                Repeatedly.forever(
+                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
+                        new Duration(25)))))
             .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index b5b5492..549fd8a 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -58,8 +58,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -161,7 +161,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
           throws Exception {
     WindowingStrategy<?, W> strategy =
         WindowingStrategy.of(windowFn)
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
             .withMode(mode)
             .withAllowedLateness(allowedDataLateness)
             .withClosingBehavior(closingBehavior);
@@ -329,10 +329,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     assertHasOnlyGlobalAndAllowedTags(
         ImmutableSet.copyOf(expectedWindows),
         ImmutableSet.<StateTag<? super String, ?>>of(
-            TriggerStateMachineRunner.FINISHED_BITS_TAG,
-            PaneInfoTracker.PANE_INFO_TAG,
-            WatermarkHold.watermarkHoldTagForTimestampCombiner(
-                objectStrategy.getTimestampCombiner()),
+            TriggerStateMachineRunner.FINISHED_BITS_TAG, PaneInfoTracker.PANE_INFO_TAG,
+            WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()),
             WatermarkHold.EXTRA_HOLD_TAG));
   }
 
@@ -347,8 +345,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
         ImmutableSet.copyOf(expectedWindows),
         ImmutableSet.<StateTag<? super String, ?>>of(
             PaneInfoTracker.PANE_INFO_TAG,
-            WatermarkHold.watermarkHoldTagForTimestampCombiner(
-                objectStrategy.getTimestampCombiner()),
+            WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()),
             WatermarkHold.EXTRA_HOLD_TAG));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
index 10dcb62..5f5d92d 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -97,11 +97,15 @@ public class StateTagTest {
 
   @Test
   public void testWatermarkBagEquality() {
-    StateTag<?, ?> foo1 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
-    StateTag<?, ?> foo2 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
-    StateTag<?, ?> bar = StateTags.watermarkStateInternal("bar", TimestampCombiner.EARLIEST);
-
-    StateTag<?, ?> bar2 = StateTags.watermarkStateInternal("bar", TimestampCombiner.LATEST);
+    StateTag<?, ?> foo1 = StateTags.watermarkStateInternal(
+        "foo", OutputTimeFns.outputAtEarliestInputTimestamp());
+    StateTag<?, ?> foo2 = StateTags.watermarkStateInternal(
+        "foo", OutputTimeFns.outputAtEarliestInputTimestamp());
+    StateTag<?, ?> bar = StateTags.watermarkStateInternal(
+        "bar", OutputTimeFns.outputAtEarliestInputTimestamp());
+
+    StateTag<?, ?> bar2 = StateTags.watermarkStateInternal(
+        "bar", OutputTimeFns.outputAtLatestInputTimestamp());
 
     // Same id, same fn.
     assertEquals(foo1, foo2);

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
index 068b37f..0665812 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
@@ -43,7 +43,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
@@ -213,7 +213,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
       Instant earliest = BoundedWindow.TIMESTAMP_MAX_VALUE;
       for (State existingState : this.values()) {
         if (existingState instanceof WatermarkHoldState) {
-          Instant hold = ((WatermarkHoldState) existingState).read();
+          Instant hold = ((WatermarkHoldState<?>) existingState).read();
           if (hold != null && hold.isBefore(earliest)) {
             earliest = hold;
           }
@@ -276,18 +276,18 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
       public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
         return new StateBinder<K>() {
           @Override
-          public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-              StateTag<? super K, WatermarkHoldState> address,
-              TimestampCombiner timestampCombiner) {
+          public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+              StateTag<? super K, WatermarkHoldState<W>> address,
+              OutputTimeFn<? super W> outputTimeFn) {
             if (containedInUnderlying(namespace, address)) {
               @SuppressWarnings("unchecked")
-              InMemoryState<? extends WatermarkHoldState> existingState =
-                  (InMemoryState<? extends WatermarkHoldState>)
+              InMemoryState<? extends WatermarkHoldState<W>> existingState =
+                  (InMemoryState<? extends WatermarkHoldState<W>>)
                   underlying.get().get(namespace, address, c);
               return existingState.copy();
             } else {
               return new InMemoryWatermarkHold<>(
-                  timestampCombiner);
+                  outputTimeFn);
             }
           }
 
@@ -419,7 +419,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
               State state =
                   readTo.get(namespace, existingState.getKey(), StateContexts.nullContext());
               if (state instanceof WatermarkHoldState) {
-                Instant hold = ((WatermarkHoldState) state).read();
+                Instant hold = ((WatermarkHoldState<?>) state).read();
                 if (hold != null && hold.isBefore(earliestHold)) {
                   earliestHold = hold;
                 }
@@ -434,9 +434,9 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
       public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
         return new StateBinder<K>() {
           @Override
-          public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-              StateTag<? super K, WatermarkHoldState> address,
-              TimestampCombiner timestampCombiner) {
+          public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+              StateTag<? super K, WatermarkHoldState<W>> address,
+              OutputTimeFn<? super W> outputTimeFn) {
             return underlying.get(namespace, address, c);
           }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 322c995..b08aa8e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -40,8 +40,8 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -135,14 +135,14 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
               // to alter the flow of data. This entails:
               //  - trigger as fast as possible
               //  - maintain the full timestamps of elements
-              //  - ensure this GBK holds to the minimum of those timestamps (via TimestampCombiner)
+              //  - ensure this GBK holds to the minimum of those timestamps (via OutputTimeFn)
               //  - discard past panes as it is "just a stream" of elements
               .apply(
                   Window.<KV<K, WindowedValue<KV<K, InputT>>>>configure()
                       .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                       .discardingFiredPanes()
                       .withAllowedLateness(inputWindowingStrategy.getAllowedLateness())
-                      .withTimestampCombiner(TimestampCombiner.EARLIEST))
+                      .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
 
               // A full GBK to group by key _and_ window
               .apply("Group by key", GroupByKey.<K, WindowedValue<KV<K, InputT>>>create())

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index f0aeece..68c6613 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -43,7 +43,8 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
@@ -288,12 +289,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
-    TimestampCombiner timestampCombiner = TimestampCombiner.EARLIEST;
+    OutputTimeFn<BoundedWindow> outputTimeFn =
+        OutputTimeFns.outputAtEarliestInputTimestamp();
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, WatermarkHoldState> stateTag =
-        StateTags.watermarkStateInternal("wmstate", timestampCombiner);
-    WatermarkHoldState underlyingValue = underlying.state(namespace, stateTag);
+    StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag =
+        StateTags.watermarkStateInternal("wmstate", outputTimeFn);
+    WatermarkHoldState<?> underlyingValue = underlying.state(namespace, stateTag);
     assertThat(underlyingValue.read(), nullValue());
 
     underlyingValue.add(new Instant(250L));
@@ -301,7 +303,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
     CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-    WatermarkHoldState copyOnAccessState = internals.state(namespace, stateTag);
+    WatermarkHoldState<BoundedWindow> copyOnAccessState = internals.state(namespace, stateTag);
     assertThat(copyOnAccessState.read(), equalTo(new Instant(250L)));
 
     copyOnAccessState.add(new Instant(100L));
@@ -311,7 +313,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     copyOnAccessState.add(new Instant(500L));
     assertThat(copyOnAccessState.read(), equalTo(new Instant(100L)));
 
-    WatermarkHoldState reReadUnderlyingValue =
+    WatermarkHoldState<BoundedWindow> reReadUnderlyingValue =
         underlying.state(namespace, stateTag);
     assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
   }
@@ -512,15 +514,15 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
 
-    StateTag<Object, WatermarkHoldState> firstHoldAddress =
-        StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
-    WatermarkHoldState firstHold =
+    StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
+        StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
+    WatermarkHoldState<BoundedWindow> firstHold =
         internals.state(StateNamespaces.window(null, first), firstHoldAddress);
     firstHold.add(new Instant(22L));
 
-    StateTag<Object, WatermarkHoldState> secondHoldAddress =
-        StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
-    WatermarkHoldState secondHold =
+    StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
+        StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
+    WatermarkHoldState<BoundedWindow> secondHold =
         internals.state(StateNamespaces.window(null, second), secondHoldAddress);
     secondHold.add(new Instant(2L));
 
@@ -544,18 +546,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     };
     CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
-    StateTag<Object, WatermarkHoldState> firstHoldAddress =
-        StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
-    WatermarkHoldState firstHold =
+    StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
+        StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
+    WatermarkHoldState<BoundedWindow> firstHold =
         underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
     firstHold.add(new Instant(22L));
 
     CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
 
-    StateTag<Object, WatermarkHoldState> secondHoldAddress =
-        StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
-    WatermarkHoldState secondHold =
+    StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
+        StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
+    WatermarkHoldState<BoundedWindow> secondHold =
         internals.state(StateNamespaces.window(null, second), secondHoldAddress);
     secondHold.add(new Instant(244L));
 
@@ -581,18 +583,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
         };
     CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
-    StateTag<Object, WatermarkHoldState> firstHoldAddress =
-        StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
-    WatermarkHoldState firstHold =
+    StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
+        StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
+    WatermarkHoldState<BoundedWindow> firstHold =
         underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
     firstHold.add(new Instant(224L));
 
     CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
 
-    StateTag<Object, WatermarkHoldState> secondHoldAddress =
-        StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
-    WatermarkHoldState secondHold =
+    StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
+        StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
+    WatermarkHoldState<BoundedWindow> secondHold =
         internals.state(StateNamespaces.window(null, second), secondHoldAddress);
     secondHold.add(new Instant(24L));
 
@@ -608,7 +610,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     internals
         .state(
             StateNamespaces.global(),
-            StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST))
+            StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()))
         .add(new Instant(1234L));
 
     thrown.expect(IllegalStateException.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
index 7ee2f69..b904bfe 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
@@ -29,8 +29,8 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -60,8 +60,8 @@ public class HashingFlinkCombineRunner<
 
 
     @SuppressWarnings("unchecked")
-    TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
-    WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn();
+    OutputTimeFn<? super BoundedWindow> outputTimeFn =
+        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
 
     // Flink Iterable can be iterated over only once.
     List<WindowedValue<KV<K, InputT>>> inputs = new ArrayList<>();
@@ -87,21 +87,14 @@ public class HashingFlinkCombineRunner<
           AccumT accumT = flinkCombiner.firstInput(key, currentValue.getValue().getValue(),
               options, sideInputReader, singletonW);
           Instant windowTimestamp =
-              timestampCombiner.assign(
-                  mergedWindow, windowFn.getOutputTime(currentValue.getTimestamp(), mergedWindow));
+              outputTimeFn.assignOutputTime(currentValue.getTimestamp(), mergedWindow);
           accumAndInstant = new Tuple2<>(accumT, windowTimestamp);
           mapState.put(mergedWindow, accumAndInstant);
         } else {
           accumAndInstant.f0 = flinkCombiner.addInput(key, accumAndInstant.f0,
               currentValue.getValue().getValue(), options, sideInputReader, singletonW);
-          accumAndInstant.f1 =
-              timestampCombiner.combine(
-                  accumAndInstant.f1,
-                  timestampCombiner.assign(
-                      mergedWindow,
-                      windowingStrategy
-                          .getWindowFn()
-                          .getOutputTime(currentValue.getTimestamp(), mergedWindow)));
+          accumAndInstant.f1 = outputTimeFn.combine(accumAndInstant.f1,
+              outputTimeFn.assignOutputTime(currentValue.getTimestamp(), mergedWindow));
         }
       }
       if (iterator.hasNext()) {