You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/26 22:38:21 UTC
[1/4] beam git commit: Replace OutputTimeFn UDF with TimestampCombiner enum
Repository: beam
Updated Branches:
refs/heads/master 7339882b0 -> a32371eb3
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
index 268718a..14f818a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
@@ -20,20 +20,17 @@ package org.apache.beam.sdk.util;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import java.io.Serializable;
-import java.util.Collections;
import java.util.Objects;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.joda.time.Duration;
-import org.joda.time.Instant;
/**
* A {@code WindowingStrategy} describes the windowing behavior for a specific collection of values.
@@ -58,22 +55,22 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
private static final WindowingStrategy<Object, GlobalWindow> DEFAULT = of(new GlobalWindows());
private final WindowFn<T, W> windowFn;
- private final OutputTimeFn<? super W> outputTimeFn;
private final Trigger trigger;
private final AccumulationMode mode;
private final Duration allowedLateness;
private final ClosingBehavior closingBehavior;
+ private final TimestampCombiner timestampCombiner;
private final boolean triggerSpecified;
private final boolean modeSpecified;
private final boolean allowedLatenessSpecified;
- private final boolean outputTimeFnSpecified;
+ private final boolean timestampCombinerSpecified;
private WindowingStrategy(
WindowFn<T, W> windowFn,
Trigger trigger, boolean triggerSpecified,
AccumulationMode mode, boolean modeSpecified,
Duration allowedLateness, boolean allowedLatenessSpecified,
- OutputTimeFn<? super W> outputTimeFn, boolean outputTimeFnSpecified,
+ TimestampCombiner timestampCombiner, boolean timestampCombinerSpecified,
ClosingBehavior closingBehavior) {
this.windowFn = windowFn;
this.trigger = trigger;
@@ -83,8 +80,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
this.allowedLateness = allowedLateness;
this.allowedLatenessSpecified = allowedLatenessSpecified;
this.closingBehavior = closingBehavior;
- this.outputTimeFn = outputTimeFn;
- this.outputTimeFnSpecified = outputTimeFnSpecified;
+ this.timestampCombiner = timestampCombiner;
+ this.timestampCombinerSpecified = timestampCombinerSpecified;
}
/**
@@ -100,7 +97,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
DefaultTrigger.of(), false,
AccumulationMode.DISCARDING_FIRED_PANES, false,
DEFAULT_ALLOWED_LATENESS, false,
- new CombineWindowFnOutputTimes(OutputTimeFns.outputAtEndOfWindow(), windowFn), false,
+ TimestampCombiner.END_OF_WINDOW, false,
ClosingBehavior.FIRE_IF_NON_EMPTY);
}
@@ -136,12 +133,12 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
return closingBehavior;
}
- public OutputTimeFn<? super W> getOutputTimeFn() {
- return outputTimeFn;
+ public TimestampCombiner getTimestampCombiner() {
+ return timestampCombiner;
}
- public boolean isOutputTimeFnSpecified() {
- return outputTimeFnSpecified;
+ public boolean isTimestampCombinerSpecified() {
+ return timestampCombinerSpecified;
}
/**
@@ -154,7 +151,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
trigger, true,
mode, modeSpecified,
allowedLateness, allowedLatenessSpecified,
- outputTimeFn, outputTimeFnSpecified,
+ timestampCombiner, timestampCombinerSpecified,
closingBehavior);
}
@@ -168,7 +165,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
trigger, triggerSpecified,
mode, true,
allowedLateness, allowedLatenessSpecified,
- outputTimeFn, outputTimeFnSpecified,
+ timestampCombiner, timestampCombinerSpecified,
closingBehavior);
}
@@ -180,17 +177,12 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
@SuppressWarnings("unchecked")
WindowFn<T, W> typedWindowFn = (WindowFn<T, W>) wildcardWindowFn;
- // The onus of type correctness falls on the callee.
- @SuppressWarnings("unchecked")
- OutputTimeFn<? super W> newOutputTimeFn = (OutputTimeFn<? super W>)
- new CombineWindowFnOutputTimes<W>(outputTimeFn, typedWindowFn);
-
return new WindowingStrategy<T, W>(
typedWindowFn,
trigger, triggerSpecified,
mode, modeSpecified,
allowedLateness, allowedLatenessSpecified,
- newOutputTimeFn, outputTimeFnSpecified,
+ timestampCombiner, timestampCombinerSpecified,
closingBehavior);
}
@@ -204,7 +196,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
trigger, triggerSpecified,
mode, modeSpecified,
allowedLateness, true,
- outputTimeFn, outputTimeFnSpecified,
+ timestampCombiner, timestampCombinerSpecified,
closingBehavior);
}
@@ -214,40 +206,19 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
trigger, triggerSpecified,
mode, modeSpecified,
allowedLateness, allowedLatenessSpecified,
- outputTimeFn, outputTimeFnSpecified,
+ timestampCombiner, timestampCombinerSpecified,
closingBehavior);
}
@Experimental(Experimental.Kind.OUTPUT_TIME)
- public WindowingStrategy<T, W> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) {
-
- @SuppressWarnings("unchecked")
- OutputTimeFn<? super W> typedOutputTimeFn = (OutputTimeFn<? super W>) outputTimeFn;
-
- OutputTimeFn<? super W> newOutputTimeFn =
- new CombineWindowFnOutputTimes<W>(typedOutputTimeFn, windowFn);
+ public WindowingStrategy<T, W> withTimestampCombiner(TimestampCombiner timestampCombiner) {
return new WindowingStrategy<T, W>(
windowFn,
trigger, triggerSpecified,
mode, modeSpecified,
allowedLateness, allowedLatenessSpecified,
- newOutputTimeFn, true,
- closingBehavior);
- }
-
- /**
- * Fixes all the defaults so that equals can be used to check that two strategies are the same,
- * regardless of the state of "defaulted-ness".
- */
- @VisibleForTesting
- public WindowingStrategy<T, W> fixDefaults() {
- return new WindowingStrategy<>(
- windowFn,
- trigger, true,
- mode, true,
- allowedLateness, true,
- outputTimeFn, true,
+ timestampCombiner, true,
closingBehavior);
}
@@ -258,7 +229,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
.add("allowedLateness", allowedLateness)
.add("trigger", trigger)
.add("accumulationMode", mode)
- .add("outputTimeFn", outputTimeFn)
+ .add("timestampCombiner", timestampCombiner)
.toString();
}
@@ -268,104 +239,45 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
return false;
}
WindowingStrategy<?, ?> other = (WindowingStrategy<?, ?>) object;
- return
- isTriggerSpecified() == other.isTriggerSpecified()
+ return isTriggerSpecified() == other.isTriggerSpecified()
&& isAllowedLatenessSpecified() == other.isAllowedLatenessSpecified()
&& isModeSpecified() == other.isModeSpecified()
+ && isTimestampCombinerSpecified() == other.isTimestampCombinerSpecified()
&& getMode().equals(other.getMode())
&& getAllowedLateness().equals(other.getAllowedLateness())
&& getClosingBehavior().equals(other.getClosingBehavior())
&& getTrigger().equals(other.getTrigger())
+ && getTimestampCombiner().equals(other.getTimestampCombiner())
&& getWindowFn().equals(other.getWindowFn());
}
@Override
public int hashCode() {
- return Objects.hash(triggerSpecified, allowedLatenessSpecified, modeSpecified,
- windowFn, trigger, mode, allowedLateness, closingBehavior);
+ return Objects.hash(
+ triggerSpecified,
+ allowedLatenessSpecified,
+ modeSpecified,
+ timestampCombinerSpecified,
+ mode,
+ allowedLateness,
+ closingBehavior,
+ trigger,
+ timestampCombiner,
+ windowFn);
}
/**
- * An {@link OutputTimeFn} that uses {@link WindowFn#getOutputTime} to assign initial timestamps
- * but then combines and merges according to a given {@link OutputTimeFn}.
- *
- * <ul>
- * <li>The {@link WindowFn#getOutputTime} allows adjustments such as that whereby
- * {@link org.apache.beam.sdk.transforms.windowing.SlidingWindows#getOutputTime}
- * moves elements later in time to avoid holding up progress downstream.</li>
- * <li>Then, when multiple elements are buffered for output, the output timestamp of the
- * result is calculated using {@link OutputTimeFn#combine}.</li>
- * <li>In the case of a merging {@link WindowFn}, the output timestamp when windows merge
- * is calculated using {@link OutputTimeFn#merge}.</li>
- * </ul>
+ * Fixes all the defaults so that equals can be used to check that two strategies are the same,
+ * regardless of the state of "defaulted-ness".
*/
- public static class CombineWindowFnOutputTimes<W extends BoundedWindow>
- extends OutputTimeFn<W> {
-
- private final OutputTimeFn<? super W> outputTimeFn;
- private final WindowFn<?, W> windowFn;
-
- public CombineWindowFnOutputTimes(
- OutputTimeFn<? super W> outputTimeFn, WindowFn<?, W> windowFn) {
- this.outputTimeFn = outputTimeFn;
- this.windowFn = windowFn;
- }
-
- public OutputTimeFn<? super W> getOutputTimeFn() {
- return outputTimeFn;
- }
-
- @Override
- public Instant assignOutputTime(Instant inputTimestamp, W window) {
- return outputTimeFn.merge(
- window, Collections.singleton(windowFn.getOutputTime(inputTimestamp, window)));
- }
-
- @Override
- public Instant combine(Instant timestamp, Instant otherTimestamp) {
- return outputTimeFn.combine(timestamp, otherTimestamp);
- }
-
- @Override
- public Instant merge(W newWindow, Iterable<? extends Instant> timestamps) {
- return outputTimeFn.merge(newWindow, timestamps);
- }
-
- @Override
- public final boolean dependsOnlyOnWindow() {
- return outputTimeFn.dependsOnlyOnWindow();
- }
-
- @Override
- public boolean dependsOnlyOnEarliestInputTimestamp() {
- return outputTimeFn.dependsOnlyOnEarliestInputTimestamp();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (!(obj instanceof CombineWindowFnOutputTimes)) {
- return false;
- }
-
- CombineWindowFnOutputTimes<?> that = (CombineWindowFnOutputTimes<?>) obj;
- return outputTimeFn.equals(that.outputTimeFn) && windowFn.equals(that.windowFn);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(outputTimeFn, windowFn);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("outputTimeFn", outputTimeFn)
- .add("windowFn", windowFn)
- .toString();
- }
+ @VisibleForTesting
+ public WindowingStrategy<T, W> fixDefaults() {
+ return new WindowingStrategy<>(
+ windowFn,
+ trigger, true,
+ mode, true,
+ allowedLateness, true,
+ timestampCombiner, true,
+ closingBehavior);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
index 64841fb..f9ab115 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -21,7 +21,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
/**
* Visitor for binding a {@link StateSpec} and to the associated {@link State}.
@@ -63,11 +63,11 @@ public interface StateBinder<K> {
/**
* Bind to a watermark {@link StateSpec}.
*
- * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to
- * the returned {@link WatermarkHoldState} are to be combined.
+ * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps added
+ * to the returned {@link WatermarkHoldState} are to be combined.
*/
- <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+ <W extends BoundedWindow> WatermarkHoldState bindWatermark(
String id,
- StateSpec<? super K, WatermarkHoldState<W>> spec,
- OutputTimeFn<? super W> outputTimeFn);
+ StateSpec<? super K, WatermarkHoldState> spec,
+ TimestampCombiner timestampCombiner);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index dc647da..8fa5bb0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
/**
* Static utility methods for creating {@link StateSpec} instances.
@@ -208,9 +208,9 @@ public class StateSpecs {
/** Create a state spec for holding the watermark. */
public static <W extends BoundedWindow>
- StateSpec<Object, WatermarkHoldState<W>> watermarkStateInternal(
- OutputTimeFn<? super W> outputTimeFn) {
- return new WatermarkStateSpecInternal<W>(outputTimeFn);
+ StateSpec<Object, WatermarkHoldState> watermarkStateInternal(
+ TimestampCombiner timestampCombiner) {
+ return new WatermarkStateSpecInternal<W>(timestampCombiner);
}
public static <K, InputT, AccumT, OutputT>
@@ -656,26 +656,26 @@ public class StateSpecs {
/**
* A specification for a state cell tracking a combined watermark hold.
*
- * <p>Includes the {@link OutputTimeFn} according to which the output times
+ * <p>Includes the {@link TimestampCombiner} according to which the output times
* are combined.
*/
private static class WatermarkStateSpecInternal<W extends BoundedWindow>
- implements StateSpec<Object, WatermarkHoldState<W>> {
+ implements StateSpec<Object, WatermarkHoldState> {
/**
* When multiple output times are added to hold the watermark, this determines how they are
* combined, and also the behavior when merging windows. Does not contribute to equality/hash
* since we have at most one watermark hold spec per computation.
*/
- private final OutputTimeFn<? super W> outputTimeFn;
+ private final TimestampCombiner timestampCombiner;
- private WatermarkStateSpecInternal(OutputTimeFn<? super W> outputTimeFn) {
- this.outputTimeFn = outputTimeFn;
+ private WatermarkStateSpecInternal(TimestampCombiner timestampCombiner) {
+ this.timestampCombiner = timestampCombiner;
}
@Override
- public WatermarkHoldState<W> bind(String id, StateBinder<?> visitor) {
- return visitor.bindWatermark(id, this, outputTimeFn);
+ public WatermarkHoldState bind(String id, StateBinder<?> visitor) {
+ return visitor.bindWatermark(id, this, timestampCombiner);
}
@Override
@@ -701,5 +701,4 @@ public class StateSpecs {
return Objects.hash(getClass());
}
}
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
index 20fa05f..ae9b700 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
@@ -19,25 +19,24 @@ package org.apache.beam.sdk.util.state;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.joda.time.Instant;
/**
- * A {@link State} accepting and aggregating output timestamps, which determines
- * the time to which the output watermark must be held.
+ * A {@link State} accepting and aggregating output timestamps, which determines the time to which
+ * the output watermark must be held.
*
* <p><b><i>For internal use only. This API may change at any time.</i></b>
*/
@Experimental(Kind.STATE)
-public interface WatermarkHoldState<W extends BoundedWindow>
- extends GroupingState<Instant, Instant> {
+public interface WatermarkHoldState extends GroupingState<Instant, Instant> {
/**
- * Return the {@link OutputTimeFn} which will be used to determine a watermark hold time given
- * an element timestamp, and to combine watermarks from windows which are about to be merged.
+ * Return the {@link TimestampCombiner} which will be used to determine a watermark hold time
+ * given an element timestamp, and to combine watermarks from windows which are about to be
+ * merged.
*/
- OutputTimeFn<? super W> getOutputTimeFn();
+ TimestampCombiner getTimestampCombiner();
@Override
- WatermarkHoldState<W> readLater();
+ WatermarkHoldState readLater();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
index 153bd84..26dd9f9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
@@ -39,7 +39,6 @@ public class SdkCoreApiSurfaceTest {
ImmutableSet.of(
"org.apache.beam",
"com.google.api.client",
- "com.google.protobuf",
"com.fasterxml.jackson.annotation",
"com.fasterxml.jackson.core",
"com.fasterxml.jackson.databind",
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 939261f..0556199 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -51,8 +51,8 @@ import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.Reshuffle;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -318,14 +318,14 @@ public class GroupByKeyTest {
*/
@Test
@Category(ValidatesRunner.class)
- public void testOutputTimeFnEarliest() {
+ public void testTimestampCombinerEarliest() {
p.apply(
Create.timestamped(
TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
.apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
+ .withTimestampCombiner(TimestampCombiner.EARLIEST))
.apply(GroupByKey.<Integer, String>create())
.apply(ParDo.of(new AssertTimestamp(new Instant(0))));
@@ -339,13 +339,13 @@ public class GroupByKeyTest {
*/
@Test
@Category(ValidatesRunner.class)
- public void testOutputTimeFnLatest() {
+ public void testTimestampCombinerLatest() {
p.apply(
Create.timestamped(
TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
.apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()))
+ .withTimestampCombiner(TimestampCombiner.LATEST))
.apply(GroupByKey.<Integer, String>create())
.apply(ParDo.of(new AssertTimestamp(new Instant(10))));
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
index 4e61f4e..9a17bc7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
@@ -41,7 +41,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -241,7 +241,7 @@ public class CoGroupByKeyTest implements Serializable {
Arrays.asList(0L, 2L, 4L, 6L, 8L))
.apply("WindowClicks", Window.<KV<Integer, String>>into(
FixedWindows.of(new Duration(4)))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()));
+ .withTimestampCombiner(TimestampCombiner.EARLIEST));
PCollection<KV<Integer, String>> purchasesTable =
createInput("CreatePurchases",
@@ -250,7 +250,7 @@ public class CoGroupByKeyTest implements Serializable {
Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L))
.apply("WindowPurchases", Window.<KV<Integer, String>>into(
FixedWindows.of(new Duration(4)))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()));
+ .withTimestampCombiner(TimestampCombiner.EARLIEST));
PCollection<KV<Integer, CoGbkResult>> coGbkResults =
KeyedPCollectionTuple.of(clicksTag, clicksTable)
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java
deleted file mode 100644
index 78d7a2f..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.ImmutableList;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-/** Tests for {@link OutputTimeFns}. */
-@RunWith(Parameterized.class)
-public class OutputTimeFnsTest {
-
- @Parameters(name = "{index}: {0}")
- public static Iterable<OutputTimeFn<BoundedWindow>> data() {
- return ImmutableList.of(
- OutputTimeFns.outputAtEarliestInputTimestamp(),
- OutputTimeFns.outputAtLatestInputTimestamp(),
- OutputTimeFns.outputAtEndOfWindow());
- }
-
- @Parameter(0)
- public OutputTimeFn<?> outputTimeFn;
-
- @Test
- public void testToProtoAndBack() throws Exception {
- OutputTimeFn<?> result = OutputTimeFns.fromProto(OutputTimeFns.toProto(outputTimeFn));
-
- assertThat(result, equalTo((OutputTimeFn) outputTimeFn));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
index b131688..9d94928 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
@@ -118,7 +118,7 @@ public class SessionsTest {
}
/**
- * Test to confirm that {@link Sessions} with the default {@link OutputTimeFn} holds up the
+ * Test to confirm that {@link Sessions} with the default {@link TimestampCombiner} holds up the
* watermark potentially indefinitely.
*/
@Test
@@ -126,7 +126,7 @@ public class SessionsTest {
try {
WindowFnTestUtils.<Object, IntervalWindow>validateGetOutputTimestamps(
Sessions.withGapDuration(Duration.millis(10)),
- OutputTimeFns.outputAtEarliestInputTimestamp(),
+ TimestampCombiner.EARLIEST,
ImmutableList.of(
(List<Long>) ImmutableList.of(1L, 3L),
(List<Long>) ImmutableList.of(0L, 5L, 10L, 15L, 20L)));
@@ -148,7 +148,7 @@ public class SessionsTest {
public void testValidOutputAtEndTimes() throws Exception {
WindowFnTestUtils.<Object, IntervalWindow>validateGetOutputTimestamps(
Sessions.withGapDuration(Duration.millis(10)),
- OutputTimeFns.outputAtEndOfWindow(),
+ TimestampCombiner.END_OF_WINDOW,
ImmutableList.of(
(List<Long>) ImmutableList.of(1L, 3L),
(List<Long>) ImmutableList.of(0L, 5L, 10L, 15L, 20L)));
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index e1ed66a..534e230 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -366,7 +366,7 @@ public class WindowTest implements Serializable {
*/
@Test
@Category(ValidatesRunner.class)
- public void testOutputTimeFnDefault() {
+ public void testTimestampCombinerDefault() {
pipeline.enableAbandonedNodeEnforcement(true);
pipeline
@@ -400,7 +400,7 @@ public class WindowTest implements Serializable {
*/
@Test
@Category(ValidatesRunner.class)
- public void testOutputTimeFnEndOfWindow() {
+ public void testTimestampCombinerEndOfWindow() {
pipeline.enableAbandonedNodeEnforcement(true);
pipeline.apply(
@@ -408,7 +408,7 @@ public class WindowTest implements Serializable {
TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
.apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
+ .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
.apply(GroupByKey.<Integer, String>create())
.apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() {
@ProcessElement
@@ -426,14 +426,14 @@ public class WindowTest implements Serializable {
AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow();
Duration allowedLateness = Duration.standardMinutes(10);
Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY;
- OutputTimeFn<BoundedWindow> outputTimeFn = OutputTimeFns.outputAtEndOfWindow();
+ TimestampCombiner timestampCombiner = TimestampCombiner.END_OF_WINDOW;
Window<?> window = Window
.into(windowFn)
.triggering(triggerBuilder)
.accumulatingFiredPanes()
.withAllowedLateness(allowedLateness, closingBehavior)
- .withOutputTimeFn(outputTimeFn);
+ .withTimestampCombiner(timestampCombiner);
DisplayData displayData = DisplayData.from(window);
@@ -446,7 +446,7 @@ public class WindowTest implements Serializable {
assertThat(displayData,
hasDisplayItem("allowedLateness", allowedLateness));
assertThat(displayData, hasDisplayItem("closingBehavior", closingBehavior.toString()));
- assertThat(displayData, hasDisplayItem("outputTimeFn", outputTimeFn.getClass()));
+ assertThat(displayData, hasDisplayItem("timestampCombiner", timestampCombiner.toString()));
}
@Test
@@ -456,14 +456,14 @@ public class WindowTest implements Serializable {
AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow();
Duration allowedLateness = Duration.standardMinutes(10);
Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY;
- OutputTimeFn<BoundedWindow> outputTimeFn = OutputTimeFns.outputAtEndOfWindow();
+ TimestampCombiner timestampCombiner = TimestampCombiner.END_OF_WINDOW;
Window<?> window = Window
.into(windowFn)
.triggering(triggerBuilder)
.accumulatingFiredPanes()
.withAllowedLateness(allowedLateness, closingBehavior)
- .withOutputTimeFn(outputTimeFn);
+ .withTimestampCombiner(timestampCombiner);
DisplayData primitiveDisplayData =
Iterables.getOnlyElement(
@@ -478,7 +478,8 @@ public class WindowTest implements Serializable {
assertThat(primitiveDisplayData,
hasDisplayItem("allowedLateness", allowedLateness));
assertThat(primitiveDisplayData, hasDisplayItem("closingBehavior", closingBehavior.toString()));
- assertThat(primitiveDisplayData, hasDisplayItem("outputTimeFn", outputTimeFn.getClass()));
+ assertThat(
+ primitiveDisplayData, hasDisplayItem("timestampCombiner", timestampCombiner.toString()));
}
@Test
@@ -497,7 +498,7 @@ public class WindowTest implements Serializable {
assertThat(displayData, not(hasDisplayItem("accumulationMode")));
assertThat(displayData, not(hasDisplayItem("allowedLateness")));
assertThat(displayData, not(hasDisplayItem("closingBehavior")));
- assertThat(displayData, not(hasDisplayItem("outputTimeFn")));
+ assertThat(displayData, not(hasDisplayItem("timestampCombiner")));
}
@Test
@@ -506,7 +507,7 @@ public class WindowTest implements Serializable {
assertThat(DisplayData.from(onlyHasAccumulationMode), not(hasDisplayItem(hasKey(isOneOf(
"windowFn",
"trigger",
- "outputTimeFn",
+ "timestampCombiner",
"allowedLateness",
"closingBehavior")))));
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index a3f5352..30b0311 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -76,7 +76,7 @@ public class WindowingTest implements Serializable {
public PCollection<String> expand(PCollection<String> in) {
return in.apply("Window",
Window.<String>into(windowFn)
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
+ .withTimestampCombiner(TimestampCombiner.EARLIEST))
.apply(Count.<String>perElement())
.apply("FormatCounts", ParDo.of(new FormatCountsDoFn()))
.setCoder(StringUtf8Coder.of());
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
index 50edd83..215b0f4 100644
--- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
@@ -42,7 +42,6 @@ public class GcpCoreApiSurfaceTest {
"com.google.api.services.cloudresourcemanager",
"com.google.api.services.storage",
"com.google.auth",
- "com.google.protobuf",
"com.fasterxml.jackson.annotation",
"com.fasterxml.jackson.core",
"com.fasterxml.jackson.databind",
[3/4] beam git commit: Replace OutputTimeFn UDF with
TimestampCombiner enum
Posted by ke...@apache.org.
Replace OutputTimeFn UDF with TimestampCombiner enum
This also removes the last dependency from SDK core to Runner API proto.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f38e4271
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f38e4271
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f38e4271
Branch: refs/heads/master
Commit: f38e4271334fced94e8dc1dc97f47b60fa810586
Parents: f23dd67
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jan 26 19:56:06 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Apr 26 15:08:29 2017 -0700
----------------------------------------------------------------------
.../beam/examples/complete/game/GameStats.java | 4 +-
.../translation/utils/ApexStateInternals.java | 26 +-
.../translation/GroupByKeyTranslatorTest.java | 10 +-
.../utils/ApexStateInternalsTest.java | 33 +-
.../core/construction/WindowingStrategies.java | 52 ++-
.../construction/WindowingStrategiesTest.java | 6 +-
.../runners/core/InMemoryStateInternals.java | 32 +-
.../beam/runners/core/ReduceFnRunner.java | 4 +-
.../beam/runners/core/SplittableParDo.java | 8 +-
.../apache/beam/runners/core/StateMerging.java | 32 +-
.../org/apache/beam/runners/core/StateTag.java | 11 +-
.../org/apache/beam/runners/core/StateTags.java | 16 +-
.../core/TestInMemoryStateInternals.java | 2 +-
.../apache/beam/runners/core/WatermarkHold.java | 45 +--
.../core/GroupAlsoByWindowsProperties.java | 20 +-
.../core/InMemoryStateInternalsTest.java | 34 +-
.../beam/runners/core/ReduceFnRunnerTest.java | 38 +--
.../beam/runners/core/ReduceFnTester.java | 13 +-
.../apache/beam/runners/core/StateTagTest.java | 16 +-
.../CopyOnAccessInMemoryStateInternals.java | 24 +-
.../direct/ParDoMultiOverrideFactory.java | 6 +-
.../CopyOnAccessInMemoryStateInternalsTest.java | 54 ++--
.../functions/HashingFlinkCombineRunner.java | 19 +-
.../functions/SortingFlinkCombineRunner.java | 30 +-
.../state/FlinkBroadcastStateInternals.java | 8 +-
.../state/FlinkKeyGroupStateInternals.java | 8 +-
.../state/FlinkSplitStateInternals.java | 8 +-
.../streaming/state/FlinkStateInternals.java | 34 +-
.../streaming/FlinkStateInternalsTest.java | 34 +-
.../spark/stateful/SparkStateInternals.java | 33 +-
.../translation/SparkAbstractCombineFn.java | 4 +-
.../spark/translation/SparkGlobalCombineFn.java | 37 ++-
.../spark/translation/SparkKeyedCombineFn.java | 37 ++-
sdks/java/core/pom.xml | 5 -
.../beam/sdk/testing/WindowFnTestUtils.java | 53 +++-
.../apache/beam/sdk/transforms/GroupByKey.java | 3 +-
.../sdk/transforms/windowing/OutputTimeFn.java | 314 -------------------
.../sdk/transforms/windowing/OutputTimeFns.java | 212 -------------
.../transforms/windowing/TimestampCombiner.java | 186 +++++++++++
.../beam/sdk/transforms/windowing/Window.java | 22 +-
.../org/apache/beam/sdk/util/Reshuffle.java | 7 +-
.../apache/beam/sdk/util/WindowingStrategy.java | 176 +++--------
.../apache/beam/sdk/util/state/StateBinder.java | 12 +-
.../apache/beam/sdk/util/state/StateSpecs.java | 23 +-
.../beam/sdk/util/state/WatermarkHoldState.java | 19 +-
.../org/apache/beam/SdkCoreApiSurfaceTest.java | 1 -
.../beam/sdk/transforms/GroupByKeyTest.java | 10 +-
.../sdk/transforms/join/CoGroupByKeyTest.java | 6 +-
.../transforms/windowing/OutputTimeFnsTest.java | 51 ---
.../sdk/transforms/windowing/SessionsTest.java | 6 +-
.../sdk/transforms/windowing/WindowTest.java | 23 +-
.../sdk/transforms/windowing/WindowingTest.java | 2 +-
.../org/apache/beam/GcpCoreApiSurfaceTest.java | 1 -
53 files changed, 740 insertions(+), 1130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index b6c05be..e0048b7 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -43,8 +43,8 @@ import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -313,7 +313,7 @@ public class GameStats extends LeaderBoard {
userEvents
.apply("WindowIntoSessions", Window.<KV<String, Integer>>into(
Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
- .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
+ .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
// For this use, we care only about the existence of the session, not any particular
// information aggregated over it, so the following is an efficient way to do that.
.apply(Combine.perKey(x -> 0))
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index cfc57cd..ec8f666 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -45,7 +45,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.BagState;
@@ -150,10 +150,10 @@ public class ApexStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
- return new ApexWatermarkHoldState<>(namespace, address, outputTimeFn);
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
+ return new ApexWatermarkHoldState<>(namespace, address, timestampCombiner);
}
@Override
@@ -269,16 +269,16 @@ public class ApexStateInternals<K> implements StateInternals<K> {
}
private final class ApexWatermarkHoldState<W extends BoundedWindow>
- extends AbstractState<Instant> implements WatermarkHoldState<W> {
+ extends AbstractState<Instant> implements WatermarkHoldState {
- private final OutputTimeFn<? super W> outputTimeFn;
+ private final TimestampCombiner timestampCombiner;
public ApexWatermarkHoldState(
StateNamespace namespace,
- StateTag<?, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
+ StateTag<?, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
super(namespace, address, InstantCoder.of());
- this.outputTimeFn = outputTimeFn;
+ this.timestampCombiner = timestampCombiner;
}
@Override
@@ -294,7 +294,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
@Override
public void add(Instant outputTime) {
Instant combined = read();
- combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime);
+ combined = (combined == null) ? outputTime : timestampCombiner.combine(combined, outputTime);
writeValue(combined);
}
@@ -313,8 +313,8 @@ public class ApexStateInternals<K> implements StateInternals<K> {
}
@Override
- public OutputTimeFn<? super W> getOutputTimeFn() {
- return outputTimeFn;
+ public TimestampCombiner getTimestampCombiner() {
+ return timestampCombiner;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
index 193de71..9c61b47 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
@@ -43,7 +43,7 @@ import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
@@ -83,12 +83,12 @@ public class GroupByKeyTranslatorTest {
);
p.apply(Read.from(new TestSource(data, new Instant(5000))))
- .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
- .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()))
+ .apply(
+ Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
+ .withTimestampCombiner(TimestampCombiner.LATEST))
.apply(Count.<String>perElement())
.apply(ParDo.of(new KeyedByTimestamp<KV<String, Long>>()))
- .apply(ParDo.of(new EmbeddedCollector()))
- ;
+ .apply(ParDo.of(new EmbeddedCollector()));
ApexRunnerResult result = (ApexRunnerResult) p.run();
result.getApexDAG();
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 7160e45..225b654 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -36,7 +36,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.GroupingState;
@@ -65,14 +65,13 @@ public class ApexStateInternalsTest {
"sumInteger", VarIntCoder.of(), Sum.ofIntegers());
private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+ private static final StateTag<Object, WatermarkHoldState>
WATERMARK_EARLIEST_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
- WATERMARK_LATEST_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
+ private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
+ private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
private ApexStateInternals<String> underTest;
@@ -227,7 +226,7 @@ public class ApexStateInternalsTest {
@Test
public void testWatermarkEarliestState() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
+ WatermarkHoldState value =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
// State instances are cached, but depend on the namespace.
@@ -251,7 +250,7 @@ public class ApexStateInternalsTest {
@Test
public void testWatermarkLatestState() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
+ WatermarkHoldState value =
underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
// State instances are cached, but depend on the namespace.
@@ -275,7 +274,7 @@ public class ApexStateInternalsTest {
@Test
public void testWatermarkEndOfWindowState() throws Exception {
- WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+ WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
// State instances are cached, but depend on the namespace.
assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
@@ -292,7 +291,7 @@ public class ApexStateInternalsTest {
@Test
public void testWatermarkStateIsEmpty() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
+ WatermarkHoldState value =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
assertThat(value.isEmpty().read(), Matchers.is(true));
@@ -306,9 +305,9 @@ public class ApexStateInternalsTest {
@Test
public void testMergeEarliestWatermarkIntoSource() throws Exception {
- WatermarkHoldState<BoundedWindow> value1 =
+ WatermarkHoldState value1 =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
- WatermarkHoldState<BoundedWindow> value2 =
+ WatermarkHoldState value2 =
underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
value1.add(new Instant(3000));
@@ -325,11 +324,11 @@ public class ApexStateInternalsTest {
@Test
public void testMergeLatestWatermarkIntoSource() throws Exception {
- WatermarkHoldState<BoundedWindow> value1 =
+ WatermarkHoldState value1 =
underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
- WatermarkHoldState<BoundedWindow> value2 =
+ WatermarkHoldState value2 =
underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
- WatermarkHoldState<BoundedWindow> value3 =
+ WatermarkHoldState value3 =
underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
value1.add(new Instant(3000));
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
index 3d7deef..0c400db 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
@@ -28,16 +28,15 @@ import java.io.Serializable;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.WindowingStrategy.CombineWindowFnOutputTimes;
import org.joda.time.Duration;
/** Utilities for working with {@link WindowingStrategy WindowingStrategies}. */
@@ -115,11 +114,42 @@ public class WindowingStrategies implements Serializable {
}
}
- public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) {
- if (outputTimeFn instanceof WindowingStrategy.CombineWindowFnOutputTimes) {
- return toProto(((CombineWindowFnOutputTimes<?>) outputTimeFn).getOutputTimeFn());
- } else {
- return OutputTimeFns.toProto(outputTimeFn);
+ public static RunnerApi.OutputTime toProto(TimestampCombiner timestampCombiner) {
+ switch(timestampCombiner) {
+ case EARLIEST:
+ return OutputTime.EARLIEST_IN_PANE;
+ case END_OF_WINDOW:
+ return OutputTime.END_OF_WINDOW;
+ case LATEST:
+ return OutputTime.LATEST_IN_PANE;
+ default:
+ throw new IllegalArgumentException(
+ String.format(
+ "Unknown %s: %s",
+ TimestampCombiner.class.getSimpleName(),
+ timestampCombiner));
+ }
+ }
+
+ public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime proto) {
+ switch (proto) {
+ case EARLIEST_IN_PANE:
+ return TimestampCombiner.EARLIEST;
+ case END_OF_WINDOW:
+ return TimestampCombiner.END_OF_WINDOW;
+ case LATEST_IN_PANE:
+ return TimestampCombiner.LATEST;
+ case UNRECOGNIZED:
+ default:
+ // Whether or not it is proto that cannot recognize it (due to the version of the
+ // generated code we link to) or the switch hasn't been updated to handle it,
+ // the situation is the same: we don't know what this OutputTime means
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot convert unknown %s to %s: %s",
+ RunnerApi.OutputTime.class.getCanonicalName(),
+ OutputTime.class.getCanonicalName(),
+ proto));
}
}
@@ -177,7 +207,7 @@ public class WindowingStrategies implements Serializable {
RunnerApi.WindowingStrategy.Builder windowingStrategyProto =
RunnerApi.WindowingStrategy.newBuilder()
- .setOutputTime(toProto(windowingStrategy.getOutputTimeFn()))
+ .setOutputTime(toProto(windowingStrategy.getTimestampCombiner()))
.setAccumulationMode(toProto(windowingStrategy.getMode()))
.setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
.setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
@@ -229,7 +259,7 @@ public class WindowingStrategies implements Serializable {
"WindowFn");
WindowFn<?, ?> windowFn = (WindowFn<?, ?>) deserializedWindowFn;
- OutputTimeFn<?> outputTimeFn = OutputTimeFns.fromProto(proto.getOutputTime());
+ TimestampCombiner timestampCombiner = timestampCombinerFromProto(proto.getOutputTime());
AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode());
Trigger trigger = Triggers.fromProto(proto.getTrigger());
ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
@@ -239,7 +269,7 @@ public class WindowingStrategies implements Serializable {
.withAllowedLateness(allowedLateness)
.withMode(accumulationMode)
.withTrigger(trigger)
- .withOutputTimeFn(outputTimeFn)
+ .withTimestampCombiner(timestampCombiner)
.withClosingBehavior(closingBehavior);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
index 62bba8e..78ac61c 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
@@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -68,14 +68,14 @@ public class WindowingStrategiesTest {
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withTrigger(REPRESENTATIVE_TRIGGER)
.withAllowedLateness(Duration.millis(71))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())),
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)),
toProtoAndBackSpec(
WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
.withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)
.withMode(AccumulationMode.DISCARDING_FIRED_PANES)
.withTrigger(REPRESENTATIVE_TRIGGER)
.withAllowedLateness(Duration.millis(93))
- .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp())));
+ .withTimestampCombiner(TimestampCombiner.LATEST)));
}
@Parameter(0)
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
index 55b7fc2..9fb8e3f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -156,10 +156,10 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
- return new InMemoryWatermarkHold<W>(outputTimeFn);
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
+ return new InMemoryWatermarkHold<W>(timestampCombiner);
}
@Override
@@ -233,19 +233,19 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
* An {@link InMemoryState} implementation of {@link WatermarkHoldState}.
*/
public static final class InMemoryWatermarkHold<W extends BoundedWindow>
- implements WatermarkHoldState<W>, InMemoryState<InMemoryWatermarkHold<W>> {
+ implements WatermarkHoldState, InMemoryState<InMemoryWatermarkHold<W>> {
- private final OutputTimeFn<? super W> outputTimeFn;
+ private final TimestampCombiner timestampCombiner;
@Nullable
private Instant combinedHold = null;
- public InMemoryWatermarkHold(OutputTimeFn<? super W> outputTimeFn) {
- this.outputTimeFn = outputTimeFn;
+ public InMemoryWatermarkHold(TimestampCombiner timestampCombiner) {
+ this.timestampCombiner = timestampCombiner;
}
@Override
- public InMemoryWatermarkHold<W> readLater() {
+ public InMemoryWatermarkHold readLater() {
return this;
}
@@ -263,8 +263,10 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
@Override
public void add(Instant outputTime) {
- combinedHold = combinedHold == null ? outputTime
- : outputTimeFn.combine(combinedHold, outputTime);
+ combinedHold =
+ combinedHold == null
+ ? outputTime
+ : timestampCombiner.combine(combinedHold, outputTime);
}
@Override
@@ -287,8 +289,8 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
}
@Override
- public OutputTimeFn<? super W> getOutputTimeFn() {
- return outputTimeFn;
+ public TimestampCombiner getTimestampCombiner() {
+ return timestampCombiner;
}
@Override
@@ -299,7 +301,7 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
@Override
public InMemoryWatermarkHold<W> copy() {
InMemoryWatermarkHold<W> that =
- new InMemoryWatermarkHold<>(outputTimeFn);
+ new InMemoryWatermarkHold<>(timestampCombiner);
that.combinedHold = this.combinedHold;
return that;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 34db752..4c70c97 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -47,9 +47,9 @@ import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SideInputReader;
@@ -171,7 +171,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
* <ul>
* <li>State: Bag of hold timestamps.
* <li>State style: RENAMED
- * <li>Merging: Depending on {@link OutputTimeFn}, may need to be recalculated on merging.
+ * <li>Merging: Depending on {@link TimestampCombiner}, may need to be recalculated on merging.
* When a pane fires it may be necessary to add (back) an end-of-window or garbage collection
* hold.
* <li>Lifetime: Cleared when a pane fires or when the window is garbage collected.
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 31d89ee..5273e86 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -45,7 +45,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -355,10 +355,10 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
* the input watermark when the first {@link DoFn.ProcessElement} call for this element
* completes.
*/
- private static final StateTag<Object, WatermarkHoldState<GlobalWindow>> watermarkHoldTag =
+ private static final StateTag<Object, WatermarkHoldState> watermarkHoldTag =
StateTags.makeSystemTagInternal(
StateTags.<GlobalWindow>watermarkStateInternal(
- "hold", OutputTimeFns.outputAtLatestInputTimestamp()));
+ "hold", TimestampCombiner.LATEST));
/**
* The state cell containing a copy of the element. Written during the first {@link
@@ -480,7 +480,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
stateInternals.state(stateNamespace, elementTag);
ValueState<RestrictionT> restrictionState =
stateInternals.state(stateNamespace, restrictionTag);
- WatermarkHoldState<GlobalWindow> holdState =
+ WatermarkHoldState holdState =
stateInternals.state(stateNamespace, watermarkHoldTag);
ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction;
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
index 3410850..ce37fd3 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -218,24 +218,24 @@ public class StateMerging {
*/
public static <K, W extends BoundedWindow> void prefetchWatermarks(
MergingStateAccessor<K, W> context,
- StateTag<? super K, WatermarkHoldState<W>> address) {
- Map<W, WatermarkHoldState<W>> map = context.accessInEachMergingWindow(address);
- WatermarkHoldState<W> result = context.access(address);
+ StateTag<? super K, WatermarkHoldState> address) {
+ Map<W, WatermarkHoldState> map = context.accessInEachMergingWindow(address);
+ WatermarkHoldState result = context.access(address);
if (map.isEmpty()) {
// Nothing to prefetch.
return;
}
if (map.size() == 1 && map.values().contains(result)
- && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) {
+ && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) {
// Nothing to change.
return;
}
- if (result.getOutputTimeFn().dependsOnlyOnWindow()) {
+ if (result.getTimestampCombiner().dependsOnlyOnWindow()) {
// No need to read existing holds.
return;
}
// Prefetch.
- for (WatermarkHoldState<W> source : map.values()) {
+ for (WatermarkHoldState source : map.values()) {
prefetchRead(source);
}
}
@@ -250,7 +250,7 @@ public class StateMerging {
*/
public static <K, W extends BoundedWindow> void mergeWatermarks(
MergingStateAccessor<K, W> context,
- StateTag<? super K, WatermarkHoldState<W>> address,
+ StateTag<? super K, WatermarkHoldState> address,
W mergeResult) {
mergeWatermarks(
context.accessInEachMergingWindow(address).values(), context.access(address), mergeResult);
@@ -261,31 +261,31 @@ public class StateMerging {
* into {@code result}, where the final merge result window is {@code mergeResult}.
*/
public static <W extends BoundedWindow> void mergeWatermarks(
- Collection<WatermarkHoldState<W>> sources, WatermarkHoldState<W> result,
+ Collection<WatermarkHoldState> sources, WatermarkHoldState result,
W resultWindow) {
if (sources.isEmpty()) {
// Nothing to merge.
return;
}
if (sources.size() == 1 && sources.contains(result)
- && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) {
+ && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) {
// Nothing to merge.
return;
}
- if (result.getOutputTimeFn().dependsOnlyOnWindow()) {
+ if (result.getTimestampCombiner().dependsOnlyOnWindow()) {
// Clear sources.
- for (WatermarkHoldState<W> source : sources) {
+ for (WatermarkHoldState source : sources) {
source.clear();
}
// Update directly from window-derived hold.
- Instant hold = result.getOutputTimeFn().assignOutputTime(
- BoundedWindow.TIMESTAMP_MIN_VALUE, resultWindow);
+ Instant hold =
+ result.getTimestampCombiner().assign(resultWindow, BoundedWindow.TIMESTAMP_MIN_VALUE);
checkState(hold.isAfter(BoundedWindow.TIMESTAMP_MIN_VALUE));
result.add(hold);
} else {
// Prefetch.
List<ReadableState<Instant>> futures = new ArrayList<>(sources.size());
- for (WatermarkHoldState<W> source : sources) {
+ for (WatermarkHoldState source : sources) {
futures.add(source);
}
// Read.
@@ -297,12 +297,12 @@ public class StateMerging {
}
}
// Clear sources.
- for (WatermarkHoldState<W> source : sources) {
+ for (WatermarkHoldState source : sources) {
source.clear();
}
if (!outputTimesToMerge.isEmpty()) {
// Merge and update.
- result.add(result.getOutputTimeFn().merge(resultWindow, outputTimesToMerge));
+ result.add(result.getTimestampCombiner().merge(resultWindow, outputTimesToMerge));
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
index 12c59ad..a5d262a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
@@ -115,11 +115,10 @@ public interface StateTag<K, StateT extends State> extends Serializable {
/**
* Bind to a watermark {@link StateSpec}.
*
- * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to
- * the returned {@link WatermarkHoldState} are to be combined.
+ * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps
+ * added to the returned {@link WatermarkHoldState} are to be combined.
*/
- <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> spec,
- OutputTimeFn<? super W> outputTimeFn);
+ <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> spec, TimestampCombiner timestampCombiner);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index 3a45569..2b3f4b8 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
@@ -110,11 +110,11 @@ public class StateTags {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
String id,
- StateSpec<? super K, WatermarkHoldState<W>> spec,
- OutputTimeFn<? super W> outputTimeFn) {
- return binder.bindWatermark(tagForSpec(id, spec), outputTimeFn);
+ StateSpec<? super K, WatermarkHoldState> spec,
+ TimestampCombiner timestampCombiner) {
+ return binder.bindWatermark(tagForSpec(id, spec), timestampCombiner);
}
};
}
@@ -228,10 +228,10 @@ public class StateTags {
/**
* Create a state tag for holding the watermark.
*/
- public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState<W>>
- watermarkStateInternal(String id, OutputTimeFn<? super W> outputTimeFn) {
+ public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState>
+ watermarkStateInternal(String id, TimestampCombiner timestampCombiner) {
return new SimpleStateTag<>(
- new StructuredId(id), StateSpecs.watermarkStateInternal(outputTimeFn));
+ new StructuredId(id), StateSpecs.watermarkStateInternal(timestampCombiner));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
index 0321a33..1dfb85f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
@@ -52,7 +52,7 @@ public class TestInMemoryStateInternals<K> extends InMemoryStateInternals<K> {
Instant minimum = null;
for (State storage : inMemoryState.values()) {
if (storage instanceof WatermarkHoldState) {
- Instant hold = ((WatermarkHoldState<?>) storage).read();
+ Instant hold = ((WatermarkHoldState) storage).read();
if (minimum == null || (hold != null && hold.isBefore(minimum))) {
minimum = hold;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index d3c4bc7..9bb9c62 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -23,9 +23,8 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -55,37 +54,38 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
* used for elements.
*/
public static <W extends BoundedWindow>
- StateTag<Object, WatermarkHoldState<W>> watermarkHoldTagForOutputTimeFn(
- OutputTimeFn<? super W> outputTimeFn) {
- return StateTags.<Object, WatermarkHoldState<W>>makeSystemTagInternal(
- StateTags.<W>watermarkStateInternal("hold", outputTimeFn));
+ StateTag<Object, WatermarkHoldState> watermarkHoldTagForTimestampCombiner(
+ TimestampCombiner timestampCombiner) {
+ return StateTags.<Object, WatermarkHoldState>makeSystemTagInternal(
+ StateTags.<W>watermarkStateInternal("hold", timestampCombiner));
}
/**
* Tag for state containing end-of-window and garbage collection output watermark holds.
- * (We can't piggy-back on the data hold state since the outputTimeFn may be
- * {@link OutputTimeFns#outputAtLatestInputTimestamp()}, in which case every pane will
+ * (We can't piggy-back on the data hold state since the timestampCombiner may be
+ * {@link TimestampCombiner#LATEST}, in which case every pane
* would take the end-of-window time as its element time.)
*/
@VisibleForTesting
- public static final StateTag<Object, WatermarkHoldState<BoundedWindow>> EXTRA_HOLD_TAG =
+ public static final StateTag<Object, WatermarkHoldState> EXTRA_HOLD_TAG =
StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal(
- "extra", OutputTimeFns.outputAtEarliestInputTimestamp()));
+ "extra", TimestampCombiner.EARLIEST));
private final TimerInternals timerInternals;
private final WindowingStrategy<?, W> windowingStrategy;
- private final StateTag<Object, WatermarkHoldState<W>> elementHoldTag;
+ private final StateTag<Object, WatermarkHoldState> elementHoldTag;
public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) {
this.timerInternals = timerInternals;
this.windowingStrategy = windowingStrategy;
- this.elementHoldTag = watermarkHoldTagForOutputTimeFn(windowingStrategy.getOutputTimeFn());
+ this.elementHoldTag =
+ watermarkHoldTagForTimestampCombiner(windowingStrategy.getTimestampCombiner());
}
/**
* Add a hold to prevent the output watermark progressing beyond the (possibly adjusted) timestamp
- * of the element in {@code context}. We allow the actual hold time to be shifted later by
- * {@link OutputTimeFn#assignOutputTime}, but no further than the end of the window. The hold will
+ * of the element in {@code context}. We allow the actual hold time to be shifted later by the
+ * {@link TimestampCombiner}, but no further than the end of the window. The hold will
* remain until cleared by {@link #extractAndRelease}. Return the timestamp at which the hold
* was placed, or {@literal null} if no hold was placed.
*
@@ -199,15 +199,18 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
* strategy's output time function.
*/
private Instant shift(Instant timestamp, W window) {
- Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window);
+ Instant shifted =
+ windowingStrategy
+ .getTimestampCombiner()
+ .assign(window, windowingStrategy.getWindowFn().getOutputTime(timestamp, window));
checkState(!shifted.isBefore(timestamp),
- "OutputTimeFn moved element from %s to earlier time %s for window %s",
+ "TimestampCombiner moved element from %s to earlier time %s for window %s",
BoundedWindow.formatTimestamp(timestamp),
BoundedWindow.formatTimestamp(shifted),
window);
checkState(timestamp.isAfter(window.maxTimestamp())
|| !shifted.isAfter(window.maxTimestamp()),
- "OutputTimeFn moved element from %s to %s which is beyond end of "
+ "TimestampCombiner moved element from %s to %s which is beyond end of "
+ "window %s",
timestamp, shifted, window);
@@ -217,7 +220,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
/**
* Attempt to add an 'element hold'. Return the {@link Instant} at which the hold was
* added (ie the element timestamp plus any forward shift requested by the
- * {@link WindowingStrategy#getOutputTimeFn}), or {@literal null} if no hold was added.
+ * {@link WindowingStrategy#getTimestampCombiner}), or {@literal null} if no hold was added.
* The hold is only added if both:
* <ol>
* <li>The backend will be able to respect it. In other words the output watermark cannot
@@ -450,7 +453,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
* Return (a future for) the earliest hold for {@code context}. Clear all the holds after
* reading, but add/restore an end-of-window or garbage collection hold if required.
*
- * <p>The returned timestamp is the output timestamp according to the {@link OutputTimeFn}
+ * <p>The returned timestamp is the output timestamp according to the {@link TimestampCombiner}
* from the windowing strategy of this {@link WatermarkHold}, combined across all the non-late
* elements in the current pane. If there is no such value the timestamp is the end
* of the window.
@@ -462,8 +465,8 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
+ "outputWatermark:{}",
context.key(), context.window(), timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
- final WatermarkHoldState<W> elementHoldState = context.state().access(elementHoldTag);
- final WatermarkHoldState<BoundedWindow> extraHoldState = context.state().access(EXTRA_HOLD_TAG);
+ final WatermarkHoldState elementHoldState = context.state().access(elementHoldTag);
+ final WatermarkHoldState extraHoldState = context.state().access(EXTRA_HOLD_TAG);
return new ReadableState<OldAndNewHolds>() {
@Override
public ReadableState<OldAndNewHolds> readLater() {
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index d0a8923..81ac5fa 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -43,10 +43,10 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
@@ -149,7 +149,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
+ .withTimestampCombiner(TimestampCombiner.EARLIEST);
List<WindowedValue<KV<String, Iterable<String>>>> result =
runGABW(
@@ -200,7 +200,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
+ .withTimestampCombiner(TimestampCombiner.EARLIEST);
List<WindowedValue<KV<String, Long>>> result =
runGABW(
@@ -348,7 +348,7 @@ public class GroupAlsoByWindowsProperties {
/**
* Tests that for a simple sequence of elements on the same key, the given GABW implementation
* correctly groups them according to fixed windows and also sets the output timestamp according
- * to the policy {@link OutputTimeFns#outputAtEndOfWindow()}.
+ * to the policy {@link TimestampCombiner#END_OF_WINDOW}.
*/
public static void groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp(
GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
@@ -356,7 +356,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
+ .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW);
List<WindowedValue<KV<String, Iterable<String>>>> result =
runGABW(
@@ -386,7 +386,7 @@ public class GroupAlsoByWindowsProperties {
/**
* Tests that for a simple sequence of elements on the same key, the given GABW implementation
* correctly groups them according to fixed windows and also sets the output timestamp according
- * to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}.
+ * to the policy {@link TimestampCombiner#LATEST}.
*/
public static void groupsElementsIntoFixedWindowsWithLatestTimestamp(
GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
@@ -394,7 +394,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
+ .withTimestampCombiner(TimestampCombiner.LATEST);
List<WindowedValue<KV<String, Iterable<String>>>> result =
runGABW(
@@ -431,7 +431,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
+ .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW);
List<WindowedValue<KV<String, Iterable<String>>>> result =
runGABW(
@@ -468,7 +468,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
+ .withTimestampCombiner(TimestampCombiner.LATEST);
BoundedWindow unmergedWindow = window(15, 25);
List<WindowedValue<KV<String, Iterable<String>>>> result =
@@ -508,7 +508,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
+ .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW);
BoundedWindow secondWindow = window(15, 25);
List<WindowedValue<KV<String, Long>>> result =
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index 34ddae6..6248401 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.GroupingState;
@@ -71,14 +71,12 @@ public class InMemoryStateInternalsTest {
StateTags.set("stringSet", StringUtf8Coder.of());
private static final StateTag<Object, MapState<String, Integer>> STRING_MAP_ADDR =
StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
- WATERMARK_EARLIEST_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
- WATERMARK_LATEST_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
+ private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
+ private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
+ private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
InMemoryStateInternals<String> underTest = InMemoryStateInternals.forKey("dummyKey");
@@ -442,7 +440,7 @@ public class InMemoryStateInternalsTest {
@Test
public void testWatermarkEarliestState() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
+ WatermarkHoldState value =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
// State instances are cached, but depend on the namespace.
@@ -466,7 +464,7 @@ public class InMemoryStateInternalsTest {
@Test
public void testWatermarkLatestState() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
+ WatermarkHoldState value =
underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
// State instances are cached, but depend on the namespace.
@@ -490,7 +488,7 @@ public class InMemoryStateInternalsTest {
@Test
public void testWatermarkEndOfWindowState() throws Exception {
- WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+ WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
// State instances are cached, but depend on the namespace.
assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
@@ -507,7 +505,7 @@ public class InMemoryStateInternalsTest {
@Test
public void testWatermarkStateIsEmpty() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
+ WatermarkHoldState value =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
assertThat(value.isEmpty().read(), Matchers.is(true));
@@ -521,9 +519,9 @@ public class InMemoryStateInternalsTest {
@Test
public void testMergeEarliestWatermarkIntoSource() throws Exception {
- WatermarkHoldState<BoundedWindow> value1 =
+ WatermarkHoldState value1 =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
- WatermarkHoldState<BoundedWindow> value2 =
+ WatermarkHoldState value2 =
underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
value1.add(new Instant(3000));
@@ -540,11 +538,11 @@ public class InMemoryStateInternalsTest {
@Test
public void testMergeLatestWatermarkIntoSource() throws Exception {
- WatermarkHoldState<BoundedWindow> value1 =
+ WatermarkHoldState value1 =
underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
- WatermarkHoldState<BoundedWindow> value2 =
+ WatermarkHoldState value2 =
underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
- WatermarkHoldState<BoundedWindow> value3 =
+ WatermarkHoldState value3 =
underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
value1.add(new Instant(3000));
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 0d4d992..44bc538 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -56,12 +56,12 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Never;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -210,7 +210,7 @@ public class ReduceFnRunnerTest {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
.withMode(AccumulationMode.DISCARDING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100));
@@ -284,7 +284,7 @@ public class ReduceFnRunnerTest {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) windowFn)
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
.withTrigger(AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()))
.withMode(AccumulationMode.DISCARDING_FIRED_PANES)
.withAllowedLateness(allowedLateness);
@@ -315,7 +315,7 @@ public class ReduceFnRunnerTest {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100));
@@ -615,7 +615,7 @@ public class ReduceFnRunnerTest {
AfterWatermark.pastEndOfWindow())))
.withMode(AccumulationMode.DISCARDING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
.withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
tester.advanceInputWatermark(new Instant(0));
@@ -668,7 +668,7 @@ public class ReduceFnRunnerTest {
AfterWatermark.pastEndOfWindow())))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
.withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY));
tester.advanceInputWatermark(new Instant(0));
@@ -695,7 +695,7 @@ public class ReduceFnRunnerTest {
AfterWatermark.pastEndOfWindow())))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
.withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
tester.advanceInputWatermark(new Instant(0));
@@ -724,7 +724,7 @@ public class ReduceFnRunnerTest {
AfterWatermark.pastEndOfWindow())))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
.withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
tester.advanceInputWatermark(new Instant(0));
@@ -1195,7 +1195,7 @@ public class ReduceFnRunnerTest {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
.withTrigger(
AfterEach.<IntervalWindow>inOrder(
Repeatedly.forever(
@@ -1251,16 +1251,16 @@ public class ReduceFnRunnerTest {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
- .withTrigger(AfterEach.<IntervalWindow>inOrder(
- Repeatedly
- .forever(
- AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
- new Duration(5)))
- .orFinally(AfterWatermark.pastEndOfWindow()),
- Repeatedly.forever(
- AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
- new Duration(25)))))
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
+ .withTrigger(
+ AfterEach.<IntervalWindow>inOrder(
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(new Duration(5)))
+ .orFinally(AfterWatermark.pastEndOfWindow()),
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(new Duration(25)))))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100));
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 549fd8a..b5b5492 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -58,8 +58,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -161,7 +161,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
throws Exception {
WindowingStrategy<?, W> strategy =
WindowingStrategy.of(windowFn)
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
.withMode(mode)
.withAllowedLateness(allowedDataLateness)
.withClosingBehavior(closingBehavior);
@@ -329,8 +329,10 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
assertHasOnlyGlobalAndAllowedTags(
ImmutableSet.copyOf(expectedWindows),
ImmutableSet.<StateTag<? super String, ?>>of(
- TriggerStateMachineRunner.FINISHED_BITS_TAG, PaneInfoTracker.PANE_INFO_TAG,
- WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()),
+ TriggerStateMachineRunner.FINISHED_BITS_TAG,
+ PaneInfoTracker.PANE_INFO_TAG,
+ WatermarkHold.watermarkHoldTagForTimestampCombiner(
+ objectStrategy.getTimestampCombiner()),
WatermarkHold.EXTRA_HOLD_TAG));
}
@@ -345,7 +347,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
ImmutableSet.copyOf(expectedWindows),
ImmutableSet.<StateTag<? super String, ?>>of(
PaneInfoTracker.PANE_INFO_TAG,
- WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()),
+ WatermarkHold.watermarkHoldTagForTimestampCombiner(
+ objectStrategy.getTimestampCombiner()),
WatermarkHold.EXTRA_HOLD_TAG));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
index 5f5d92d..10dcb62 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -97,15 +97,11 @@ public class StateTagTest {
@Test
public void testWatermarkBagEquality() {
- StateTag<?, ?> foo1 = StateTags.watermarkStateInternal(
- "foo", OutputTimeFns.outputAtEarliestInputTimestamp());
- StateTag<?, ?> foo2 = StateTags.watermarkStateInternal(
- "foo", OutputTimeFns.outputAtEarliestInputTimestamp());
- StateTag<?, ?> bar = StateTags.watermarkStateInternal(
- "bar", OutputTimeFns.outputAtEarliestInputTimestamp());
-
- StateTag<?, ?> bar2 = StateTags.watermarkStateInternal(
- "bar", OutputTimeFns.outputAtLatestInputTimestamp());
+ StateTag<?, ?> foo1 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+ StateTag<?, ?> foo2 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+ StateTag<?, ?> bar = StateTags.watermarkStateInternal("bar", TimestampCombiner.EARLIEST);
+
+ StateTag<?, ?> bar2 = StateTags.watermarkStateInternal("bar", TimestampCombiner.LATEST);
// Same id, same fn.
assertEquals(foo1, foo2);
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
index 0665812..068b37f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
@@ -43,7 +43,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -213,7 +213,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
Instant earliest = BoundedWindow.TIMESTAMP_MAX_VALUE;
for (State existingState : this.values()) {
if (existingState instanceof WatermarkHoldState) {
- Instant hold = ((WatermarkHoldState<?>) existingState).read();
+ Instant hold = ((WatermarkHoldState) existingState).read();
if (hold != null && hold.isBefore(earliest)) {
earliest = hold;
}
@@ -276,18 +276,18 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
return new StateBinder<K>() {
@Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
if (containedInUnderlying(namespace, address)) {
@SuppressWarnings("unchecked")
- InMemoryState<? extends WatermarkHoldState<W>> existingState =
- (InMemoryState<? extends WatermarkHoldState<W>>)
+ InMemoryState<? extends WatermarkHoldState> existingState =
+ (InMemoryState<? extends WatermarkHoldState>)
underlying.get().get(namespace, address, c);
return existingState.copy();
} else {
return new InMemoryWatermarkHold<>(
- outputTimeFn);
+ timestampCombiner);
}
}
@@ -419,7 +419,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
State state =
readTo.get(namespace, existingState.getKey(), StateContexts.nullContext());
if (state instanceof WatermarkHoldState) {
- Instant hold = ((WatermarkHoldState<?>) state).read();
+ Instant hold = ((WatermarkHoldState) state).read();
if (hold != null && hold.isBefore(earliestHold)) {
earliestHold = hold;
}
@@ -434,9 +434,9 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
return new StateBinder<K>() {
@Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
return underlying.get(namespace, address, c);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index b08aa8e..322c995 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -40,8 +40,8 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -135,14 +135,14 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
// to alter the flow of data. This entails:
// - trigger as fast as possible
// - maintain the full timestamps of elements
- // - ensure this GBK holds to the minimum of those timestamps (via OutputTimeFn)
+ // - ensure this GBK holds to the minimum of those timestamps (via TimestampCombiner)
// - discard past panes as it is "just a stream" of elements
.apply(
Window.<KV<K, WindowedValue<KV<K, InputT>>>>configure()
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.discardingFiredPanes()
.withAllowedLateness(inputWindowingStrategy.getAllowedLateness())
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
+ .withTimestampCombiner(TimestampCombiner.EARLIEST))
// A full GBK to group by key _and_ window
.apply("Group by key", GroupByKey.<K, WindowedValue<KV<K, InputT>>>create())
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 68c6613..f0aeece 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -43,8 +43,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.GroupingState;
@@ -289,13 +288,12 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- OutputTimeFn<BoundedWindow> outputTimeFn =
- OutputTimeFns.outputAtEarliestInputTimestamp();
+ TimestampCombiner timestampCombiner = TimestampCombiner.EARLIEST;
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag =
- StateTags.watermarkStateInternal("wmstate", outputTimeFn);
- WatermarkHoldState<?> underlyingValue = underlying.state(namespace, stateTag);
+ StateTag<Object, WatermarkHoldState> stateTag =
+ StateTags.watermarkStateInternal("wmstate", timestampCombiner);
+ WatermarkHoldState underlyingValue = underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), nullValue());
underlyingValue.add(new Instant(250L));
@@ -303,7 +301,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
- WatermarkHoldState<BoundedWindow> copyOnAccessState = internals.state(namespace, stateTag);
+ WatermarkHoldState copyOnAccessState = internals.state(namespace, stateTag);
assertThat(copyOnAccessState.read(), equalTo(new Instant(250L)));
copyOnAccessState.add(new Instant(100L));
@@ -313,7 +311,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
copyOnAccessState.add(new Instant(500L));
assertThat(copyOnAccessState.read(), equalTo(new Instant(100L)));
- WatermarkHoldState<BoundedWindow> reReadUnderlyingValue =
+ WatermarkHoldState reReadUnderlyingValue =
underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
}
@@ -514,15 +512,15 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
- StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
- StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
- WatermarkHoldState<BoundedWindow> firstHold =
+ StateTag<Object, WatermarkHoldState> firstHoldAddress =
+ StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+ WatermarkHoldState firstHold =
internals.state(StateNamespaces.window(null, first), firstHoldAddress);
firstHold.add(new Instant(22L));
- StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
- StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
- WatermarkHoldState<BoundedWindow> secondHold =
+ StateTag<Object, WatermarkHoldState> secondHoldAddress =
+ StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+ WatermarkHoldState secondHold =
internals.state(StateNamespaces.window(null, second), secondHoldAddress);
secondHold.add(new Instant(2L));
@@ -546,18 +544,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
};
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
- StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
- StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
- WatermarkHoldState<BoundedWindow> firstHold =
+ StateTag<Object, WatermarkHoldState> firstHoldAddress =
+ StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+ WatermarkHoldState firstHold =
underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
firstHold.add(new Instant(22L));
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
- StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
- StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
- WatermarkHoldState<BoundedWindow> secondHold =
+ StateTag<Object, WatermarkHoldState> secondHoldAddress =
+ StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+ WatermarkHoldState secondHold =
internals.state(StateNamespaces.window(null, second), secondHoldAddress);
secondHold.add(new Instant(244L));
@@ -583,18 +581,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
};
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
- StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
- StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
- WatermarkHoldState<BoundedWindow> firstHold =
+ StateTag<Object, WatermarkHoldState> firstHoldAddress =
+ StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+ WatermarkHoldState firstHold =
underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
firstHold.add(new Instant(224L));
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
- StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
- StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
- WatermarkHoldState<BoundedWindow> secondHold =
+ StateTag<Object, WatermarkHoldState> secondHoldAddress =
+ StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+ WatermarkHoldState secondHold =
internals.state(StateNamespaces.window(null, second), secondHoldAddress);
secondHold.add(new Instant(24L));
@@ -610,7 +608,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
internals
.state(
StateNamespaces.global(),
- StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()))
+ StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST))
.add(new Instant(1234L));
thrown.expect(IllegalStateException.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
index b904bfe..7ee2f69 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
@@ -29,8 +29,8 @@ import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
@@ -60,8 +60,8 @@ public class HashingFlinkCombineRunner<
@SuppressWarnings("unchecked")
- OutputTimeFn<? super BoundedWindow> outputTimeFn =
- (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+ TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
+ WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn();
// Flink Iterable can be iterated over only once.
List<WindowedValue<KV<K, InputT>>> inputs = new ArrayList<>();
@@ -87,14 +87,21 @@ public class HashingFlinkCombineRunner<
AccumT accumT = flinkCombiner.firstInput(key, currentValue.getValue().getValue(),
options, sideInputReader, singletonW);
Instant windowTimestamp =
- outputTimeFn.assignOutputTime(currentValue.getTimestamp(), mergedWindow);
+ timestampCombiner.assign(
+ mergedWindow, windowFn.getOutputTime(currentValue.getTimestamp(), mergedWindow));
accumAndInstant = new Tuple2<>(accumT, windowTimestamp);
mapState.put(mergedWindow, accumAndInstant);
} else {
accumAndInstant.f0 = flinkCombiner.addInput(key, accumAndInstant.f0,
currentValue.getValue().getValue(), options, sideInputReader, singletonW);
- accumAndInstant.f1 = outputTimeFn.combine(accumAndInstant.f1,
- outputTimeFn.assignOutputTime(currentValue.getTimestamp(), mergedWindow));
+ accumAndInstant.f1 =
+ timestampCombiner.combine(
+ accumAndInstant.f1,
+ timestampCombiner.assign(
+ mergedWindow,
+ windowingStrategy
+ .getWindowFn()
+ .getOutputTime(currentValue.getTimestamp(), mergedWindow)));
}
}
if (iterator.hasNext()) {
[2/4] beam git commit: Replace OutputTimeFn UDF with TimestampCombiner enum
Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
index 2967f2c..eac465c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
@@ -26,8 +26,9 @@ import java.util.List;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -53,8 +54,9 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou
Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
@SuppressWarnings("unchecked")
- OutputTimeFn<? super BoundedWindow> outputTimeFn =
- (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+ TimestampCombiner timestampCombiner =
+ (TimestampCombiner) windowingStrategy.getTimestampCombiner();
+ WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn();
// get all elements so that we can sort them, has to fit into
// memory
@@ -88,18 +90,19 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou
// create accumulator using the first elements key
WindowedValue<KV<K, InputT>> currentValue = iterator.next();
K key = currentValue.getValue().getKey();
- BoundedWindow currentWindow = Iterables.getOnlyElement(currentValue.getWindows());
+ W currentWindow = (W) Iterables.getOnlyElement(currentValue.getWindows());
InputT firstValue = currentValue.getValue().getValue();
AccumT accumulator = flinkCombiner.firstInput(
key, firstValue, options, sideInputReader, currentValue.getWindows());
- // we use this to keep track of the timestamps assigned by the OutputTimeFn
+ // we use this to keep track of the timestamps assigned by the TimestampCombiner
Instant windowTimestamp =
- outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);
+ timestampCombiner.assign(
+ currentWindow, windowFn.getOutputTime(currentValue.getTimestamp(), currentWindow));
while (iterator.hasNext()) {
WindowedValue<KV<K, InputT>> nextValue = iterator.next();
- BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
+ W nextWindow = (W) Iterables.getOnlyElement(nextValue.getWindows());
if (currentWindow.equals(nextWindow)) {
// continue accumulating and merge windows
@@ -108,9 +111,12 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou
accumulator = flinkCombiner.addInput(key, accumulator, value,
options, sideInputReader, currentValue.getWindows());
- windowTimestamp = outputTimeFn.combine(
- windowTimestamp,
- outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
+ windowTimestamp =
+ timestampCombiner.combine(
+ windowTimestamp,
+ timestampCombiner.assign(
+ currentWindow,
+ windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow)));
} else {
// emit the value that we currently have
@@ -127,7 +133,9 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou
InputT value = nextValue.getValue().getValue();
accumulator = flinkCombiner.firstInput(key, value,
options, sideInputReader, currentValue.getWindows());
- windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
+ windowTimestamp =
+ timestampCombiner.assign(
+ currentWindow, windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index 3203446..d015c38 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineContextFactory;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -176,9 +176,9 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
throw new UnsupportedOperationException(
String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
index 24b340e..2dd7c96 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
@@ -38,7 +38,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -186,9 +186,9 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
throw new UnsupportedOperationException(
String.format("%s is not supported", CombiningState.class.getSimpleName()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
index 2bf0bf1..17ea62a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
@@ -146,9 +146,9 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
throw new UnsupportedOperationException(
String.format("%s is not supported", CombiningState.class.getSimpleName()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index 4f961e5..878c914 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.CombineContextFactory;
import org.apache.beam.sdk.util.state.BagState;
@@ -185,12 +185,12 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
return new FlinkWatermarkHoldState<>(
- flinkStateBackend, FlinkStateInternals.this, address, namespace, outputTimeFn);
+ flinkStateBackend, FlinkStateInternals.this, address, namespace, timestampCombiner);
}
});
}
@@ -912,9 +912,9 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
}
private static class FlinkWatermarkHoldState<K, W extends BoundedWindow>
- implements WatermarkHoldState<W> {
- private final StateTag<? super K, WatermarkHoldState<W>> address;
- private final OutputTimeFn<? super W> outputTimeFn;
+ implements WatermarkHoldState {
+ private final StateTag<? super K, WatermarkHoldState> address;
+ private final TimestampCombiner timestampCombiner;
private final StateNamespace namespace;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
private final FlinkStateInternals<K> flinkStateInternals;
@@ -923,11 +923,11 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
public FlinkWatermarkHoldState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
FlinkStateInternals<K> flinkStateInternals,
- StateTag<? super K, WatermarkHoldState<W>> address,
+ StateTag<? super K, WatermarkHoldState> address,
StateNamespace namespace,
- OutputTimeFn<? super W> outputTimeFn) {
+ TimestampCombiner timestampCombiner) {
this.address = address;
- this.outputTimeFn = outputTimeFn;
+ this.timestampCombiner = timestampCombiner;
this.namespace = namespace;
this.flinkStateBackend = flinkStateBackend;
this.flinkStateInternals = flinkStateInternals;
@@ -937,12 +937,12 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
}
@Override
- public OutputTimeFn<? super W> getOutputTimeFn() {
- return outputTimeFn;
+ public TimestampCombiner getTimestampCombiner() {
+ return timestampCombiner;
}
@Override
- public WatermarkHoldState<W> readLater() {
+ public WatermarkHoldState readLater() {
return this;
}
@@ -983,7 +983,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
state.update(value);
flinkStateInternals.watermarkHolds.put(namespace.stringKey(), value);
} else {
- Instant combined = outputTimeFn.combine(current, value);
+ Instant combined = timestampCombiner.combine(current, value);
state.update(combined);
flinkStateInternals.watermarkHolds.put(namespace.stringKey(), combined);
}
@@ -1035,7 +1035,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
if (!address.equals(that.address)) {
return false;
}
- if (!outputTimeFn.equals(that.outputTimeFn)) {
+ if (!timestampCombiner.equals(that.timestampCombiner)) {
return false;
}
return namespace.equals(that.namespace);
@@ -1045,7 +1045,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
@Override
public int hashCode() {
int result = address.hashCode();
- result = 31 * result + outputTimeFn.hashCode();
+ result = 31 * result + timestampCombiner.hashCode();
result = 31 * result + namespace.hashCode();
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index d140271..17c43bf 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -77,14 +77,12 @@ public class FlinkStateInternalsTest {
"sumInteger", VarIntCoder.of(), Sum.ofIntegers());
private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
- WATERMARK_EARLIEST_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
- WATERMARK_LATEST_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
+ private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
+ private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
+ private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
FlinkStateInternals<String> underTest;
@@ -274,7 +272,7 @@ public class FlinkStateInternalsTest {
@Test
public void testWatermarkEarliestState() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
+ WatermarkHoldState value =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
// State instances are cached, but depend on the namespace.
@@ -298,7 +296,7 @@ public class FlinkStateInternalsTest {
@Test
public void testWatermarkLatestState() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
+ WatermarkHoldState value =
underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
// State instances are cached, but depend on the namespace.
@@ -322,7 +320,7 @@ public class FlinkStateInternalsTest {
@Test
public void testWatermarkEndOfWindowState() throws Exception {
- WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+ WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
// State instances are cached, but depend on the namespace.
assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
@@ -339,7 +337,7 @@ public class FlinkStateInternalsTest {
@Test
public void testWatermarkStateIsEmpty() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
+ WatermarkHoldState value =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
assertThat(value.isEmpty().read(), Matchers.is(true));
@@ -353,9 +351,9 @@ public class FlinkStateInternalsTest {
@Test
public void testMergeEarliestWatermarkIntoSource() throws Exception {
- WatermarkHoldState<BoundedWindow> value1 =
+ WatermarkHoldState value1 =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
- WatermarkHoldState<BoundedWindow> value2 =
+ WatermarkHoldState value2 =
underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
value1.add(new Instant(3000));
@@ -372,11 +370,11 @@ public class FlinkStateInternalsTest {
@Test
public void testMergeLatestWatermarkIntoSource() throws Exception {
- WatermarkHoldState<BoundedWindow> value1 =
+ WatermarkHoldState value1 =
underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
- WatermarkHoldState<BoundedWindow> value2 =
+ WatermarkHoldState value2 =
underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
- WatermarkHoldState<BoundedWindow> value3 =
+ WatermarkHoldState value3 =
underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
value1.add(new Instant(3000));
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
index 725e9d3..c967521 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -166,10 +166,10 @@ class SparkStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
- return new SparkWatermarkHoldState<>(namespace, address, outputTimeFn);
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
+ return new SparkWatermarkHoldState(namespace, address, timestampCombiner);
}
}
@@ -250,21 +250,21 @@ class SparkStateInternals<K> implements StateInternals<K> {
}
}
- private class SparkWatermarkHoldState<W extends BoundedWindow>
- extends AbstractState<Instant> implements WatermarkHoldState<W> {
+ private class SparkWatermarkHoldState extends AbstractState<Instant>
+ implements WatermarkHoldState {
- private final OutputTimeFn<? super W> outputTimeFn;
+ private final TimestampCombiner timestampCombiner;
public SparkWatermarkHoldState(
StateNamespace namespace,
- StateTag<?, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
+ StateTag<?, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
super(namespace, address, InstantCoder.of());
- this.outputTimeFn = outputTimeFn;
+ this.timestampCombiner = timestampCombiner;
}
@Override
- public SparkWatermarkHoldState<W> readLater() {
+ public SparkWatermarkHoldState readLater() {
return this;
}
@@ -276,7 +276,10 @@ class SparkStateInternals<K> implements StateInternals<K> {
@Override
public void add(Instant outputTime) {
Instant combined = read();
- combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime);
+ combined =
+ (combined == null)
+ ? outputTime
+ : getTimestampCombiner().combine(combined, outputTime);
writeValue(combined);
}
@@ -295,8 +298,8 @@ class SparkStateInternals<K> implements StateInternals<K> {
}
@Override
- public OutputTimeFn<? super W> getOutputTimeFn() {
- return outputTimeFn;
+ public TimestampCombiner getTimestampCombiner() {
+ return timestampCombiner;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
index fa1c3fc..7d06d6b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
@@ -50,7 +50,7 @@ import org.apache.beam.sdk.values.TupleTag;
public class SparkAbstractCombineFn implements Serializable {
protected final SparkRuntimeContext runtimeContext;
protected final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
- protected final WindowingStrategy<?, ?> windowingStrategy;
+ protected final WindowingStrategy<?, BoundedWindow> windowingStrategy;
public SparkAbstractCombineFn(
@@ -59,7 +59,13 @@ public class SparkAbstractCombineFn implements Serializable {
WindowingStrategy<?, ?> windowingStrategy) {
this.runtimeContext = runtimeContext;
this.sideInputs = sideInputs;
- this.windowingStrategy = windowingStrategy;
+ // Narrow the wildcard window type to BoundedWindow once, so subclasses can pass windows to
+ // windowingStrategy.getWindowFn() without per-call casts. NOTE(review): this is only sound if
+ // every window handed back to the WindowFn was produced by this same strategy -- confirm.
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<?, BoundedWindow> boundedStrategy =
+ (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
+ this.windowingStrategy = boundedStrategy;
}
// each Spark task should get it's own copy of this SparkKeyedCombineFn, and since Spark tasks
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
index 23f5d20..7d026c6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
@@ -29,8 +29,9 @@ import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
@@ -70,9 +71,8 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
// sort exploded inputs.
Iterable<WindowedValue<InputT>> sortedInputs = sortByWindows(input.explodeWindows());
- @SuppressWarnings("unchecked")
- OutputTimeFn<? super BoundedWindow> outputTimeFn =
- (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+ TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
+ WindowFn<?, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
//--- inputs iterator, by window order.
final Iterator<WindowedValue<InputT>> iterator = sortedInputs.iterator();
@@ -84,9 +84,11 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
accumulator = combineFn.addInput(accumulator, currentInput.getValue(),
ctxtForInput(currentInput));
- // keep track of the timestamps assigned by the OutputTimeFn.
+ // keep track of the timestamps assigned by the TimestampCombiner.
Instant windowTimestamp =
- outputTimeFn.assignOutputTime(currentInput.getTimestamp(), currentWindow);
+ timestampCombiner.assign(
+ currentWindow,
+ windowFn.getOutputTime(currentInput.getTimestamp(), currentWindow));
// accumulate the next windows, or output.
List<WindowedValue<AccumT>> output = Lists.newArrayList();
@@ -109,8 +111,11 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
// keep accumulating and carry on ;-)
accumulator = combineFn.addInput(accumulator, nextValue.getValue(),
ctxtForInput(nextValue));
- windowTimestamp = outputTimeFn.combine(windowTimestamp,
- outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
+ windowTimestamp =
+ timestampCombiner.merge(
+ currentWindow,
+ windowTimestamp,
+ windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow));
} else {
// moving to the next window, first add the current accumulation to output
// and initialize the accumulator.
@@ -121,7 +126,8 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
accumulator = combineFn.addInput(accumulator, nextValue.getValue(),
ctxtForInput(nextValue));
currentWindow = nextWindow;
- windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
+ windowTimestamp = timestampCombiner.assign(currentWindow,
+ windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow));
}
}
@@ -162,8 +168,6 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
Iterable<WindowedValue<AccumT>> sortedAccumulators = sortByWindows(accumulators);
- @SuppressWarnings("unchecked")
- OutputTimeFn<? super BoundedWindow> outputTimeFn =
- (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+ TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
//--- accumulators iterator, by window order.
final Iterator<WindowedValue<AccumT>> iterator = sortedAccumulators.iterator();
@@ -174,7 +178,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
List<AccumT> currentWindowAccumulators = Lists.newArrayList();
currentWindowAccumulators.add(currentValue.getValue());
- // keep track of the timestamps assigned by the OutputTimeFn,
+ // keep track of the timestamps assigned by the TimestampCombiner,
// in createCombiner we already merge the timestamps assigned
// to individual elements, here we will just merge them.
List<Instant> windowTimestamps = Lists.newArrayList();
@@ -206,7 +210,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
// add the current accumulation to the output and initialize the accumulation.
// merge the timestamps of all accumulators to merge.
- Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
+ Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
// merge accumulators.
// transforming a KV<K, Iterable<AccumT>> into a KV<K, Iterable<AccumT>>.
@@ -231,7 +235,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
}
// merge the last chunk of accumulators.
- Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
+ Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators);
WindowedValue<Iterable<AccumT>> preMergeWindowedValue = WindowedValue.of(
accumsToMerge, mergedTimestamp, currentWindow, PaneInfo.NO_FIRING);
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
index b5d243f..66c03bc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
@@ -29,8 +29,9 @@ import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
@@ -72,9 +73,8 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
// sort exploded inputs.
Iterable<WindowedValue<KV<K, InputT>>> sortedInputs = sortByWindows(wkvi.explodeWindows());
- @SuppressWarnings("unchecked")
- OutputTimeFn<? super BoundedWindow> outputTimeFn =
- (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+ TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
+ WindowFn<?, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
//--- inputs iterator, by window order.
final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInputs.iterator();
@@ -87,9 +87,11 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
accumulator = combineFn.addInput(key, accumulator, currentInput.getValue().getValue(),
ctxtForInput(currentInput));
- // keep track of the timestamps assigned by the OutputTimeFn.
+ // keep track of the timestamps assigned by the TimestampCombiner.
Instant windowTimestamp =
- outputTimeFn.assignOutputTime(currentInput.getTimestamp(), currentWindow);
+ timestampCombiner.assign(
+ currentWindow,
+ windowFn.getOutputTime(currentInput.getTimestamp(), currentWindow));
// accumulate the next windows, or output.
List<WindowedValue<KV<K, AccumT>>> output = Lists.newArrayList();
@@ -112,8 +114,11 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
// keep accumulating and carry on ;-)
accumulator = combineFn.addInput(key, accumulator, nextValue.getValue().getValue(),
ctxtForInput(nextValue));
- windowTimestamp = outputTimeFn.combine(windowTimestamp,
- outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
+ windowTimestamp =
+ timestampCombiner.merge(
+ currentWindow,
+ windowTimestamp,
+ windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow));
} else {
// moving to the next window, first add the current accumulation to output
// and initialize the accumulator.
@@ -124,7 +129,9 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
accumulator = combineFn.addInput(key, accumulator, nextValue.getValue().getValue(),
ctxtForInput(nextValue));
currentWindow = nextWindow;
- windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
+ windowTimestamp =
+ timestampCombiner.assign(
+ currentWindow, windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow));
}
}
@@ -170,8 +177,6 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
Iterable<WindowedValue<KV<K, AccumT>>> sortedAccumulators = sortByWindows(accumulators);
- @SuppressWarnings("unchecked")
- OutputTimeFn<? super BoundedWindow> outputTimeFn =
- (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+ TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
//--- accumulators iterator, by window order.
final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedAccumulators.iterator();
@@ -183,7 +188,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
List<AccumT> currentWindowAccumulators = Lists.newArrayList();
currentWindowAccumulators.add(currentValue.getValue().getValue());
- // keep track of the timestamps assigned by the OutputTimeFn,
+ // keep track of the timestamps assigned by the TimestampCombiner,
// in createCombiner we already merge the timestamps assigned
// to individual elements, here we will just merge them.
List<Instant> windowTimestamps = Lists.newArrayList();
@@ -215,7 +220,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
// add the current accumulation to the output and initialize the accumulation.
// merge the timestamps of all accumulators to merge.
- Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
+ Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
// merge accumulators.
// transforming a KV<K, Iterable<AccumT>> into a KV<K, Iterable<AccumT>>.
@@ -241,7 +246,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
}
// merge the last chunk of accumulators.
- Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
+ Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators);
WindowedValue<KV<K, Iterable<AccumT>>> preMergeWindowedValue = WindowedValue.of(
KV.of(key, accumsToMerge), mergedTimestamp, currentWindow, PaneInfo.NO_FIRING);
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 6c46453..58b5a84 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -135,11 +135,6 @@
<dependencies>
<dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-common-runner-api</artifactId>
- </dependency>
-
- <dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
index 63e7903..e8c2f8d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
@@ -17,11 +17,14 @@
*/
package org.apache.beam.sdk.testing;
+import static com.google.common.base.Preconditions.checkArgument;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -35,8 +38,7 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
@@ -252,20 +254,19 @@ public class WindowFnTestUtils {
/**
* Verifies that later-ending merged windows from any of the timestamps hold up output of
- * earlier-ending windows, using the provided {@link WindowFn} and {@link OutputTimeFn}.
+ * earlier-ending windows, using the provided {@link WindowFn} and {@link TimestampCombiner}.
*
* <p>Given a list of lists of timestamps, where each list is expected to merge into a single
* window with end times in ascending order, assigns and merges windows for each list (as though
- * each were a separate key/user session). Then maps each timestamp in the list according to
- * {@link OutputTimeFn#assignOutputTime outputTimeFn.assignOutputTime()} and
- * {@link OutputTimeFn#combine outputTimeFn.combine()}.
+ * each were a separate key/user session). Then combines each timestamp in the list according to
+ * the provided {@link TimestampCombiner}.
*
* <p>Verifies that a overlapping windows do not hold each other up via the watermark.
*/
public static <T, W extends IntervalWindow>
void validateGetOutputTimestamps(
WindowFn<T, W> windowFn,
- OutputTimeFn<? super W> outputTimeFn,
+ TimestampCombiner timestampCombiner,
List<List<Long>> timestampsPerWindow) throws Exception {
// Assign windows to each timestamp, then merge them, storing the merged windows in
@@ -300,10 +301,11 @@ public class WindowFnTestUtils {
List<Instant> outputInstants = new ArrayList<>();
for (long inputTimestamp : timestampsForWindow) {
- outputInstants.add(outputTimeFn.assignOutputTime(new Instant(inputTimestamp), window));
+ outputInstants.add(
+ assignOutputTime(timestampCombiner, new Instant(inputTimestamp), window));
}
- combinedOutputTimestamps.add(OutputTimeFns.combineOutputTimes(outputTimeFn, outputInstants));
+ combinedOutputTimestamps.add(combineOutputTimes(timestampCombiner, outputInstants));
}
// Consider windows in increasing order of max timestamp; ensure the output timestamp is after
@@ -321,4 +323,52 @@ public class WindowFnTestUtils {
earlierEndingWindow = window;
}
}
+
+ /**
+ * Returns the output timestamp assigned to a single element: its own input timestamp for
+ * {@link TimestampCombiner#EARLIEST} and {@link TimestampCombiner#LATEST}, or the window's
+ * {@link BoundedWindow#maxTimestamp() max timestamp} for {@link TimestampCombiner#END_OF_WINDOW}.
+ */
+ private static Instant assignOutputTime(
+ TimestampCombiner timestampCombiner, Instant inputTimestamp, BoundedWindow window) {
+ switch (timestampCombiner) {
+ case EARLIEST:
+ case LATEST:
+ return inputTimestamp;
+ case END_OF_WINDOW:
+ return window.maxTimestamp();
+ default:
+ throw new IllegalArgumentException(
+ String.format(
+ "Unknown %s: %s", TimestampCombiner.class.getSimpleName(), timestampCombiner));
+ }
+ }
+
+ /**
+ * Combines the output timestamps already assigned to the elements of one window: the minimum
+ * for {@link TimestampCombiner#EARLIEST}, the maximum for {@link TimestampCombiner#LATEST}, and
+ * the first one for {@link TimestampCombiner#END_OF_WINDOW} (all equal when they were produced by
+ * {@code assignOutputTime} for the same window).
+ *
+ * @throws IllegalArgumentException if {@code outputInstants} is empty
+ */
+ private static Instant combineOutputTimes(
+ TimestampCombiner timestampCombiner, Iterable<Instant> outputInstants) {
+ checkArgument(
+ !Iterables.isEmpty(outputInstants),
+ "Cannot combine zero instants with %s",
+ timestampCombiner);
+ switch (timestampCombiner) {
+ case EARLIEST:
+ return Ordering.natural().min(outputInstants);
+ case LATEST:
+ return Ordering.natural().max(outputInstants);
+ case END_OF_WINDOW:
+ return outputInstants.iterator().next();
+ default:
+ throw new IllegalArgumentException(
+ String.format(
+ "Unknown %s: %s", TimestampCombiner.class.getSimpleName(), timestampCombiner));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index cc92102..d9c4c9f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -97,7 +98,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
* for details on the estimation.
*
* <p>The timestamp for each emitted pane is determined by the
- * {@link Window#withOutputTimeFn windowing operation}.
+ * {@link Window#withTimestampCombiner(TimestampCombiner) windowing operation}.
* The output {@code PCollection} will have the same {@link WindowFn}
* as the input.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
deleted file mode 100644
index 0efd278..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import com.google.common.collect.Ordering;
-import java.io.Serializable;
-import java.util.Objects;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.joda.time.Instant;
-
-/**
- * <b><i>(Experimental)</i></b> A function from timestamps of input values to the timestamp for a
- * computed value.
- *
- * <p>The function is represented via three components:
- * <ol>
- * <li>{@link #assignOutputTime} calculates an output timestamp for any input
- * value in a particular window.</li>
- * <li>The output timestamps for all non-late input values within a window are combined
- * according to {@link #combine combine()}, a commutative and associative operation on
- * the output timestamps.</li>
- * <li>The output timestamp when windows merge is provided by {@link #merge merge()}.</li>
- * </ol>
- *
- * <p>This abstract class cannot be subclassed directly, by design: it may grow
- * in consumer-compatible ways that require mutually-exclusive default implementations. To
- * create a concrete subclass, extend {@link OutputTimeFn.Defaults} or
- * {@link OutputTimeFn.DependsOnlyOnWindow}. Note that as long as this class remains
- * experimental, we may also choose to change it in arbitrary backwards-incompatible ways.
- *
- * @param <W> the type of window. Contravariant: methods accepting any subtype of
- * {@code OutputTimeFn<W>} should use the parameter type {@code OutputTimeFn<? super W>}.
- */
-@Experimental(Experimental.Kind.OUTPUT_TIME)
-public abstract class OutputTimeFn<W extends BoundedWindow> implements Serializable {
-
- protected OutputTimeFn() { }
-
- /**
- * Returns the output timestamp to use for data depending on the given
- * {@code inputTimestamp} in the specified {@code window}.
- *
- * <p>The result of this method must be between {@code inputTimestamp} and
- * {@code window.maxTimestamp()} (inclusive on both sides).
- *
- * <p>This function must be monotonic across input timestamps. Specifically, if {@code A < B},
- * then {@code assignOutputTime(A, window) <= assignOutputTime(B, window)}.
- *
- * <p>For a {@link WindowFn} that doesn't produce overlapping windows, this can (and typically
- * should) just return {@code inputTimestamp}. In the presence of overlapping windows, it is
- * suggested that the result in later overlapping windows is past the end of earlier windows
- * so that the later windows don't prevent the watermark from
- * progressing past the end of the earlier window.
- *
- * <p>See the overview of {@link OutputTimeFn} for the consistency properties required
- * between {@link #assignOutputTime}, {@link #combine}, and {@link #merge}.
- */
- public abstract Instant assignOutputTime(Instant inputTimestamp, W window);
-
- /**
- * Combines the given output times, which must be from the same window, into an output time
- * for a computed value.
- *
- * <ul>
- * <li>{@code combine} must be commutative: {@code combine(a, b).equals(combine(b, a))}.</li>
- * <li>{@code combine} must be associative:
- * {@code combine(a, combine(b, c)).equals(combine(combine(a, b), c))}.</li>
- * </ul>
- */
- public abstract Instant combine(Instant outputTime, Instant otherOutputTime);
-
- /**
- * Merges the given output times, presumed to be combined output times for windows that
- * are merging, into an output time for the {@code resultWindow}.
- *
- * <p>When windows {@code w1} and {@code w2} merge to become a new window {@code w1plus2},
- * then {@link #merge} must be implemented such that the output time is the same as
- * if all timestamps were assigned in {@code w1plus2}. Formally:
- *
- * <p>{@code fn.merge(w, fn.assignOutputTime(t1, w1), fn.assignOutputTime(t2, w2))}
- *
- * <p>must be equal to
- *
- * <p>{@code fn.combine(fn.assignOutputTime(t1, w1plus2), fn.assignOutputTime(t2, w1plus2))}
- *
- * <p>If the assigned time depends only on the window, the correct implementation of
- * {@link #merge merge()} necessarily returns the result of
- * {@link #assignOutputTime assignOutputTime(t1, w1plus2)}
- * (which equals {@link #assignOutputTime assignOutputTime(t2, w1plus2)}.
- * Defaults for this case are provided by {@link DependsOnlyOnWindow}.
- *
- * <p>For many other {@link OutputTimeFn} implementations, such as taking the earliest or latest
- * timestamp, this will be the same as {@link #combine combine()}. Defaults for this
- * case are provided by {@link Defaults}.
- */
- public abstract Instant merge(W intoWindow, Iterable<? extends Instant> mergingTimestamps);
-
- /**
- * Returns {@code true} if the result of combination of many output timestamps actually depends
- * only on the earliest.
- *
- * <p>This may allow optimizations when it is very efficient to retrieve the earliest timestamp
- * to be combined.
- */
- public abstract boolean dependsOnlyOnEarliestInputTimestamp();
-
- /**
- * Returns {@code true} if the result does not depend on what outputs were combined but only
- * the window they are in. The canonical example is if all timestamps are sure to
- * be the end of the window.
- *
- * <p>This may allow optimizations, since it is typically very efficient to retrieve the window
- * and combining output timestamps is not necessary.
- *
- * <p>If the assigned output time for an implementation depends only on the window, consider
- * extending {@link DependsOnlyOnWindow}, which returns {@code true} here and also provides
- * a framework for easily implementing a correct {@link #merge}, {@link #combine} and
- * {@link #assignOutputTime}.
- */
- public abstract boolean dependsOnlyOnWindow();
-
- /**
- * <b><i>(Experimental)</i></b> Default method implementations for {@link OutputTimeFn} where the
- * output time depends on the input element timestamps and possibly the window.
- *
- * <p>To complete an implementation, override {@link #assignOutputTime}, at a minimum.
- *
- * <p>By default, {@link #combine} and {@link #merge} return the earliest timestamp of their
- * inputs.
- */
- public abstract static class Defaults<W extends BoundedWindow> extends OutputTimeFn<W> {
-
- protected Defaults() {
- super();
- }
-
- /**
- * {@inheritDoc}
- *
- * @return the earlier of the two timestamps.
- */
- @Override
- public Instant combine(Instant outputTimestamp, Instant otherOutputTimestamp) {
- return Ordering.natural().min(outputTimestamp, otherOutputTimestamp);
- }
-
- /**
- * {@inheritDoc}
- *
- * @return the result of {@link #combine combine(outputTimstamp, otherOutputTimestamp)},
- * by default.
- */
- @Override
- public Instant merge(W resultWindow, Iterable<? extends Instant> mergingTimestamps) {
- return OutputTimeFns.combineOutputTimes(this, mergingTimestamps);
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code false} by default. An {@link OutputTimeFn} that is known to depend only on the
- * window should extend {@link OutputTimeFn.DependsOnlyOnWindow}.
- */
- @Override
- public boolean dependsOnlyOnWindow() {
- return false;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true} by default.
- */
- @Override
- public boolean dependsOnlyOnEarliestInputTimestamp() {
- return false;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true} if the two {@link OutputTimeFn} instances have the same class, by
- * default.
- */
- @Override
- public boolean equals(Object other) {
- if (other == null) {
- return false;
- }
-
- return this.getClass().equals(other.getClass());
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getClass());
- }
- }
-
- /**
- * <b><i>(Experimental)</i></b> Default method implementations for {@link OutputTimeFn} when the
- * output time depends only on the window.
- *
- * <p>To complete an implementation, override {@link #assignOutputTime(BoundedWindow)}.
- */
- public abstract static class DependsOnlyOnWindow<W extends BoundedWindow>
- extends OutputTimeFn<W> {
-
- protected DependsOnlyOnWindow() {
- super();
- }
-
- /**
- * Returns the output timestamp to use for data in the specified {@code window}.
- *
- * <p>Note that the result of this method must be between the maximum possible input timestamp
- * in {@code window} and {@code window.maxTimestamp()} (inclusive on both sides).
- *
- * <p>For example, using {@code Sessions.withGapDuration(gapDuration)}, we know that all input
- * timestamps must lie at least {@code gapDuration} from the end of the session, so
- * {@code window.maxTimestamp() - gapDuration} is an acceptable assigned timestamp.
- *
- * @see #assignOutputTime(Instant, BoundedWindow)
- */
- protected abstract Instant assignOutputTime(W window);
-
- /**
- * {@inheritDoc}
- *
- * @return the result of {#link assignOutputTime(BoundedWindow) assignOutputTime(window)}.
- */
- @Override
- public final Instant assignOutputTime(Instant timestamp, W window) {
- return assignOutputTime(window);
- }
-
- /**
- * {@inheritDoc}
- *
- * @return the same timestamp as both argument timestamps, which are necessarily equal.
- */
- @Override
- public final Instant combine(Instant outputTimestamp, Instant otherOutputTimestamp) {
- return outputTimestamp;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return the result of
- * {@link #assignOutputTime(BoundedWindow) assignOutputTime(resultWindow)}.
- */
- @Override
- public final Instant merge(W resultWindow, Iterable<? extends Instant> mergingTimestamps) {
- return assignOutputTime(resultWindow);
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}.
- */
- @Override
- public final boolean dependsOnlyOnWindow() {
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}. Since the output time depends only on the window, it can
- * certainly be ascertained given a single input timestamp.
- */
- @Override
- public final boolean dependsOnlyOnEarliestInputTimestamp() {
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true} if the two {@link OutputTimeFn} instances have the same class, by
- * default.
- */
- @Override
- public boolean equals(Object other) {
- if (other == null) {
- return false;
- }
-
- return this.getClass().equals(other.getClass());
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getClass());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java
deleted file mode 100644
index b5d67fa..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.joda.time.Instant;
-
-/**
- * <b><i>(Experimental)</i></b> Static utility methods and provided implementations for
- * {@link OutputTimeFn}.
- */
-@Experimental(Experimental.Kind.OUTPUT_TIME)
-public class OutputTimeFns {
- /**
- * The policy of outputting at the earliest of the input timestamps for non-late input data
- * that led to a computed value.
- *
- * <p>For example, suppose <i>v</i><sub>1</sub> through <i>v</i><sub>n</sub> are all on-time
- * elements being aggregated via some function {@code f} into
- * {@code f}(<i>v</i><sub>1</sub>, ..., <i>v</i><sub>n</sub>. When emitted, the output
- * timestamp of the result will be the earliest of the event time timestamps
- *
- * <p>If data arrives late, it has no effect on the output timestamp.
- */
- public static OutputTimeFn<BoundedWindow> outputAtEarliestInputTimestamp() {
- return new OutputAtEarliestInputTimestamp();
- }
-
- /**
- * The policy of holding the watermark to the latest of the input timestamps
- * for non-late input data that led to a computed value.
- *
- * <p>For example, suppose <i>v</i><sub>1</sub> through <i>v</i><sub>n</sub> are all on-time
- * elements being aggregated via some function {@code f} into
- * {@code f}(<i>v</i><sub>1</sub>, ..., <i>v</i><sub>n</sub>. When emitted, the output
- * timestamp of the result will be the latest of the event time timestamps
- *
- * <p>If data arrives late, it has no effect on the output timestamp.
- */
- public static OutputTimeFn<BoundedWindow> outputAtLatestInputTimestamp() {
- return new OutputAtLatestInputTimestamp();
- }
-
- /**
- * The policy of outputting with timestamps at the end of the window.
- *
- * <p>Note that this output timestamp depends only on the window. See
- * {#link dependsOnlyOnWindow()}.
- *
- * <p>When windows merge, instead of using {@link OutputTimeFn#combine} to obtain an output
- * timestamp for the results in the new window, it is mandatory to obtain a new output
- * timestamp from {@link OutputTimeFn#assignOutputTime} with the new window and an arbitrary
- * timestamp (because it is guaranteed that the timestamp is irrelevant).
- *
- * <p>For non-merging window functions, this {@link OutputTimeFn} works transparently.
- */
- public static OutputTimeFn<BoundedWindow> outputAtEndOfWindow() {
- return new OutputAtEndOfWindow();
- }
-
- /**
- * Applies the given {@link OutputTimeFn} to the given output times, obtaining
- * the output time for a value computed. See {@link OutputTimeFn#combine} for
- * a full specification.
- *
- * @throws IllegalArgumentException if {@code outputTimes} is empty.
- */
- public static Instant combineOutputTimes(
- OutputTimeFn<?> outputTimeFn, Iterable<? extends Instant> outputTimes) {
- checkArgument(
- !Iterables.isEmpty(outputTimes),
- "Collection of output times must not be empty in %s.combineOutputTimes",
- OutputTimeFns.class.getName());
-
- @Nullable
- Instant combinedOutputTime = null;
- for (Instant outputTime : outputTimes) {
- combinedOutputTime =
- combinedOutputTime == null
- ? outputTime : outputTimeFn.combine(combinedOutputTime, outputTime);
- }
- return combinedOutputTime;
- }
-
- /**
- * See {@link #outputAtEarliestInputTimestamp}.
- */
- private static class OutputAtEarliestInputTimestamp extends OutputTimeFn.Defaults<BoundedWindow> {
- @Override
- public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) {
- return inputTimestamp;
- }
-
- @Override
- public Instant combine(Instant outputTime, Instant otherOutputTime) {
- return Ordering.natural().min(outputTime, otherOutputTime);
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}. The result of any combine will be the earliest input timestamp.
- */
- @Override
- public boolean dependsOnlyOnEarliestInputTimestamp() {
- return true;
- }
- }
-
- /**
- * See {@link #outputAtLatestInputTimestamp}.
- */
- private static class OutputAtLatestInputTimestamp extends OutputTimeFn.Defaults<BoundedWindow> {
- @Override
- public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) {
- return inputTimestamp;
- }
-
- @Override
- public Instant combine(Instant outputTime, Instant otherOutputTime) {
- return Ordering.natural().max(outputTime, otherOutputTime);
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code false}.
- */
- @Override
- public boolean dependsOnlyOnEarliestInputTimestamp() {
- return false;
- }
- }
-
- private static class OutputAtEndOfWindow extends OutputTimeFn.DependsOnlyOnWindow<BoundedWindow> {
-
- /**
- *{@inheritDoc}
- *
- *@return {@code window.maxTimestamp()}.
- */
- @Override
- protected Instant assignOutputTime(BoundedWindow window) {
- return window.maxTimestamp();
- }
-
- @Override
- public String toString() {
- return getClass().getCanonicalName();
- }
- }
-
- public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) {
- if (outputTimeFn instanceof OutputAtEarliestInputTimestamp) {
- return RunnerApi.OutputTime.EARLIEST_IN_PANE;
- } else if (outputTimeFn instanceof OutputAtLatestInputTimestamp) {
- return RunnerApi.OutputTime.LATEST_IN_PANE;
- } else if (outputTimeFn instanceof OutputAtEndOfWindow) {
- return RunnerApi.OutputTime.END_OF_WINDOW;
- } else {
- throw new IllegalArgumentException(
- String.format(
- "Cannot convert %s to %s: %s",
- OutputTimeFn.class.getCanonicalName(),
- RunnerApi.OutputTime.class.getCanonicalName(),
- outputTimeFn));
- }
- }
-
- public static OutputTimeFn<?> fromProto(RunnerApi.OutputTime proto) {
- switch (proto) {
- case EARLIEST_IN_PANE:
- return OutputTimeFns.outputAtEarliestInputTimestamp();
- case LATEST_IN_PANE:
- return OutputTimeFns.outputAtLatestInputTimestamp();
- case END_OF_WINDOW:
- return OutputTimeFns.outputAtEndOfWindow();
- case UNRECOGNIZED:
- default:
- // Whether or not it is proto that cannot recognize it (due to the version of the
- // generated code we link to) or the switch hasn't been updated to handle it,
- // the situation is the same: we don't know what this OutputTime means
- throw new IllegalArgumentException(
- String.format(
- "Cannot convert unknown %s to %s: %s",
- RunnerApi.OutputTime.class.getCanonicalName(),
- OutputTimeFn.class.getCanonicalName(),
- proto));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java
new file mode 100644
index 0000000..39fe8a9
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.joda.time.Instant;
+
+/**
+ * Policies for combining timestamps that occur within a window.
+ */
+@Experimental(Experimental.Kind.OUTPUT_TIME)
+public enum TimestampCombiner {
+ /**
+ * The policy of taking the earliest of a set of timestamps.
+ *
+ * <p>When used in windowed aggregations, the timestamps of non-late inputs will be combined after
+ * they are shifted by the {@link WindowFn} (to allow downstream watermark progress).
+ *
+ * <p>If data arrives late, it has no effect on the output timestamp.
+ */
+ EARLIEST {
+ @Override
+ public Instant combine(Iterable<? extends Instant> timestamps) {
+ return Ordering.natural().min(timestamps);
+ }
+
+ @Override
+ public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
+ return combine(mergingTimestamps);
+ }
+
+ @Override
+ public boolean dependsOnlyOnEarliestTimestamp() {
+ return true;
+ }
+
+ @Override
+ public boolean dependsOnlyOnWindow() {
+ return false;
+ }
+ },
+
+ /**
+ * The policy of taking the latest of a set of timestamps.
+ *
+ * <p>When used in windowed aggregations, the timestamps of non-late inputs will be combined after
+ * they are shifted by the {@link WindowFn} (to allow downstream watermark progress).
+ *
+ * <p>If data arrives late, it has no effect on the output timestamp.
+ */
+ LATEST {
+ @Override
+ public Instant combine(Iterable<? extends Instant> timestamps) {
+ return Ordering.natural().max(timestamps);
+ }
+
+ @Override
+ public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
+ return combine(mergingTimestamps);
+ }
+
+ @Override
+ public boolean dependsOnlyOnEarliestTimestamp() {
+ return false;
+ }
+
+ @Override
+ public boolean dependsOnlyOnWindow() {
+ return false;
+ }
+ },
+
+ /**
+ * The policy of using the end of the window, regardless of input timestamps.
+ *
+ * <p>When used in windowed aggregations, {@link #merge} (and hence {@link #assign}) returns
+ * {@link BoundedWindow#maxTimestamp()} of the target window, ignoring the input timestamps.
+ *
+ * <p>If data arrives late, it has no effect on the output timestamp.
+ */
+ END_OF_WINDOW {
+ @Override
+ public Instant combine(Iterable<? extends Instant> timestamps) {
+ checkArgument(!Iterables.isEmpty(timestamps), "Cannot combine zero timestamps");
+ return Iterables.get(timestamps, 0);
+ }
+
+ @Override
+ public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
+ return intoWindow.maxTimestamp();
+ }
+
+ @Override
+ public boolean dependsOnlyOnEarliestTimestamp() {
+ return false;
+ }
+
+ @Override
+ public boolean dependsOnlyOnWindow() {
+ return true;
+ }
+ };
+
+ /**
+ * Combines the given times, which must be from the same window and must have been passed through
+ * {@link #merge}. {@code timestamps} must be non-empty.
+ *
+ * <ul>
+ * <li>{@code combine} must be commutative: {@code combine(a, b).equals(combine(b, a))}.
+ * <li>{@code combine} must be associative:
+ * {@code combine(a, combine(b, c)).equals(combine(combine(a, b), c))}.
+ * </ul>
+ */
+ public abstract Instant combine(Iterable<? extends Instant> timestamps);
+
+ /**
+ * Merges the given timestamps, which may have originated in separate windows, into the context of
+ * the result window.
+ */
+ public abstract Instant merge(
+ BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps);
+
+ /**
+ * Shorthand for {@link #merge} with just one element, to place it into the context of
+ * a window.
+ *
+ * <p>For example, the {@link #END_OF_WINDOW} policy moves the timestamp to the end of the window.
+ */
+ public final Instant assign(BoundedWindow intoWindow, Instant timestamp) {
+ return merge(intoWindow, Collections.singleton(timestamp));
+ }
+
+ /**
+ * Varargs variant of {@link #combine}.
+ */
+ public final Instant combine(Instant... timestamps) {
+ return combine(Arrays.asList(timestamps));
+ }
+
+ /**
+ * Varargs variant of {@link #merge}.
+ */
+ public final Instant merge(BoundedWindow intoWindow, Instant... timestamps) {
+ return merge(intoWindow, Arrays.asList(timestamps));
+ }
+
+ /**
+ * Returns {@code true} if the result of combining many timestamps depends only on the earliest
+ * of them.
+ *
+ * <p>This may allow optimizations when it is very efficient to retrieve the earliest timestamp
+ * to be combined.
+ */
+ public abstract boolean dependsOnlyOnEarliestTimestamp();
+
+ /**
+ * Returns {@code true} if the result does not depend on what outputs were combined but only
+ * the window they are in. The canonical example is if all timestamps are sure to
+ * be the end of the window.
+ *
+ * <p>This may allow optimizations, since it is typically very efficient to retrieve the window
+ * and combining output timestamps is not necessary.
+ */
+ public abstract boolean dependsOnlyOnWindow();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 1000ff7..cb7b430 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -193,7 +193,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
@Nullable abstract AccumulationMode getAccumulationMode();
@Nullable abstract Duration getAllowedLateness();
@Nullable abstract ClosingBehavior getClosingBehavior();
- @Nullable abstract OutputTimeFn<?> getOutputTimeFn();
+ @Nullable abstract TimestampCombiner getTimestampCombiner();
abstract Builder<T> toBuilder();
@@ -204,7 +204,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
abstract Builder<T> setAccumulationMode(AccumulationMode mode);
abstract Builder<T> setAllowedLateness(Duration allowedLateness);
abstract Builder<T> setClosingBehavior(ClosingBehavior closingBehavior);
- abstract Builder<T> setOutputTimeFn(OutputTimeFn<?> outputTimeFn);
+ abstract Builder<T> setTimestampCombiner(TimestampCombiner timestampCombiner);
abstract Window<T> build();
}
@@ -273,12 +273,12 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
}
/**
- * <b><i>(Experimental)</i></b> Override the default {@link OutputTimeFn}, to control
+ * <b><i>(Experimental)</i></b> Override the default {@link TimestampCombiner}, to control
* the output timestamp of values output from a {@link GroupByKey} operation.
*/
@Experimental(Kind.OUTPUT_TIME)
- public Window<T> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) {
- return toBuilder().setOutputTimeFn(outputTimeFn).build();
+ public Window<T> withTimestampCombiner(TimestampCombiner timestampCombiner) {
+ return toBuilder().setTimestampCombiner(timestampCombiner).build();
}
/**
@@ -300,8 +300,6 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
* Get the output strategy of this {@link Window Window PTransform}. For internal use
* only.
*/
- // Rawtype cast of OutputTimeFn cannot be eliminated with intermediate variable, as it is
- // casting between wildcards
public WindowingStrategy<?, ?> getOutputStrategyInternal(
WindowingStrategy<?, ?> inputStrategy) {
WindowingStrategy<?, ?> result = inputStrategy;
@@ -320,8 +318,8 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
if (getClosingBehavior() != null) {
result = result.withClosingBehavior(getClosingBehavior());
}
- if (getOutputTimeFn() != null) {
- result = result.withOutputTimeFn(getOutputTimeFn());
+ if (getTimestampCombiner() != null) {
+ result = result.withTimestampCombiner(getTimestampCombiner());
}
return result;
}
@@ -411,9 +409,9 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
.withLabel("Window Closing Behavior"));
}
- if (getOutputTimeFn() != null) {
- builder.add(DisplayData.item("outputTimeFn", getOutputTimeFn().getClass())
- .withLabel("Output Time Function"));
+ if (getTimestampCombiner() != null) {
+ builder.add(DisplayData.item("timestampCombiner", getTimestampCombiner().toString())
+ .withLabel("Timestamp Combiner"));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index 0c27c4f..706e039 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -22,7 +22,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -57,13 +57,14 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
// If the input has already had its windows merged, then the GBK that performed the merge
// will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
// here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
- // The OutputTimeFn is set to ensure the GroupByKey does not shift elements forwards in time.
+ // The TimestampCombiner is set to ensure the GroupByKey does not shift elements forwards in
+ // time.
// Because this outputs as fast as possible, this should not hold the watermark.
Window<KV<K, V>> rewindow =
Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
.triggering(new ReshuffleTrigger<>())
.discardingFiredPanes()
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
return input.apply(rewindow)
[4/4] beam git commit: This closes #2683: Replace OutputTimeFn UDF with TimestampCombiner enum
Posted by ke...@apache.org.
This closes #2683: Replace OutputTimeFn UDF with TimestampCombiner enum
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a32371eb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a32371eb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a32371eb
Branch: refs/heads/master
Commit: a32371eb3e49916ffb2d898e34ee8d13edd65cf8
Parents: 7339882 f38e427
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Apr 26 15:08:41 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Apr 26 15:08:41 2017 -0700
----------------------------------------------------------------------
.../beam/examples/complete/game/GameStats.java | 4 +-
.../translation/utils/ApexStateInternals.java | 26 +-
.../translation/GroupByKeyTranslatorTest.java | 10 +-
.../utils/ApexStateInternalsTest.java | 33 +-
.../core/construction/WindowingStrategies.java | 52 ++-
.../construction/WindowingStrategiesTest.java | 6 +-
.../runners/core/InMemoryStateInternals.java | 32 +-
.../beam/runners/core/ReduceFnRunner.java | 4 +-
.../beam/runners/core/SplittableParDo.java | 8 +-
.../apache/beam/runners/core/StateMerging.java | 32 +-
.../org/apache/beam/runners/core/StateTag.java | 11 +-
.../org/apache/beam/runners/core/StateTags.java | 16 +-
.../core/TestInMemoryStateInternals.java | 2 +-
.../apache/beam/runners/core/WatermarkHold.java | 45 +--
.../core/GroupAlsoByWindowsProperties.java | 20 +-
.../core/InMemoryStateInternalsTest.java | 34 +-
.../beam/runners/core/ReduceFnRunnerTest.java | 38 +--
.../beam/runners/core/ReduceFnTester.java | 13 +-
.../apache/beam/runners/core/StateTagTest.java | 16 +-
.../CopyOnAccessInMemoryStateInternals.java | 24 +-
.../direct/ParDoMultiOverrideFactory.java | 6 +-
.../CopyOnAccessInMemoryStateInternalsTest.java | 54 ++--
.../functions/HashingFlinkCombineRunner.java | 19 +-
.../functions/SortingFlinkCombineRunner.java | 30 +-
.../state/FlinkBroadcastStateInternals.java | 8 +-
.../state/FlinkKeyGroupStateInternals.java | 8 +-
.../state/FlinkSplitStateInternals.java | 8 +-
.../streaming/state/FlinkStateInternals.java | 34 +-
.../streaming/FlinkStateInternalsTest.java | 34 +-
.../spark/stateful/SparkStateInternals.java | 33 +-
.../translation/SparkAbstractCombineFn.java | 4 +-
.../spark/translation/SparkGlobalCombineFn.java | 37 ++-
.../spark/translation/SparkKeyedCombineFn.java | 37 ++-
sdks/java/core/pom.xml | 5 -
.../beam/sdk/testing/WindowFnTestUtils.java | 53 +++-
.../apache/beam/sdk/transforms/GroupByKey.java | 3 +-
.../sdk/transforms/windowing/OutputTimeFn.java | 314 -------------------
.../sdk/transforms/windowing/OutputTimeFns.java | 212 -------------
.../transforms/windowing/TimestampCombiner.java | 186 +++++++++++
.../beam/sdk/transforms/windowing/Window.java | 22 +-
.../org/apache/beam/sdk/util/Reshuffle.java | 7 +-
.../apache/beam/sdk/util/WindowingStrategy.java | 176 +++--------
.../apache/beam/sdk/util/state/StateBinder.java | 12 +-
.../apache/beam/sdk/util/state/StateSpecs.java | 23 +-
.../beam/sdk/util/state/WatermarkHoldState.java | 19 +-
.../org/apache/beam/SdkCoreApiSurfaceTest.java | 1 -
.../beam/sdk/transforms/GroupByKeyTest.java | 10 +-
.../sdk/transforms/join/CoGroupByKeyTest.java | 6 +-
.../transforms/windowing/OutputTimeFnsTest.java | 51 ---
.../sdk/transforms/windowing/SessionsTest.java | 6 +-
.../sdk/transforms/windowing/WindowTest.java | 23 +-
.../sdk/transforms/windowing/WindowingTest.java | 2 +-
.../org/apache/beam/GcpCoreApiSurfaceTest.java | 1 -
53 files changed, 740 insertions(+), 1130 deletions(-)
----------------------------------------------------------------------