You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/29 01:08:30 UTC
[2/5] beam git commit: Rollforwards "Replace OutputTimeFn UDF with
TimestampCombiner enum"
http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
index 268718a..14f818a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
@@ -20,20 +20,17 @@ package org.apache.beam.sdk.util;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import java.io.Serializable;
-import java.util.Collections;
import java.util.Objects;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.joda.time.Duration;
-import org.joda.time.Instant;
/**
* A {@code WindowingStrategy} describes the windowing behavior for a specific collection of values.
@@ -58,22 +55,22 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
private static final WindowingStrategy<Object, GlobalWindow> DEFAULT = of(new GlobalWindows());
private final WindowFn<T, W> windowFn;
- private final OutputTimeFn<? super W> outputTimeFn;
private final Trigger trigger;
private final AccumulationMode mode;
private final Duration allowedLateness;
private final ClosingBehavior closingBehavior;
+ private final TimestampCombiner timestampCombiner;
private final boolean triggerSpecified;
private final boolean modeSpecified;
private final boolean allowedLatenessSpecified;
- private final boolean outputTimeFnSpecified;
+ private final boolean timestampCombinerSpecified;
private WindowingStrategy(
WindowFn<T, W> windowFn,
Trigger trigger, boolean triggerSpecified,
AccumulationMode mode, boolean modeSpecified,
Duration allowedLateness, boolean allowedLatenessSpecified,
- OutputTimeFn<? super W> outputTimeFn, boolean outputTimeFnSpecified,
+ TimestampCombiner timestampCombiner, boolean timestampCombinerSpecified,
ClosingBehavior closingBehavior) {
this.windowFn = windowFn;
this.trigger = trigger;
@@ -83,8 +80,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
this.allowedLateness = allowedLateness;
this.allowedLatenessSpecified = allowedLatenessSpecified;
this.closingBehavior = closingBehavior;
- this.outputTimeFn = outputTimeFn;
- this.outputTimeFnSpecified = outputTimeFnSpecified;
+ this.timestampCombiner = timestampCombiner;
+ this.timestampCombinerSpecified = timestampCombinerSpecified;
}
/**
@@ -100,7 +97,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
DefaultTrigger.of(), false,
AccumulationMode.DISCARDING_FIRED_PANES, false,
DEFAULT_ALLOWED_LATENESS, false,
- new CombineWindowFnOutputTimes(OutputTimeFns.outputAtEndOfWindow(), windowFn), false,
+ TimestampCombiner.END_OF_WINDOW, false,
ClosingBehavior.FIRE_IF_NON_EMPTY);
}
@@ -136,12 +133,12 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
return closingBehavior;
}
- public OutputTimeFn<? super W> getOutputTimeFn() {
- return outputTimeFn;
+ public TimestampCombiner getTimestampCombiner() {
+ return timestampCombiner;
}
- public boolean isOutputTimeFnSpecified() {
- return outputTimeFnSpecified;
+ public boolean isTimestampCombinerSpecified() {
+ return timestampCombinerSpecified;
}
/**
@@ -154,7 +151,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
trigger, true,
mode, modeSpecified,
allowedLateness, allowedLatenessSpecified,
- outputTimeFn, outputTimeFnSpecified,
+ timestampCombiner, timestampCombinerSpecified,
closingBehavior);
}
@@ -168,7 +165,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
trigger, triggerSpecified,
mode, true,
allowedLateness, allowedLatenessSpecified,
- outputTimeFn, outputTimeFnSpecified,
+ timestampCombiner, timestampCombinerSpecified,
closingBehavior);
}
@@ -180,17 +177,12 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
@SuppressWarnings("unchecked")
WindowFn<T, W> typedWindowFn = (WindowFn<T, W>) wildcardWindowFn;
- // The onus of type correctness falls on the callee.
- @SuppressWarnings("unchecked")
- OutputTimeFn<? super W> newOutputTimeFn = (OutputTimeFn<? super W>)
- new CombineWindowFnOutputTimes<W>(outputTimeFn, typedWindowFn);
-
return new WindowingStrategy<T, W>(
typedWindowFn,
trigger, triggerSpecified,
mode, modeSpecified,
allowedLateness, allowedLatenessSpecified,
- newOutputTimeFn, outputTimeFnSpecified,
+ timestampCombiner, timestampCombinerSpecified,
closingBehavior);
}
@@ -204,7 +196,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
trigger, triggerSpecified,
mode, modeSpecified,
allowedLateness, true,
- outputTimeFn, outputTimeFnSpecified,
+ timestampCombiner, timestampCombinerSpecified,
closingBehavior);
}
@@ -214,40 +206,19 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
trigger, triggerSpecified,
mode, modeSpecified,
allowedLateness, allowedLatenessSpecified,
- outputTimeFn, outputTimeFnSpecified,
+ timestampCombiner, timestampCombinerSpecified,
closingBehavior);
}
@Experimental(Experimental.Kind.OUTPUT_TIME)
- public WindowingStrategy<T, W> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) {
-
- @SuppressWarnings("unchecked")
- OutputTimeFn<? super W> typedOutputTimeFn = (OutputTimeFn<? super W>) outputTimeFn;
-
- OutputTimeFn<? super W> newOutputTimeFn =
- new CombineWindowFnOutputTimes<W>(typedOutputTimeFn, windowFn);
+ public WindowingStrategy<T, W> withTimestampCombiner(TimestampCombiner timestampCombiner) {
return new WindowingStrategy<T, W>(
windowFn,
trigger, triggerSpecified,
mode, modeSpecified,
allowedLateness, allowedLatenessSpecified,
- newOutputTimeFn, true,
- closingBehavior);
- }
-
- /**
- * Fixes all the defaults so that equals can be used to check that two strategies are the same,
- * regardless of the state of "defaulted-ness".
- */
- @VisibleForTesting
- public WindowingStrategy<T, W> fixDefaults() {
- return new WindowingStrategy<>(
- windowFn,
- trigger, true,
- mode, true,
- allowedLateness, true,
- outputTimeFn, true,
+ timestampCombiner, true,
closingBehavior);
}
@@ -258,7 +229,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
.add("allowedLateness", allowedLateness)
.add("trigger", trigger)
.add("accumulationMode", mode)
- .add("outputTimeFn", outputTimeFn)
+ .add("timestampCombiner", timestampCombiner)
.toString();
}
@@ -268,104 +239,45 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
return false;
}
WindowingStrategy<?, ?> other = (WindowingStrategy<?, ?>) object;
- return
- isTriggerSpecified() == other.isTriggerSpecified()
+ return isTriggerSpecified() == other.isTriggerSpecified()
&& isAllowedLatenessSpecified() == other.isAllowedLatenessSpecified()
&& isModeSpecified() == other.isModeSpecified()
+ && isTimestampCombinerSpecified() == other.isTimestampCombinerSpecified()
&& getMode().equals(other.getMode())
&& getAllowedLateness().equals(other.getAllowedLateness())
&& getClosingBehavior().equals(other.getClosingBehavior())
&& getTrigger().equals(other.getTrigger())
+ && getTimestampCombiner().equals(other.getTimestampCombiner())
&& getWindowFn().equals(other.getWindowFn());
}
@Override
public int hashCode() {
- return Objects.hash(triggerSpecified, allowedLatenessSpecified, modeSpecified,
- windowFn, trigger, mode, allowedLateness, closingBehavior);
+ return Objects.hash(
+ triggerSpecified,
+ allowedLatenessSpecified,
+ modeSpecified,
+ timestampCombinerSpecified,
+ mode,
+ allowedLateness,
+ closingBehavior,
+ trigger,
+ timestampCombiner,
+ windowFn);
}
/**
- * An {@link OutputTimeFn} that uses {@link WindowFn#getOutputTime} to assign initial timestamps
- * but then combines and merges according to a given {@link OutputTimeFn}.
- *
- * <ul>
- * <li>The {@link WindowFn#getOutputTime} allows adjustments such as that whereby
- * {@link org.apache.beam.sdk.transforms.windowing.SlidingWindows#getOutputTime}
- * moves elements later in time to avoid holding up progress downstream.</li>
- * <li>Then, when multiple elements are buffered for output, the output timestamp of the
- * result is calculated using {@link OutputTimeFn#combine}.</li>
- * <li>In the case of a merging {@link WindowFn}, the output timestamp when windows merge
- * is calculated using {@link OutputTimeFn#merge}.</li>
- * </ul>
+ * Fixes all the defaults so that equals can be used to check that two strategies are the same,
+ * regardless of the state of "defaulted-ness".
*/
- public static class CombineWindowFnOutputTimes<W extends BoundedWindow>
- extends OutputTimeFn<W> {
-
- private final OutputTimeFn<? super W> outputTimeFn;
- private final WindowFn<?, W> windowFn;
-
- public CombineWindowFnOutputTimes(
- OutputTimeFn<? super W> outputTimeFn, WindowFn<?, W> windowFn) {
- this.outputTimeFn = outputTimeFn;
- this.windowFn = windowFn;
- }
-
- public OutputTimeFn<? super W> getOutputTimeFn() {
- return outputTimeFn;
- }
-
- @Override
- public Instant assignOutputTime(Instant inputTimestamp, W window) {
- return outputTimeFn.merge(
- window, Collections.singleton(windowFn.getOutputTime(inputTimestamp, window)));
- }
-
- @Override
- public Instant combine(Instant timestamp, Instant otherTimestamp) {
- return outputTimeFn.combine(timestamp, otherTimestamp);
- }
-
- @Override
- public Instant merge(W newWindow, Iterable<? extends Instant> timestamps) {
- return outputTimeFn.merge(newWindow, timestamps);
- }
-
- @Override
- public final boolean dependsOnlyOnWindow() {
- return outputTimeFn.dependsOnlyOnWindow();
- }
-
- @Override
- public boolean dependsOnlyOnEarliestInputTimestamp() {
- return outputTimeFn.dependsOnlyOnEarliestInputTimestamp();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (!(obj instanceof CombineWindowFnOutputTimes)) {
- return false;
- }
-
- CombineWindowFnOutputTimes<?> that = (CombineWindowFnOutputTimes<?>) obj;
- return outputTimeFn.equals(that.outputTimeFn) && windowFn.equals(that.windowFn);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(outputTimeFn, windowFn);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("outputTimeFn", outputTimeFn)
- .add("windowFn", windowFn)
- .toString();
- }
+ @VisibleForTesting
+ public WindowingStrategy<T, W> fixDefaults() {
+ return new WindowingStrategy<>(
+ windowFn,
+ trigger, true,
+ mode, true,
+ allowedLateness, true,
+ timestampCombiner, true,
+ closingBehavior);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
index 64841fb..f9ab115 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -21,7 +21,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
/**
* Visitor for binding a {@link StateSpec} to the associated {@link State}.
@@ -63,11 +63,11 @@ public interface StateBinder<K> {
/**
* Bind to a watermark {@link StateSpec}.
*
- * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to
- * the returned {@link WatermarkHoldState} are to be combined.
+ * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps added
+ * to the returned {@link WatermarkHoldState} are to be combined.
*/
- <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+ <W extends BoundedWindow> WatermarkHoldState bindWatermark(
String id,
- StateSpec<? super K, WatermarkHoldState<W>> spec,
- OutputTimeFn<? super W> outputTimeFn);
+ StateSpec<? super K, WatermarkHoldState> spec,
+ TimestampCombiner timestampCombiner);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index dc647da..8fa5bb0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
/**
* Static utility methods for creating {@link StateSpec} instances.
@@ -208,9 +208,9 @@ public class StateSpecs {
/** Create a state spec for holding the watermark. */
public static <W extends BoundedWindow>
- StateSpec<Object, WatermarkHoldState<W>> watermarkStateInternal(
- OutputTimeFn<? super W> outputTimeFn) {
- return new WatermarkStateSpecInternal<W>(outputTimeFn);
+ StateSpec<Object, WatermarkHoldState> watermarkStateInternal(
+ TimestampCombiner timestampCombiner) {
+ return new WatermarkStateSpecInternal<W>(timestampCombiner);
}
public static <K, InputT, AccumT, OutputT>
@@ -656,26 +656,26 @@ public class StateSpecs {
/**
* A specification for a state cell tracking a combined watermark hold.
*
- * <p>Includes the {@link OutputTimeFn} according to which the output times
+ * <p>Includes the {@link TimestampCombiner} according to which the output times
* are combined.
*/
private static class WatermarkStateSpecInternal<W extends BoundedWindow>
- implements StateSpec<Object, WatermarkHoldState<W>> {
+ implements StateSpec<Object, WatermarkHoldState> {
/**
* When multiple output times are added to hold the watermark, this determines how they are
* combined, and also the behavior when merging windows. Does not contribute to equality/hash
* since we have at most one watermark hold spec per computation.
*/
- private final OutputTimeFn<? super W> outputTimeFn;
+ private final TimestampCombiner timestampCombiner;
- private WatermarkStateSpecInternal(OutputTimeFn<? super W> outputTimeFn) {
- this.outputTimeFn = outputTimeFn;
+ private WatermarkStateSpecInternal(TimestampCombiner timestampCombiner) {
+ this.timestampCombiner = timestampCombiner;
}
@Override
- public WatermarkHoldState<W> bind(String id, StateBinder<?> visitor) {
- return visitor.bindWatermark(id, this, outputTimeFn);
+ public WatermarkHoldState bind(String id, StateBinder<?> visitor) {
+ return visitor.bindWatermark(id, this, timestampCombiner);
}
@Override
@@ -701,5 +701,4 @@ public class StateSpecs {
return Objects.hash(getClass());
}
}
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
index 20fa05f..ae9b700 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
@@ -19,25 +19,24 @@ package org.apache.beam.sdk.util.state;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.joda.time.Instant;
/**
- * A {@link State} accepting and aggregating output timestamps, which determines
- * the time to which the output watermark must be held.
+ * A {@link State} accepting and aggregating output timestamps, which determines the time to which
+ * the output watermark must be held.
*
* <p><b><i>For internal use only. This API may change at any time.</i></b>
*/
@Experimental(Kind.STATE)
-public interface WatermarkHoldState<W extends BoundedWindow>
- extends GroupingState<Instant, Instant> {
+public interface WatermarkHoldState extends GroupingState<Instant, Instant> {
/**
- * Return the {@link OutputTimeFn} which will be used to determine a watermark hold time given
- * an element timestamp, and to combine watermarks from windows which are about to be merged.
+ * Return the {@link TimestampCombiner} which will be used to determine a watermark hold time
+ * given an element timestamp, and to combine watermarks from windows which are about to be
+ * merged.
*/
- OutputTimeFn<? super W> getOutputTimeFn();
+ TimestampCombiner getTimestampCombiner();
@Override
- WatermarkHoldState<W> readLater();
+ WatermarkHoldState readLater();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
index 153bd84..26dd9f9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
@@ -39,7 +39,6 @@ public class SdkCoreApiSurfaceTest {
ImmutableSet.of(
"org.apache.beam",
"com.google.api.client",
- "com.google.protobuf",
"com.fasterxml.jackson.annotation",
"com.fasterxml.jackson.core",
"com.fasterxml.jackson.databind",
http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 939261f..0556199 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -51,8 +51,8 @@ import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.Reshuffle;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -318,14 +318,14 @@ public class GroupByKeyTest {
*/
@Test
@Category(ValidatesRunner.class)
- public void testOutputTimeFnEarliest() {
+ public void testTimestampCombinerEarliest() {
p.apply(
Create.timestamped(
TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
.apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
+ .withTimestampCombiner(TimestampCombiner.EARLIEST))
.apply(GroupByKey.<Integer, String>create())
.apply(ParDo.of(new AssertTimestamp(new Instant(0))));
@@ -339,13 +339,13 @@ public class GroupByKeyTest {
*/
@Test
@Category(ValidatesRunner.class)
- public void testOutputTimeFnLatest() {
+ public void testTimestampCombinerLatest() {
p.apply(
Create.timestamped(
TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
.apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()))
+ .withTimestampCombiner(TimestampCombiner.LATEST))
.apply(GroupByKey.<Integer, String>create())
.apply(ParDo.of(new AssertTimestamp(new Instant(10))));
http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
index 4e61f4e..9a17bc7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
@@ -41,7 +41,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -241,7 +241,7 @@ public class CoGroupByKeyTest implements Serializable {
Arrays.asList(0L, 2L, 4L, 6L, 8L))
.apply("WindowClicks", Window.<KV<Integer, String>>into(
FixedWindows.of(new Duration(4)))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()));
+ .withTimestampCombiner(TimestampCombiner.EARLIEST));
PCollection<KV<Integer, String>> purchasesTable =
createInput("CreatePurchases",
@@ -250,7 +250,7 @@ public class CoGroupByKeyTest implements Serializable {
Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L))
.apply("WindowPurchases", Window.<KV<Integer, String>>into(
FixedWindows.of(new Duration(4)))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()));
+ .withTimestampCombiner(TimestampCombiner.EARLIEST));
PCollection<KV<Integer, CoGbkResult>> coGbkResults =
KeyedPCollectionTuple.of(clicksTag, clicksTable)
http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java
deleted file mode 100644
index 78d7a2f..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.ImmutableList;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-/** Tests for {@link OutputTimeFns}. */
-@RunWith(Parameterized.class)
-public class OutputTimeFnsTest {
-
- @Parameters(name = "{index}: {0}")
- public static Iterable<OutputTimeFn<BoundedWindow>> data() {
- return ImmutableList.of(
- OutputTimeFns.outputAtEarliestInputTimestamp(),
- OutputTimeFns.outputAtLatestInputTimestamp(),
- OutputTimeFns.outputAtEndOfWindow());
- }
-
- @Parameter(0)
- public OutputTimeFn<?> outputTimeFn;
-
- @Test
- public void testToProtoAndBack() throws Exception {
- OutputTimeFn<?> result = OutputTimeFns.fromProto(OutputTimeFns.toProto(outputTimeFn));
-
- assertThat(result, equalTo((OutputTimeFn) outputTimeFn));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
index b131688..9d94928 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
@@ -118,7 +118,7 @@ public class SessionsTest {
}
/**
- * Test to confirm that {@link Sessions} with the default {@link OutputTimeFn} holds up the
+ * Test to confirm that {@link Sessions} with the default {@link TimestampCombiner} holds up the
* watermark potentially indefinitely.
*/
@Test
@@ -126,7 +126,7 @@ public class SessionsTest {
try {
WindowFnTestUtils.<Object, IntervalWindow>validateGetOutputTimestamps(
Sessions.withGapDuration(Duration.millis(10)),
- OutputTimeFns.outputAtEarliestInputTimestamp(),
+ TimestampCombiner.EARLIEST,
ImmutableList.of(
(List<Long>) ImmutableList.of(1L, 3L),
(List<Long>) ImmutableList.of(0L, 5L, 10L, 15L, 20L)));
@@ -148,7 +148,7 @@ public class SessionsTest {
public void testValidOutputAtEndTimes() throws Exception {
WindowFnTestUtils.<Object, IntervalWindow>validateGetOutputTimestamps(
Sessions.withGapDuration(Duration.millis(10)),
- OutputTimeFns.outputAtEndOfWindow(),
+ TimestampCombiner.END_OF_WINDOW,
ImmutableList.of(
(List<Long>) ImmutableList.of(1L, 3L),
(List<Long>) ImmutableList.of(0L, 5L, 10L, 15L, 20L)));
http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index e1ed66a..534e230 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -366,7 +366,7 @@ public class WindowTest implements Serializable {
*/
@Test
@Category(ValidatesRunner.class)
- public void testOutputTimeFnDefault() {
+ public void testTimestampCombinerDefault() {
pipeline.enableAbandonedNodeEnforcement(true);
pipeline
@@ -400,7 +400,7 @@ public class WindowTest implements Serializable {
*/
@Test
@Category(ValidatesRunner.class)
- public void testOutputTimeFnEndOfWindow() {
+ public void testTimestampCombinerEndOfWindow() {
pipeline.enableAbandonedNodeEnforcement(true);
pipeline.apply(
@@ -408,7 +408,7 @@ public class WindowTest implements Serializable {
TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
.apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
+ .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
.apply(GroupByKey.<Integer, String>create())
.apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() {
@ProcessElement
@@ -426,14 +426,14 @@ public class WindowTest implements Serializable {
AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow();
Duration allowedLateness = Duration.standardMinutes(10);
Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY;
- OutputTimeFn<BoundedWindow> outputTimeFn = OutputTimeFns.outputAtEndOfWindow();
+ TimestampCombiner timestampCombiner = TimestampCombiner.END_OF_WINDOW;
Window<?> window = Window
.into(windowFn)
.triggering(triggerBuilder)
.accumulatingFiredPanes()
.withAllowedLateness(allowedLateness, closingBehavior)
- .withOutputTimeFn(outputTimeFn);
+ .withTimestampCombiner(timestampCombiner);
DisplayData displayData = DisplayData.from(window);
@@ -446,7 +446,7 @@ public class WindowTest implements Serializable {
assertThat(displayData,
hasDisplayItem("allowedLateness", allowedLateness));
assertThat(displayData, hasDisplayItem("closingBehavior", closingBehavior.toString()));
- assertThat(displayData, hasDisplayItem("outputTimeFn", outputTimeFn.getClass()));
+ assertThat(displayData, hasDisplayItem("timestampCombiner", timestampCombiner.toString()));
}
@Test
@@ -456,14 +456,14 @@ public class WindowTest implements Serializable {
AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow();
Duration allowedLateness = Duration.standardMinutes(10);
Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY;
- OutputTimeFn<BoundedWindow> outputTimeFn = OutputTimeFns.outputAtEndOfWindow();
+ TimestampCombiner timestampCombiner = TimestampCombiner.END_OF_WINDOW;
Window<?> window = Window
.into(windowFn)
.triggering(triggerBuilder)
.accumulatingFiredPanes()
.withAllowedLateness(allowedLateness, closingBehavior)
- .withOutputTimeFn(outputTimeFn);
+ .withTimestampCombiner(timestampCombiner);
DisplayData primitiveDisplayData =
Iterables.getOnlyElement(
@@ -478,7 +478,8 @@ public class WindowTest implements Serializable {
assertThat(primitiveDisplayData,
hasDisplayItem("allowedLateness", allowedLateness));
assertThat(primitiveDisplayData, hasDisplayItem("closingBehavior", closingBehavior.toString()));
- assertThat(primitiveDisplayData, hasDisplayItem("outputTimeFn", outputTimeFn.getClass()));
+ assertThat(
+ primitiveDisplayData, hasDisplayItem("timestampCombiner", timestampCombiner.toString()));
}
@Test
@@ -497,7 +498,7 @@ public class WindowTest implements Serializable {
assertThat(displayData, not(hasDisplayItem("accumulationMode")));
assertThat(displayData, not(hasDisplayItem("allowedLateness")));
assertThat(displayData, not(hasDisplayItem("closingBehavior")));
- assertThat(displayData, not(hasDisplayItem("outputTimeFn")));
+ assertThat(displayData, not(hasDisplayItem("timestampCombiner")));
}
@Test
@@ -506,7 +507,7 @@ public class WindowTest implements Serializable {
assertThat(DisplayData.from(onlyHasAccumulationMode), not(hasDisplayItem(hasKey(isOneOf(
"windowFn",
"trigger",
- "outputTimeFn",
+ "timestampCombiner",
"allowedLateness",
"closingBehavior")))));
http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index a3f5352..30b0311 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -76,7 +76,7 @@ public class WindowingTest implements Serializable {
public PCollection<String> expand(PCollection<String> in) {
return in.apply("Window",
Window.<String>into(windowFn)
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
+ .withTimestampCombiner(TimestampCombiner.EARLIEST))
.apply(Count.<String>perElement())
.apply("FormatCounts", ParDo.of(new FormatCountsDoFn()))
.setCoder(StringUtf8Coder.of());
http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
index 50edd83..215b0f4 100644
--- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
@@ -42,7 +42,6 @@ public class GcpCoreApiSurfaceTest {
"com.google.api.services.cloudresourcemanager",
"com.google.api.services.storage",
"com.google.auth",
- "com.google.protobuf",
"com.fasterxml.jackson.annotation",
"com.fasterxml.jackson.core",
"com.fasterxml.jackson.databind",