You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/29 01:08:30 UTC

[2/5] beam git commit: Rollforwards "Replace OutputTimeFn UDF with TimestampCombiner enum"

http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
index 268718a..14f818a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
@@ -20,20 +20,17 @@ package org.apache.beam.sdk.util;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import java.io.Serializable;
-import java.util.Collections;
 import java.util.Objects;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.joda.time.Duration;
-import org.joda.time.Instant;
 
 /**
  * A {@code WindowingStrategy} describes the windowing behavior for a specific collection of values.
@@ -58,22 +55,22 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
   private static final WindowingStrategy<Object, GlobalWindow> DEFAULT = of(new GlobalWindows());
 
   private final WindowFn<T, W> windowFn;
-  private final OutputTimeFn<? super W> outputTimeFn;
   private final Trigger trigger;
   private final AccumulationMode mode;
   private final Duration allowedLateness;
   private final ClosingBehavior closingBehavior;
+  private final TimestampCombiner timestampCombiner;
   private final boolean triggerSpecified;
   private final boolean modeSpecified;
   private final boolean allowedLatenessSpecified;
-  private final boolean outputTimeFnSpecified;
+  private final boolean timestampCombinerSpecified;
 
   private WindowingStrategy(
       WindowFn<T, W> windowFn,
       Trigger trigger, boolean triggerSpecified,
       AccumulationMode mode, boolean modeSpecified,
       Duration allowedLateness, boolean allowedLatenessSpecified,
-      OutputTimeFn<? super W> outputTimeFn, boolean outputTimeFnSpecified,
+      TimestampCombiner timestampCombiner, boolean timestampCombinerSpecified,
       ClosingBehavior closingBehavior) {
     this.windowFn = windowFn;
     this.trigger = trigger;
@@ -83,8 +80,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
     this.allowedLateness = allowedLateness;
     this.allowedLatenessSpecified = allowedLatenessSpecified;
     this.closingBehavior = closingBehavior;
-    this.outputTimeFn = outputTimeFn;
-    this.outputTimeFnSpecified = outputTimeFnSpecified;
+    this.timestampCombiner = timestampCombiner;
+    this.timestampCombinerSpecified = timestampCombinerSpecified;
   }
 
   /**
@@ -100,7 +97,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         DefaultTrigger.of(), false,
         AccumulationMode.DISCARDING_FIRED_PANES, false,
         DEFAULT_ALLOWED_LATENESS, false,
-        new CombineWindowFnOutputTimes(OutputTimeFns.outputAtEndOfWindow(), windowFn), false,
+        TimestampCombiner.END_OF_WINDOW, false,
         ClosingBehavior.FIRE_IF_NON_EMPTY);
   }
 
@@ -136,12 +133,12 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
     return closingBehavior;
   }
 
-  public OutputTimeFn<? super W> getOutputTimeFn() {
-    return outputTimeFn;
+  public TimestampCombiner getTimestampCombiner() {
+    return timestampCombiner;
   }
 
-  public boolean isOutputTimeFnSpecified() {
-    return outputTimeFnSpecified;
+  public boolean isTimestampCombinerSpecified() {
+    return timestampCombinerSpecified;
   }
 
   /**
@@ -154,7 +151,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         trigger, true,
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
-        outputTimeFn, outputTimeFnSpecified,
+        timestampCombiner, timestampCombinerSpecified,
         closingBehavior);
   }
 
@@ -168,7 +165,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         trigger, triggerSpecified,
         mode, true,
         allowedLateness, allowedLatenessSpecified,
-        outputTimeFn, outputTimeFnSpecified,
+        timestampCombiner, timestampCombinerSpecified,
         closingBehavior);
   }
 
@@ -180,17 +177,12 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
     @SuppressWarnings("unchecked")
     WindowFn<T, W> typedWindowFn = (WindowFn<T, W>) wildcardWindowFn;
 
-    // The onus of type correctness falls on the callee.
-    @SuppressWarnings("unchecked")
-    OutputTimeFn<? super W> newOutputTimeFn = (OutputTimeFn<? super W>)
-        new CombineWindowFnOutputTimes<W>(outputTimeFn, typedWindowFn);
-
     return new WindowingStrategy<T, W>(
         typedWindowFn,
         trigger, triggerSpecified,
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
-        newOutputTimeFn, outputTimeFnSpecified,
+        timestampCombiner, timestampCombinerSpecified,
         closingBehavior);
   }
 
@@ -204,7 +196,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         trigger, triggerSpecified,
         mode, modeSpecified,
         allowedLateness, true,
-        outputTimeFn, outputTimeFnSpecified,
+        timestampCombiner, timestampCombinerSpecified,
         closingBehavior);
   }
 
@@ -214,40 +206,19 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         trigger, triggerSpecified,
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
-        outputTimeFn, outputTimeFnSpecified,
+        timestampCombiner, timestampCombinerSpecified,
         closingBehavior);
   }
 
   @Experimental(Experimental.Kind.OUTPUT_TIME)
-  public WindowingStrategy<T, W> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) {
-
-    @SuppressWarnings("unchecked")
-    OutputTimeFn<? super W> typedOutputTimeFn = (OutputTimeFn<? super W>) outputTimeFn;
-
-    OutputTimeFn<? super W> newOutputTimeFn =
-        new CombineWindowFnOutputTimes<W>(typedOutputTimeFn, windowFn);
+  public WindowingStrategy<T, W> withTimestampCombiner(TimestampCombiner timestampCombiner) {
 
     return new WindowingStrategy<T, W>(
         windowFn,
         trigger, triggerSpecified,
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
-        newOutputTimeFn, true,
-        closingBehavior);
-  }
-
-  /**
-   * Fixes all the defaults so that equals can be used to check that two strategies are the same,
-   * regardless of the state of "defaulted-ness".
-   */
-  @VisibleForTesting
-  public WindowingStrategy<T, W> fixDefaults() {
-    return new WindowingStrategy<>(
-        windowFn,
-        trigger, true,
-        mode, true,
-        allowedLateness, true,
-        outputTimeFn, true,
+        timestampCombiner, true,
         closingBehavior);
   }
 
@@ -258,7 +229,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         .add("allowedLateness", allowedLateness)
         .add("trigger", trigger)
         .add("accumulationMode", mode)
-        .add("outputTimeFn", outputTimeFn)
+        .add("timestampCombiner", timestampCombiner)
         .toString();
   }
 
@@ -268,104 +239,45 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
       return false;
     }
     WindowingStrategy<?, ?> other = (WindowingStrategy<?, ?>) object;
-    return
-        isTriggerSpecified() == other.isTriggerSpecified()
+    return isTriggerSpecified() == other.isTriggerSpecified()
         && isAllowedLatenessSpecified() == other.isAllowedLatenessSpecified()
         && isModeSpecified() == other.isModeSpecified()
+        && isTimestampCombinerSpecified() == other.isTimestampCombinerSpecified()
         && getMode().equals(other.getMode())
         && getAllowedLateness().equals(other.getAllowedLateness())
         && getClosingBehavior().equals(other.getClosingBehavior())
         && getTrigger().equals(other.getTrigger())
+        && getTimestampCombiner().equals(other.getTimestampCombiner())
         && getWindowFn().equals(other.getWindowFn());
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(triggerSpecified, allowedLatenessSpecified, modeSpecified,
-        windowFn, trigger, mode, allowedLateness, closingBehavior);
+    return Objects.hash(
+        triggerSpecified,
+        allowedLatenessSpecified,
+        modeSpecified,
+        timestampCombinerSpecified,
+        mode,
+        allowedLateness,
+        closingBehavior,
+        trigger,
+        timestampCombiner,
+        windowFn);
   }
 
   /**
-   * An {@link OutputTimeFn} that uses {@link WindowFn#getOutputTime} to assign initial timestamps
-   * but then combines and merges according to a given {@link OutputTimeFn}.
-   *
-   * <ul>
-   *   <li>The {@link WindowFn#getOutputTime} allows adjustments such as that whereby
-   *       {@link org.apache.beam.sdk.transforms.windowing.SlidingWindows#getOutputTime}
-   *       moves elements later in time to avoid holding up progress downstream.</li>
-   *   <li>Then, when multiple elements are buffered for output, the output timestamp of the
-   *       result is calculated using {@link OutputTimeFn#combine}.</li>
-   *   <li>In the case of a merging {@link WindowFn}, the output timestamp when windows merge
-   *       is calculated using {@link OutputTimeFn#merge}.</li>
-   * </ul>
+   * Fixes all the defaults so that equals can be used to check that two strategies are the same,
+   * regardless of the state of "defaulted-ness".
    */
-  public static class CombineWindowFnOutputTimes<W extends BoundedWindow>
-      extends OutputTimeFn<W> {
-
-    private final OutputTimeFn<? super W> outputTimeFn;
-    private final WindowFn<?, W> windowFn;
-
-    public CombineWindowFnOutputTimes(
-        OutputTimeFn<? super W> outputTimeFn, WindowFn<?, W> windowFn) {
-      this.outputTimeFn = outputTimeFn;
-      this.windowFn = windowFn;
-    }
-
-    public OutputTimeFn<? super W> getOutputTimeFn() {
-      return outputTimeFn;
-    }
-
-    @Override
-    public Instant assignOutputTime(Instant inputTimestamp, W window) {
-      return outputTimeFn.merge(
-          window, Collections.singleton(windowFn.getOutputTime(inputTimestamp, window)));
-    }
-
-    @Override
-    public Instant combine(Instant timestamp, Instant otherTimestamp) {
-      return outputTimeFn.combine(timestamp, otherTimestamp);
-    }
-
-    @Override
-    public Instant merge(W newWindow, Iterable<? extends Instant> timestamps) {
-      return outputTimeFn.merge(newWindow, timestamps);
-    }
-
-    @Override
-    public final boolean dependsOnlyOnWindow() {
-      return outputTimeFn.dependsOnlyOnWindow();
-    }
-
-    @Override
-    public boolean dependsOnlyOnEarliestInputTimestamp() {
-      return outputTimeFn.dependsOnlyOnEarliestInputTimestamp();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof CombineWindowFnOutputTimes)) {
-        return false;
-      }
-
-      CombineWindowFnOutputTimes<?> that = (CombineWindowFnOutputTimes<?>) obj;
-      return outputTimeFn.equals(that.outputTimeFn) && windowFn.equals(that.windowFn);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(outputTimeFn, windowFn);
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("outputTimeFn", outputTimeFn)
-          .add("windowFn", windowFn)
-          .toString();
-    }
+  @VisibleForTesting
+  public WindowingStrategy<T, W> fixDefaults() {
+    return new WindowingStrategy<>(
+        windowFn,
+        trigger, true,
+        mode, true,
+        allowedLateness, true,
+        timestampCombiner, true,
+        closingBehavior);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
index 64841fb..f9ab115 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -21,7 +21,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 
 /**
  * Visitor for binding a {@link StateSpec} and to the associated {@link State}.
@@ -63,11 +63,11 @@ public interface StateBinder<K> {
   /**
    * Bind to a watermark {@link StateSpec}.
    *
-   * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to
-   * the returned {@link WatermarkHoldState} are to be combined.
+   * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps added
+   * to the returned {@link WatermarkHoldState} are to be combined.
    */
-  <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+  <W extends BoundedWindow> WatermarkHoldState bindWatermark(
       String id,
-      StateSpec<? super K, WatermarkHoldState<W>> spec,
-      OutputTimeFn<? super W> outputTimeFn);
+      StateSpec<? super K, WatermarkHoldState> spec,
+      TimestampCombiner timestampCombiner);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index dc647da..8fa5bb0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 
 /**
  * Static utility methods for creating {@link StateSpec} instances.
@@ -208,9 +208,9 @@ public class StateSpecs {
 
   /** Create a state spec for holding the watermark. */
   public static <W extends BoundedWindow>
-      StateSpec<Object, WatermarkHoldState<W>> watermarkStateInternal(
-          OutputTimeFn<? super W> outputTimeFn) {
-    return new WatermarkStateSpecInternal<W>(outputTimeFn);
+      StateSpec<Object, WatermarkHoldState> watermarkStateInternal(
+          TimestampCombiner timestampCombiner) {
+    return new WatermarkStateSpecInternal<W>(timestampCombiner);
   }
 
   public static <K, InputT, AccumT, OutputT>
@@ -656,26 +656,26 @@ public class StateSpecs {
   /**
    * A specification for a state cell tracking a combined watermark hold.
    *
-   * <p>Includes the {@link OutputTimeFn} according to which the output times
+   * <p>Includes the {@link TimestampCombiner} according to which the output times
    * are combined.
    */
   private static class WatermarkStateSpecInternal<W extends BoundedWindow>
-      implements StateSpec<Object, WatermarkHoldState<W>> {
+      implements StateSpec<Object, WatermarkHoldState> {
 
     /**
      * When multiple output times are added to hold the watermark, this determines how they are
      * combined, and also the behavior when merging windows. Does not contribute to equality/hash
      * since we have at most one watermark hold spec per computation.
      */
-    private final OutputTimeFn<? super W> outputTimeFn;
+    private final TimestampCombiner timestampCombiner;
 
-    private WatermarkStateSpecInternal(OutputTimeFn<? super W> outputTimeFn) {
-      this.outputTimeFn = outputTimeFn;
+    private WatermarkStateSpecInternal(TimestampCombiner timestampCombiner) {
+      this.timestampCombiner = timestampCombiner;
     }
 
     @Override
-    public WatermarkHoldState<W> bind(String id, StateBinder<?> visitor) {
-      return visitor.bindWatermark(id, this, outputTimeFn);
+    public WatermarkHoldState bind(String id, StateBinder<?> visitor) {
+      return visitor.bindWatermark(id, this, timestampCombiner);
     }
 
     @Override
@@ -701,5 +701,4 @@ public class StateSpecs {
       return Objects.hash(getClass());
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
index 20fa05f..ae9b700 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
@@ -19,25 +19,24 @@ package org.apache.beam.sdk.util.state;
 
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.joda.time.Instant;
 
 /**
- * A {@link State} accepting and aggregating output timestamps, which determines
- * the time to which the output watermark must be held.
+ * A {@link State} accepting and aggregating output timestamps, which determines the time to which
+ * the output watermark must be held.
  *
  * <p><b><i>For internal use only. This API may change at any time.</i></b>
  */
 @Experimental(Kind.STATE)
-public interface WatermarkHoldState<W extends BoundedWindow>
-    extends GroupingState<Instant, Instant> {
+public interface WatermarkHoldState extends GroupingState<Instant, Instant> {
   /**
-   * Return the {@link OutputTimeFn} which will be used to determine a watermark hold time given
-   * an element timestamp, and to combine watermarks from windows which are about to be merged.
+   * Return the {@link TimestampCombiner} which will be used to determine a watermark hold time
+   * given an element timestamp, and to combine watermarks from windows which are about to be
+   * merged.
    */
-  OutputTimeFn<? super W> getOutputTimeFn();
+  TimestampCombiner getTimestampCombiner();
 
   @Override
-  WatermarkHoldState<W> readLater();
+  WatermarkHoldState readLater();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
index 153bd84..26dd9f9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
@@ -39,7 +39,6 @@ public class SdkCoreApiSurfaceTest {
         ImmutableSet.of(
             "org.apache.beam",
             "com.google.api.client",
-            "com.google.protobuf",
             "com.fasterxml.jackson.annotation",
             "com.fasterxml.jackson.core",
             "com.fasterxml.jackson.databind",

http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 939261f..0556199 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -51,8 +51,8 @@ import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -318,14 +318,14 @@ public class GroupByKeyTest {
    */
   @Test
   @Category(ValidatesRunner.class)
-  public void testOutputTimeFnEarliest() {
+  public void testTimestampCombinerEarliest() {
 
     p.apply(
         Create.timestamped(
             TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
             TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
         .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
+            .withTimestampCombiner(TimestampCombiner.EARLIEST))
         .apply(GroupByKey.<Integer, String>create())
         .apply(ParDo.of(new AssertTimestamp(new Instant(0))));
 
@@ -339,13 +339,13 @@ public class GroupByKeyTest {
    */
   @Test
   @Category(ValidatesRunner.class)
-  public void testOutputTimeFnLatest() {
+  public void testTimestampCombinerLatest() {
     p.apply(
         Create.timestamped(
             TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
             TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
         .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()))
+            .withTimestampCombiner(TimestampCombiner.LATEST))
         .apply(GroupByKey.<Integer, String>create())
         .apply(ParDo.of(new AssertTimestamp(new Instant(10))));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
index 4e61f4e..9a17bc7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
@@ -41,7 +41,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -241,7 +241,7 @@ public class CoGroupByKeyTest implements Serializable {
             Arrays.asList(0L, 2L, 4L, 6L, 8L))
         .apply("WindowClicks", Window.<KV<Integer, String>>into(
             FixedWindows.of(new Duration(4)))
-            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()));
+            .withTimestampCombiner(TimestampCombiner.EARLIEST));
 
     PCollection<KV<Integer, String>> purchasesTable =
         createInput("CreatePurchases",
@@ -250,7 +250,7 @@ public class CoGroupByKeyTest implements Serializable {
             Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L))
         .apply("WindowPurchases", Window.<KV<Integer, String>>into(
             FixedWindows.of(new Duration(4)))
-            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()));
+            .withTimestampCombiner(TimestampCombiner.EARLIEST));
 
     PCollection<KV<Integer, CoGbkResult>> coGbkResults =
         KeyedPCollectionTuple.of(clicksTag, clicksTable)

http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java
deleted file mode 100644
index 78d7a2f..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.ImmutableList;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-/** Tests for {@link OutputTimeFns}. */
-@RunWith(Parameterized.class)
-public class OutputTimeFnsTest {
-
-  @Parameters(name = "{index}: {0}")
-  public static Iterable<OutputTimeFn<BoundedWindow>> data() {
-    return ImmutableList.of(
-        OutputTimeFns.outputAtEarliestInputTimestamp(),
-        OutputTimeFns.outputAtLatestInputTimestamp(),
-        OutputTimeFns.outputAtEndOfWindow());
-  }
-
-  @Parameter(0)
-  public OutputTimeFn<?> outputTimeFn;
-
-  @Test
-  public void testToProtoAndBack() throws Exception {
-    OutputTimeFn<?> result = OutputTimeFns.fromProto(OutputTimeFns.toProto(outputTimeFn));
-
-    assertThat(result, equalTo((OutputTimeFn) outputTimeFn));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
index b131688..9d94928 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
@@ -118,7 +118,7 @@ public class SessionsTest {
   }
 
   /**
-   * Test to confirm that {@link Sessions} with the default {@link OutputTimeFn} holds up the
+   * Test to confirm that {@link Sessions} with the default {@link TimestampCombiner} holds up the
    * watermark potentially indefinitely.
    */
   @Test
@@ -126,7 +126,7 @@ public class SessionsTest {
     try {
       WindowFnTestUtils.<Object, IntervalWindow>validateGetOutputTimestamps(
           Sessions.withGapDuration(Duration.millis(10)),
-          OutputTimeFns.outputAtEarliestInputTimestamp(),
+          TimestampCombiner.EARLIEST,
           ImmutableList.of(
               (List<Long>) ImmutableList.of(1L, 3L),
               (List<Long>) ImmutableList.of(0L, 5L, 10L, 15L, 20L)));
@@ -148,7 +148,7 @@ public class SessionsTest {
   public void testValidOutputAtEndTimes() throws Exception {
     WindowFnTestUtils.<Object, IntervalWindow>validateGetOutputTimestamps(
         Sessions.withGapDuration(Duration.millis(10)),
-        OutputTimeFns.outputAtEndOfWindow(),
+        TimestampCombiner.END_OF_WINDOW,
           ImmutableList.of(
               (List<Long>) ImmutableList.of(1L, 3L),
               (List<Long>) ImmutableList.of(0L, 5L, 10L, 15L, 20L)));

http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index e1ed66a..534e230 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -366,7 +366,7 @@ public class WindowTest implements Serializable {
    */
   @Test
   @Category(ValidatesRunner.class)
-  public void testOutputTimeFnDefault() {
+  public void testTimestampCombinerDefault() {
     pipeline.enableAbandonedNodeEnforcement(true);
 
     pipeline
@@ -400,7 +400,7 @@ public class WindowTest implements Serializable {
    */
   @Test
   @Category(ValidatesRunner.class)
-  public void testOutputTimeFnEndOfWindow() {
+  public void testTimestampCombinerEndOfWindow() {
     pipeline.enableAbandonedNodeEnforcement(true);
 
     pipeline.apply(
@@ -408,7 +408,7 @@ public class WindowTest implements Serializable {
             TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
             TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
         .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
+            .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
         .apply(GroupByKey.<Integer, String>create())
         .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() {
           @ProcessElement
@@ -426,14 +426,14 @@ public class WindowTest implements Serializable {
     AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow();
     Duration allowedLateness = Duration.standardMinutes(10);
     Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY;
-    OutputTimeFn<BoundedWindow> outputTimeFn = OutputTimeFns.outputAtEndOfWindow();
+    TimestampCombiner timestampCombiner = TimestampCombiner.END_OF_WINDOW;
 
     Window<?> window = Window
         .into(windowFn)
         .triggering(triggerBuilder)
         .accumulatingFiredPanes()
         .withAllowedLateness(allowedLateness, closingBehavior)
-        .withOutputTimeFn(outputTimeFn);
+        .withTimestampCombiner(timestampCombiner);
 
     DisplayData displayData = DisplayData.from(window);
 
@@ -446,7 +446,7 @@ public class WindowTest implements Serializable {
     assertThat(displayData,
         hasDisplayItem("allowedLateness", allowedLateness));
     assertThat(displayData, hasDisplayItem("closingBehavior", closingBehavior.toString()));
-    assertThat(displayData, hasDisplayItem("outputTimeFn", outputTimeFn.getClass()));
+    assertThat(displayData, hasDisplayItem("timestampCombiner", timestampCombiner.toString()));
   }
 
   @Test
@@ -456,14 +456,14 @@ public class WindowTest implements Serializable {
     AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow();
     Duration allowedLateness = Duration.standardMinutes(10);
     Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY;
-    OutputTimeFn<BoundedWindow> outputTimeFn = OutputTimeFns.outputAtEndOfWindow();
+    TimestampCombiner timestampCombiner = TimestampCombiner.END_OF_WINDOW;
 
     Window<?> window = Window
         .into(windowFn)
         .triggering(triggerBuilder)
         .accumulatingFiredPanes()
         .withAllowedLateness(allowedLateness, closingBehavior)
-        .withOutputTimeFn(outputTimeFn);
+        .withTimestampCombiner(timestampCombiner);
 
     DisplayData primitiveDisplayData =
         Iterables.getOnlyElement(
@@ -478,7 +478,8 @@ public class WindowTest implements Serializable {
     assertThat(primitiveDisplayData,
         hasDisplayItem("allowedLateness", allowedLateness));
     assertThat(primitiveDisplayData, hasDisplayItem("closingBehavior", closingBehavior.toString()));
-    assertThat(primitiveDisplayData, hasDisplayItem("outputTimeFn", outputTimeFn.getClass()));
+    assertThat(
+        primitiveDisplayData, hasDisplayItem("timestampCombiner", timestampCombiner.toString()));
   }
 
   @Test
@@ -497,7 +498,7 @@ public class WindowTest implements Serializable {
     assertThat(displayData, not(hasDisplayItem("accumulationMode")));
     assertThat(displayData, not(hasDisplayItem("allowedLateness")));
     assertThat(displayData, not(hasDisplayItem("closingBehavior")));
-    assertThat(displayData, not(hasDisplayItem("outputTimeFn")));
+    assertThat(displayData, not(hasDisplayItem("timestampCombiner")));
   }
 
   @Test
@@ -506,7 +507,7 @@ public class WindowTest implements Serializable {
     assertThat(DisplayData.from(onlyHasAccumulationMode), not(hasDisplayItem(hasKey(isOneOf(
         "windowFn",
         "trigger",
-        "outputTimeFn",
+        "timestampCombiner",
         "allowedLateness",
         "closingBehavior")))));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index a3f5352..30b0311 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -76,7 +76,7 @@ public class WindowingTest implements Serializable {
     public PCollection<String> expand(PCollection<String> in) {
       return in.apply("Window",
               Window.<String>into(windowFn)
-                  .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
+                  .withTimestampCombiner(TimestampCombiner.EARLIEST))
           .apply(Count.<String>perElement())
           .apply("FormatCounts", ParDo.of(new FormatCountsDoFn()))
           .setCoder(StringUtf8Coder.of());

http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
index 50edd83..215b0f4 100644
--- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
@@ -42,7 +42,6 @@ public class GcpCoreApiSurfaceTest {
             "com.google.api.services.cloudresourcemanager",
             "com.google.api.services.storage",
             "com.google.auth",
-            "com.google.protobuf",
             "com.fasterxml.jackson.annotation",
             "com.fasterxml.jackson.core",
             "com.fasterxml.jackson.databind",