You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/26 22:38:22 UTC
[2/4] beam git commit: Replace OutputTimeFn UDF with TimestampCombiner enum
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
index 2967f2c..eac465c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
@@ -26,8 +26,9 @@ import java.util.List;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -53,8 +54,9 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou
Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
@SuppressWarnings("unchecked")
- OutputTimeFn<? super BoundedWindow> outputTimeFn =
- (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+ TimestampCombiner timestampCombiner =
+ (TimestampCombiner) windowingStrategy.getTimestampCombiner();
+ WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn();
// get all elements so that we can sort them, has to fit into
// memory
@@ -88,18 +90,19 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou
// create accumulator using the first elements key
WindowedValue<KV<K, InputT>> currentValue = iterator.next();
K key = currentValue.getValue().getKey();
- BoundedWindow currentWindow = Iterables.getOnlyElement(currentValue.getWindows());
+ W currentWindow = (W) Iterables.getOnlyElement(currentValue.getWindows());
InputT firstValue = currentValue.getValue().getValue();
AccumT accumulator = flinkCombiner.firstInput(
key, firstValue, options, sideInputReader, currentValue.getWindows());
- // we use this to keep track of the timestamps assigned by the OutputTimeFn
+ // we use this to keep track of the timestamps assigned by the TimestampCombiner
Instant windowTimestamp =
- outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);
+ timestampCombiner.assign(
+ currentWindow, windowFn.getOutputTime(currentValue.getTimestamp(), currentWindow));
while (iterator.hasNext()) {
WindowedValue<KV<K, InputT>> nextValue = iterator.next();
- BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
+ W nextWindow = (W) Iterables.getOnlyElement(nextValue.getWindows());
if (currentWindow.equals(nextWindow)) {
// continue accumulating and merge windows
@@ -108,9 +111,12 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou
accumulator = flinkCombiner.addInput(key, accumulator, value,
options, sideInputReader, currentValue.getWindows());
- windowTimestamp = outputTimeFn.combine(
- windowTimestamp,
- outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
+ windowTimestamp =
+ timestampCombiner.combine(
+ windowTimestamp,
+ timestampCombiner.assign(
+ currentWindow,
+ windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow)));
} else {
// emit the value that we currently have
@@ -127,7 +133,9 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou
InputT value = nextValue.getValue().getValue();
accumulator = flinkCombiner.firstInput(key, value,
options, sideInputReader, currentValue.getWindows());
- windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
+ windowTimestamp =
+ timestampCombiner.assign(
+ currentWindow, windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index 3203446..d015c38 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineContextFactory;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -176,9 +176,9 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
throw new UnsupportedOperationException(
String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
index 24b340e..2dd7c96 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
@@ -38,7 +38,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -186,9 +186,9 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
throw new UnsupportedOperationException(
String.format("%s is not supported", CombiningState.class.getSimpleName()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
index 2bf0bf1..17ea62a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
@@ -146,9 +146,9 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
throw new UnsupportedOperationException(
String.format("%s is not supported", CombiningState.class.getSimpleName()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index 4f961e5..878c914 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.CombineContextFactory;
import org.apache.beam.sdk.util.state.BagState;
@@ -185,12 +185,12 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
return new FlinkWatermarkHoldState<>(
- flinkStateBackend, FlinkStateInternals.this, address, namespace, outputTimeFn);
+ flinkStateBackend, FlinkStateInternals.this, address, namespace, timestampCombiner);
}
});
}
@@ -912,9 +912,9 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
}
private static class FlinkWatermarkHoldState<K, W extends BoundedWindow>
- implements WatermarkHoldState<W> {
- private final StateTag<? super K, WatermarkHoldState<W>> address;
- private final OutputTimeFn<? super W> outputTimeFn;
+ implements WatermarkHoldState {
+ private final StateTag<? super K, WatermarkHoldState> address;
+ private final TimestampCombiner timestampCombiner;
private final StateNamespace namespace;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
private final FlinkStateInternals<K> flinkStateInternals;
@@ -923,11 +923,11 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
public FlinkWatermarkHoldState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
FlinkStateInternals<K> flinkStateInternals,
- StateTag<? super K, WatermarkHoldState<W>> address,
+ StateTag<? super K, WatermarkHoldState> address,
StateNamespace namespace,
- OutputTimeFn<? super W> outputTimeFn) {
+ TimestampCombiner timestampCombiner) {
this.address = address;
- this.outputTimeFn = outputTimeFn;
+ this.timestampCombiner = timestampCombiner;
this.namespace = namespace;
this.flinkStateBackend = flinkStateBackend;
this.flinkStateInternals = flinkStateInternals;
@@ -937,12 +937,12 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
}
@Override
- public OutputTimeFn<? super W> getOutputTimeFn() {
- return outputTimeFn;
+ public TimestampCombiner getTimestampCombiner() {
+ return timestampCombiner;
}
@Override
- public WatermarkHoldState<W> readLater() {
+ public WatermarkHoldState readLater() {
return this;
}
@@ -983,7 +983,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
state.update(value);
flinkStateInternals.watermarkHolds.put(namespace.stringKey(), value);
} else {
- Instant combined = outputTimeFn.combine(current, value);
+ Instant combined = timestampCombiner.combine(current, value);
state.update(combined);
flinkStateInternals.watermarkHolds.put(namespace.stringKey(), combined);
}
@@ -1035,7 +1035,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
if (!address.equals(that.address)) {
return false;
}
- if (!outputTimeFn.equals(that.outputTimeFn)) {
+ if (!timestampCombiner.equals(that.timestampCombiner)) {
return false;
}
return namespace.equals(that.namespace);
@@ -1045,7 +1045,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
@Override
public int hashCode() {
int result = address.hashCode();
- result = 31 * result + outputTimeFn.hashCode();
+ result = 31 * result + timestampCombiner.hashCode();
result = 31 * result + namespace.hashCode();
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index d140271..17c43bf 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -77,14 +77,12 @@ public class FlinkStateInternalsTest {
"sumInteger", VarIntCoder.of(), Sum.ofIntegers());
private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
- WATERMARK_EARLIEST_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
- WATERMARK_LATEST_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
+ private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
+ private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
+ private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
FlinkStateInternals<String> underTest;
@@ -274,7 +272,7 @@ public class FlinkStateInternalsTest {
@Test
public void testWatermarkEarliestState() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
+ WatermarkHoldState value =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
// State instances are cached, but depend on the namespace.
@@ -298,7 +296,7 @@ public class FlinkStateInternalsTest {
@Test
public void testWatermarkLatestState() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
+ WatermarkHoldState value =
underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
// State instances are cached, but depend on the namespace.
@@ -322,7 +320,7 @@ public class FlinkStateInternalsTest {
@Test
public void testWatermarkEndOfWindowState() throws Exception {
- WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+ WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
// State instances are cached, but depend on the namespace.
assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
@@ -339,7 +337,7 @@ public class FlinkStateInternalsTest {
@Test
public void testWatermarkStateIsEmpty() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
+ WatermarkHoldState value =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
assertThat(value.isEmpty().read(), Matchers.is(true));
@@ -353,9 +351,9 @@ public class FlinkStateInternalsTest {
@Test
public void testMergeEarliestWatermarkIntoSource() throws Exception {
- WatermarkHoldState<BoundedWindow> value1 =
+ WatermarkHoldState value1 =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
- WatermarkHoldState<BoundedWindow> value2 =
+ WatermarkHoldState value2 =
underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
value1.add(new Instant(3000));
@@ -372,11 +370,11 @@ public class FlinkStateInternalsTest {
@Test
public void testMergeLatestWatermarkIntoSource() throws Exception {
- WatermarkHoldState<BoundedWindow> value1 =
+ WatermarkHoldState value1 =
underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
- WatermarkHoldState<BoundedWindow> value2 =
+ WatermarkHoldState value2 =
underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
- WatermarkHoldState<BoundedWindow> value3 =
+ WatermarkHoldState value3 =
underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
value1.add(new Instant(3000));
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
index 725e9d3..c967521 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -166,10 +166,10 @@ class SparkStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
- return new SparkWatermarkHoldState<>(namespace, address, outputTimeFn);
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
+ return new SparkWatermarkHoldState(namespace, address, timestampCombiner);
}
}
@@ -250,21 +250,21 @@ class SparkStateInternals<K> implements StateInternals<K> {
}
}
- private class SparkWatermarkHoldState<W extends BoundedWindow>
- extends AbstractState<Instant> implements WatermarkHoldState<W> {
+ private class SparkWatermarkHoldState extends AbstractState<Instant>
+ implements WatermarkHoldState {
- private final OutputTimeFn<? super W> outputTimeFn;
+ private final TimestampCombiner timestampCombiner;
public SparkWatermarkHoldState(
StateNamespace namespace,
- StateTag<?, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
+ StateTag<?, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
super(namespace, address, InstantCoder.of());
- this.outputTimeFn = outputTimeFn;
+ this.timestampCombiner = timestampCombiner;
}
@Override
- public SparkWatermarkHoldState<W> readLater() {
+ public SparkWatermarkHoldState readLater() {
return this;
}
@@ -276,7 +276,10 @@ class SparkStateInternals<K> implements StateInternals<K> {
@Override
public void add(Instant outputTime) {
Instant combined = read();
- combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime);
+ combined =
+ (combined == null)
+ ? outputTime
+ : getTimestampCombiner().combine(combined, outputTime);
writeValue(combined);
}
@@ -295,8 +298,8 @@ class SparkStateInternals<K> implements StateInternals<K> {
}
@Override
- public OutputTimeFn<? super W> getOutputTimeFn() {
- return outputTimeFn;
+ public TimestampCombiner getTimestampCombiner() {
+ return timestampCombiner;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
index fa1c3fc..7d06d6b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
@@ -50,7 +50,7 @@ import org.apache.beam.sdk.values.TupleTag;
public class SparkAbstractCombineFn implements Serializable {
protected final SparkRuntimeContext runtimeContext;
protected final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
- protected final WindowingStrategy<?, ?> windowingStrategy;
+ protected final WindowingStrategy<?, BoundedWindow> windowingStrategy;
public SparkAbstractCombineFn(
@@ -59,7 +59,7 @@ public class SparkAbstractCombineFn implements Serializable {
WindowingStrategy<?, ?> windowingStrategy) {
this.runtimeContext = runtimeContext;
this.sideInputs = sideInputs;
- this.windowingStrategy = windowingStrategy;
+ this.windowingStrategy = (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
}
// each Spark task should get it's own copy of this SparkKeyedCombineFn, and since Spark tasks
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
index 23f5d20..7d026c6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
@@ -29,8 +29,9 @@ import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
@@ -70,9 +71,8 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
// sort exploded inputs.
Iterable<WindowedValue<InputT>> sortedInputs = sortByWindows(input.explodeWindows());
- @SuppressWarnings("unchecked")
- OutputTimeFn<? super BoundedWindow> outputTimeFn =
- (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+ TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
+ WindowFn<?, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
//--- inputs iterator, by window order.
final Iterator<WindowedValue<InputT>> iterator = sortedInputs.iterator();
@@ -84,9 +84,13 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
accumulator = combineFn.addInput(accumulator, currentInput.getValue(),
ctxtForInput(currentInput));
- // keep track of the timestamps assigned by the OutputTimeFn.
+ // keep track of the timestamps assigned by the TimestampCombiner.
Instant windowTimestamp =
- outputTimeFn.assignOutputTime(currentInput.getTimestamp(), currentWindow);
+ timestampCombiner.assign(
+ currentWindow,
+ windowingStrategy
+ .getWindowFn()
+ .getOutputTime(currentInput.getTimestamp(), currentWindow));
// accumulate the next windows, or output.
List<WindowedValue<AccumT>> output = Lists.newArrayList();
@@ -109,8 +113,13 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
// keep accumulating and carry on ;-)
accumulator = combineFn.addInput(accumulator, nextValue.getValue(),
ctxtForInput(nextValue));
- windowTimestamp = outputTimeFn.combine(windowTimestamp,
- outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
+ windowTimestamp =
+ timestampCombiner.merge(
+ currentWindow,
+ windowTimestamp,
+ windowingStrategy
+ .getWindowFn()
+ .getOutputTime(nextValue.getTimestamp(), currentWindow));
} else {
// moving to the next window, first add the current accumulation to output
// and initialize the accumulator.
@@ -121,7 +130,8 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
accumulator = combineFn.addInput(accumulator, nextValue.getValue(),
ctxtForInput(nextValue));
currentWindow = nextWindow;
- windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
+ windowTimestamp = timestampCombiner.assign(currentWindow,
+ windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow));
}
}
@@ -162,8 +172,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
Iterable<WindowedValue<AccumT>> sortedAccumulators = sortByWindows(accumulators);
@SuppressWarnings("unchecked")
- OutputTimeFn<? super BoundedWindow> outputTimeFn =
- (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+ TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
//--- accumulators iterator, by window order.
final Iterator<WindowedValue<AccumT>> iterator = sortedAccumulators.iterator();
@@ -174,7 +183,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
List<AccumT> currentWindowAccumulators = Lists.newArrayList();
currentWindowAccumulators.add(currentValue.getValue());
- // keep track of the timestamps assigned by the OutputTimeFn,
+ // keep track of the timestamps assigned by the TimestampCombiner,
// in createCombiner we already merge the timestamps assigned
// to individual elements, here we will just merge them.
List<Instant> windowTimestamps = Lists.newArrayList();
@@ -206,7 +215,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
// add the current accumulation to the output and initialize the accumulation.
// merge the timestamps of all accumulators to merge.
- Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
+ Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
// merge accumulators.
// transforming a KV<K, Iterable<AccumT>> into a KV<K, Iterable<AccumT>>.
@@ -231,7 +240,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
}
// merge the last chunk of accumulators.
- Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
+ Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators);
WindowedValue<Iterable<AccumT>> preMergeWindowedValue = WindowedValue.of(
accumsToMerge, mergedTimestamp, currentWindow, PaneInfo.NO_FIRING);
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
index b5d243f..66c03bc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
@@ -29,8 +29,9 @@ import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
@@ -72,9 +73,8 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
// sort exploded inputs.
Iterable<WindowedValue<KV<K, InputT>>> sortedInputs = sortByWindows(wkvi.explodeWindows());
- @SuppressWarnings("unchecked")
- OutputTimeFn<? super BoundedWindow> outputTimeFn =
- (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+ TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
+ WindowFn<?, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
//--- inputs iterator, by window order.
final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInputs.iterator();
@@ -87,9 +87,13 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
accumulator = combineFn.addInput(key, accumulator, currentInput.getValue().getValue(),
ctxtForInput(currentInput));
- // keep track of the timestamps assigned by the OutputTimeFn.
+ // keep track of the timestamps assigned by the TimestampCombiner.
Instant windowTimestamp =
- outputTimeFn.assignOutputTime(currentInput.getTimestamp(), currentWindow);
+ timestampCombiner.assign(
+ currentWindow,
+ windowingStrategy
+ .getWindowFn()
+ .getOutputTime(currentInput.getTimestamp(), currentWindow));
// accumulate the next windows, or output.
List<WindowedValue<KV<K, AccumT>>> output = Lists.newArrayList();
@@ -112,8 +116,12 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
// keep accumulating and carry on ;-)
accumulator = combineFn.addInput(key, accumulator, nextValue.getValue().getValue(),
ctxtForInput(nextValue));
- windowTimestamp = outputTimeFn.combine(windowTimestamp,
- outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
+ windowTimestamp =
+ timestampCombiner.combine(
+ windowTimestamp,
+ timestampCombiner.assign(
+ currentWindow,
+ windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow)));
} else {
// moving to the next window, first add the current accumulation to output
// and initialize the accumulator.
@@ -124,7 +132,9 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
accumulator = combineFn.addInput(key, accumulator, nextValue.getValue().getValue(),
ctxtForInput(nextValue));
currentWindow = nextWindow;
- windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
+ windowTimestamp =
+ timestampCombiner.assign(
+ currentWindow, windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow));
}
}
@@ -170,8 +180,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
Iterable<WindowedValue<KV<K, AccumT>>> sortedAccumulators = sortByWindows(accumulators);
@SuppressWarnings("unchecked")
- OutputTimeFn<? super BoundedWindow> outputTimeFn =
- (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+ TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
//--- accumulators iterator, by window order.
final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedAccumulators.iterator();
@@ -183,7 +192,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
List<AccumT> currentWindowAccumulators = Lists.newArrayList();
currentWindowAccumulators.add(currentValue.getValue().getValue());
- // keep track of the timestamps assigned by the OutputTimeFn,
+ // keep track of the timestamps assigned by the TimestampCombiner,
// in createCombiner we already merge the timestamps assigned
// to individual elements, here we will just merge them.
List<Instant> windowTimestamps = Lists.newArrayList();
@@ -215,7 +224,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
// add the current accumulation to the output and initialize the accumulation.
// merge the timestamps of all accumulators to merge.
- Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
+ Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
// merge accumulators.
// transforming a KV<K, Iterable<AccumT>> into a KV<K, Iterable<AccumT>>.
@@ -241,7 +250,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
}
// merge the last chunk of accumulators.
- Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
+ Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators);
WindowedValue<KV<K, Iterable<AccumT>>> preMergeWindowedValue = WindowedValue.of(
KV.of(key, accumsToMerge), mergedTimestamp, currentWindow, PaneInfo.NO_FIRING);
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 6c46453..58b5a84 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -135,11 +135,6 @@
<dependencies>
<dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-common-runner-api</artifactId>
- </dependency>
-
- <dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
index 63e7903..e8c2f8d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
@@ -17,11 +17,14 @@
*/
package org.apache.beam.sdk.testing;
+import static com.google.common.base.Preconditions.checkArgument;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -35,8 +38,7 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
@@ -252,20 +254,19 @@ public class WindowFnTestUtils {
/**
* Verifies that later-ending merged windows from any of the timestamps hold up output of
- * earlier-ending windows, using the provided {@link WindowFn} and {@link OutputTimeFn}.
+ * earlier-ending windows, using the provided {@link WindowFn} and {@link TimestampCombiner}.
*
* <p>Given a list of lists of timestamps, where each list is expected to merge into a single
* window with end times in ascending order, assigns and merges windows for each list (as though
- * each were a separate key/user session). Then maps each timestamp in the list according to
- * {@link OutputTimeFn#assignOutputTime outputTimeFn.assignOutputTime()} and
- * {@link OutputTimeFn#combine outputTimeFn.combine()}.
+ * each were a separate key/user session). Then combines each timestamp in the list according to
+ * the provided {@link TimestampCombiner}.
*
* <p>Verifies that a overlapping windows do not hold each other up via the watermark.
*/
public static <T, W extends IntervalWindow>
void validateGetOutputTimestamps(
WindowFn<T, W> windowFn,
- OutputTimeFn<? super W> outputTimeFn,
+ TimestampCombiner timestampCombiner,
List<List<Long>> timestampsPerWindow) throws Exception {
// Assign windows to each timestamp, then merge them, storing the merged windows in
@@ -300,10 +301,11 @@ public class WindowFnTestUtils {
List<Instant> outputInstants = new ArrayList<>();
for (long inputTimestamp : timestampsForWindow) {
- outputInstants.add(outputTimeFn.assignOutputTime(new Instant(inputTimestamp), window));
+ outputInstants.add(
+ assignOutputTime(timestampCombiner, new Instant(inputTimestamp), window));
}
- combinedOutputTimestamps.add(OutputTimeFns.combineOutputTimes(outputTimeFn, outputInstants));
+ combinedOutputTimestamps.add(combineOutputTimes(timestampCombiner, outputInstants));
}
// Consider windows in increasing order of max timestamp; ensure the output timestamp is after
@@ -321,4 +323,37 @@ public class WindowFnTestUtils {
earlierEndingWindow = window;
}
}
+
+ private static Instant assignOutputTime(
+ TimestampCombiner timestampCombiner, Instant inputTimestamp, BoundedWindow window) {
+ switch (timestampCombiner) {
+ case EARLIEST:
+ case LATEST:
+ return inputTimestamp;
+ case END_OF_WINDOW:
+ return window.maxTimestamp();
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unknown %s: %s", TimestampCombiner.class, timestampCombiner));
+ }
+ }
+
+ private static Instant combineOutputTimes(
+ TimestampCombiner timestampCombiner, Iterable<Instant> outputInstants) {
+ checkArgument(
+ !Iterables.isEmpty(outputInstants),
+ "Cannot combine zero instants with %s",
+ timestampCombiner);
+ switch(timestampCombiner) {
+ case EARLIEST:
+ return Ordering.natural().min(outputInstants);
+ case LATEST:
+ return Ordering.natural().max(outputInstants);
+ case END_OF_WINDOW:
+ return outputInstants.iterator().next();
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unknown %s: %s", TimestampCombiner.class, timestampCombiner));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index cc92102..d9c4c9f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -97,7 +98,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
* for details on the estimation.
*
* <p>The timestamp for each emitted pane is determined by the
- * {@link Window#withOutputTimeFn windowing operation}.
+ * {@link Window#withTimestampCombiner(TimestampCombiner) windowing operation}.
* The output {@code PCollection} will have the same {@link WindowFn}
* as the input.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
deleted file mode 100644
index 0efd278..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import com.google.common.collect.Ordering;
-import java.io.Serializable;
-import java.util.Objects;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.joda.time.Instant;
-
-/**
- * <b><i>(Experimental)</i></b> A function from timestamps of input values to the timestamp for a
- * computed value.
- *
- * <p>The function is represented via three components:
- * <ol>
- * <li>{@link #assignOutputTime} calculates an output timestamp for any input
- * value in a particular window.</li>
- * <li>The output timestamps for all non-late input values within a window are combined
- * according to {@link #combine combine()}, a commutative and associative operation on
- * the output timestamps.</li>
- * <li>The output timestamp when windows merge is provided by {@link #merge merge()}.</li>
- * </ol>
- *
- * <p>This abstract class cannot be subclassed directly, by design: it may grow
- * in consumer-compatible ways that require mutually-exclusive default implementations. To
- * create a concrete subclass, extend {@link OutputTimeFn.Defaults} or
- * {@link OutputTimeFn.DependsOnlyOnWindow}. Note that as long as this class remains
- * experimental, we may also choose to change it in arbitrary backwards-incompatible ways.
- *
- * @param <W> the type of window. Contravariant: methods accepting any subtype of
- * {@code OutputTimeFn<W>} should use the parameter type {@code OutputTimeFn<? super W>}.
- */
-@Experimental(Experimental.Kind.OUTPUT_TIME)
-public abstract class OutputTimeFn<W extends BoundedWindow> implements Serializable {
-
- protected OutputTimeFn() { }
-
- /**
- * Returns the output timestamp to use for data depending on the given
- * {@code inputTimestamp} in the specified {@code window}.
- *
- * <p>The result of this method must be between {@code inputTimestamp} and
- * {@code window.maxTimestamp()} (inclusive on both sides).
- *
- * <p>This function must be monotonic across input timestamps. Specifically, if {@code A < B},
- * then {@code assignOutputTime(A, window) <= assignOutputTime(B, window)}.
- *
- * <p>For a {@link WindowFn} that doesn't produce overlapping windows, this can (and typically
- * should) just return {@code inputTimestamp}. In the presence of overlapping windows, it is
- * suggested that the result in later overlapping windows is past the end of earlier windows
- * so that the later windows don't prevent the watermark from
- * progressing past the end of the earlier window.
- *
- * <p>See the overview of {@link OutputTimeFn} for the consistency properties required
- * between {@link #assignOutputTime}, {@link #combine}, and {@link #merge}.
- */
- public abstract Instant assignOutputTime(Instant inputTimestamp, W window);
-
- /**
- * Combines the given output times, which must be from the same window, into an output time
- * for a computed value.
- *
- * <ul>
- * <li>{@code combine} must be commutative: {@code combine(a, b).equals(combine(b, a))}.</li>
- * <li>{@code combine} must be associative:
- * {@code combine(a, combine(b, c)).equals(combine(combine(a, b), c))}.</li>
- * </ul>
- */
- public abstract Instant combine(Instant outputTime, Instant otherOutputTime);
-
- /**
- * Merges the given output times, presumed to be combined output times for windows that
- * are merging, into an output time for the {@code resultWindow}.
- *
- * <p>When windows {@code w1} and {@code w2} merge to become a new window {@code w1plus2},
- * then {@link #merge} must be implemented such that the output time is the same as
- * if all timestamps were assigned in {@code w1plus2}. Formally:
- *
- * <p>{@code fn.merge(w, fn.assignOutputTime(t1, w1), fn.assignOutputTime(t2, w2))}
- *
- * <p>must be equal to
- *
- * <p>{@code fn.combine(fn.assignOutputTime(t1, w1plus2), fn.assignOutputTime(t2, w1plus2))}
- *
- * <p>If the assigned time depends only on the window, the correct implementation of
- * {@link #merge merge()} necessarily returns the result of
- * {@link #assignOutputTime assignOutputTime(t1, w1plus2)}
- * (which equals {@link #assignOutputTime assignOutputTime(t2, w1plus2)}.
- * Defaults for this case are provided by {@link DependsOnlyOnWindow}.
- *
- * <p>For many other {@link OutputTimeFn} implementations, such as taking the earliest or latest
- * timestamp, this will be the same as {@link #combine combine()}. Defaults for this
- * case are provided by {@link Defaults}.
- */
- public abstract Instant merge(W intoWindow, Iterable<? extends Instant> mergingTimestamps);
-
- /**
- * Returns {@code true} if the result of combination of many output timestamps actually depends
- * only on the earliest.
- *
- * <p>This may allow optimizations when it is very efficient to retrieve the earliest timestamp
- * to be combined.
- */
- public abstract boolean dependsOnlyOnEarliestInputTimestamp();
-
- /**
- * Returns {@code true} if the result does not depend on what outputs were combined but only
- * the window they are in. The canonical example is if all timestamps are sure to
- * be the end of the window.
- *
- * <p>This may allow optimizations, since it is typically very efficient to retrieve the window
- * and combining output timestamps is not necessary.
- *
- * <p>If the assigned output time for an implementation depends only on the window, consider
- * extending {@link DependsOnlyOnWindow}, which returns {@code true} here and also provides
- * a framework for easily implementing a correct {@link #merge}, {@link #combine} and
- * {@link #assignOutputTime}.
- */
- public abstract boolean dependsOnlyOnWindow();
-
- /**
- * <b><i>(Experimental)</i></b> Default method implementations for {@link OutputTimeFn} where the
- * output time depends on the input element timestamps and possibly the window.
- *
- * <p>To complete an implementation, override {@link #assignOutputTime}, at a minimum.
- *
- * <p>By default, {@link #combine} and {@link #merge} return the earliest timestamp of their
- * inputs.
- */
- public abstract static class Defaults<W extends BoundedWindow> extends OutputTimeFn<W> {
-
- protected Defaults() {
- super();
- }
-
- /**
- * {@inheritDoc}
- *
- * @return the earlier of the two timestamps.
- */
- @Override
- public Instant combine(Instant outputTimestamp, Instant otherOutputTimestamp) {
- return Ordering.natural().min(outputTimestamp, otherOutputTimestamp);
- }
-
- /**
- * {@inheritDoc}
- *
- * @return the result of {@link #combine combine(outputTimstamp, otherOutputTimestamp)},
- * by default.
- */
- @Override
- public Instant merge(W resultWindow, Iterable<? extends Instant> mergingTimestamps) {
- return OutputTimeFns.combineOutputTimes(this, mergingTimestamps);
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code false} by default. An {@link OutputTimeFn} that is known to depend only on the
- * window should extend {@link OutputTimeFn.DependsOnlyOnWindow}.
- */
- @Override
- public boolean dependsOnlyOnWindow() {
- return false;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true} by default.
- */
- @Override
- public boolean dependsOnlyOnEarliestInputTimestamp() {
- return false;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true} if the two {@link OutputTimeFn} instances have the same class, by
- * default.
- */
- @Override
- public boolean equals(Object other) {
- if (other == null) {
- return false;
- }
-
- return this.getClass().equals(other.getClass());
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getClass());
- }
- }
-
- /**
- * <b><i>(Experimental)</i></b> Default method implementations for {@link OutputTimeFn} when the
- * output time depends only on the window.
- *
- * <p>To complete an implementation, override {@link #assignOutputTime(BoundedWindow)}.
- */
- public abstract static class DependsOnlyOnWindow<W extends BoundedWindow>
- extends OutputTimeFn<W> {
-
- protected DependsOnlyOnWindow() {
- super();
- }
-
- /**
- * Returns the output timestamp to use for data in the specified {@code window}.
- *
- * <p>Note that the result of this method must be between the maximum possible input timestamp
- * in {@code window} and {@code window.maxTimestamp()} (inclusive on both sides).
- *
- * <p>For example, using {@code Sessions.withGapDuration(gapDuration)}, we know that all input
- * timestamps must lie at least {@code gapDuration} from the end of the session, so
- * {@code window.maxTimestamp() - gapDuration} is an acceptable assigned timestamp.
- *
- * @see #assignOutputTime(Instant, BoundedWindow)
- */
- protected abstract Instant assignOutputTime(W window);
-
- /**
- * {@inheritDoc}
- *
- * @return the result of {#link assignOutputTime(BoundedWindow) assignOutputTime(window)}.
- */
- @Override
- public final Instant assignOutputTime(Instant timestamp, W window) {
- return assignOutputTime(window);
- }
-
- /**
- * {@inheritDoc}
- *
- * @return the same timestamp as both argument timestamps, which are necessarily equal.
- */
- @Override
- public final Instant combine(Instant outputTimestamp, Instant otherOutputTimestamp) {
- return outputTimestamp;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return the result of
- * {@link #assignOutputTime(BoundedWindow) assignOutputTime(resultWindow)}.
- */
- @Override
- public final Instant merge(W resultWindow, Iterable<? extends Instant> mergingTimestamps) {
- return assignOutputTime(resultWindow);
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}.
- */
- @Override
- public final boolean dependsOnlyOnWindow() {
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}. Since the output time depends only on the window, it can
- * certainly be ascertained given a single input timestamp.
- */
- @Override
- public final boolean dependsOnlyOnEarliestInputTimestamp() {
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true} if the two {@link OutputTimeFn} instances have the same class, by
- * default.
- */
- @Override
- public boolean equals(Object other) {
- if (other == null) {
- return false;
- }
-
- return this.getClass().equals(other.getClass());
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getClass());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java
deleted file mode 100644
index b5d67fa..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.joda.time.Instant;
-
-/**
- * <b><i>(Experimental)</i></b> Static utility methods and provided implementations for
- * {@link OutputTimeFn}.
- */
-@Experimental(Experimental.Kind.OUTPUT_TIME)
-public class OutputTimeFns {
- /**
- * The policy of outputting at the earliest of the input timestamps for non-late input data
- * that led to a computed value.
- *
- * <p>For example, suppose <i>v</i><sub>1</sub> through <i>v</i><sub>n</sub> are all on-time
- * elements being aggregated via some function {@code f} into
- * {@code f}(<i>v</i><sub>1</sub>, ..., <i>v</i><sub>n</sub>. When emitted, the output
- * timestamp of the result will be the earliest of the event time timestamps
- *
- * <p>If data arrives late, it has no effect on the output timestamp.
- */
- public static OutputTimeFn<BoundedWindow> outputAtEarliestInputTimestamp() {
- return new OutputAtEarliestInputTimestamp();
- }
-
- /**
- * The policy of holding the watermark to the latest of the input timestamps
- * for non-late input data that led to a computed value.
- *
- * <p>For example, suppose <i>v</i><sub>1</sub> through <i>v</i><sub>n</sub> are all on-time
- * elements being aggregated via some function {@code f} into
- * {@code f}(<i>v</i><sub>1</sub>, ..., <i>v</i><sub>n</sub>. When emitted, the output
- * timestamp of the result will be the latest of the event time timestamps
- *
- * <p>If data arrives late, it has no effect on the output timestamp.
- */
- public static OutputTimeFn<BoundedWindow> outputAtLatestInputTimestamp() {
- return new OutputAtLatestInputTimestamp();
- }
-
- /**
- * The policy of outputting with timestamps at the end of the window.
- *
- * <p>Note that this output timestamp depends only on the window. See
- * {#link dependsOnlyOnWindow()}.
- *
- * <p>When windows merge, instead of using {@link OutputTimeFn#combine} to obtain an output
- * timestamp for the results in the new window, it is mandatory to obtain a new output
- * timestamp from {@link OutputTimeFn#assignOutputTime} with the new window and an arbitrary
- * timestamp (because it is guaranteed that the timestamp is irrelevant).
- *
- * <p>For non-merging window functions, this {@link OutputTimeFn} works transparently.
- */
- public static OutputTimeFn<BoundedWindow> outputAtEndOfWindow() {
- return new OutputAtEndOfWindow();
- }
-
- /**
- * Applies the given {@link OutputTimeFn} to the given output times, obtaining
- * the output time for a value computed. See {@link OutputTimeFn#combine} for
- * a full specification.
- *
- * @throws IllegalArgumentException if {@code outputTimes} is empty.
- */
- public static Instant combineOutputTimes(
- OutputTimeFn<?> outputTimeFn, Iterable<? extends Instant> outputTimes) {
- checkArgument(
- !Iterables.isEmpty(outputTimes),
- "Collection of output times must not be empty in %s.combineOutputTimes",
- OutputTimeFns.class.getName());
-
- @Nullable
- Instant combinedOutputTime = null;
- for (Instant outputTime : outputTimes) {
- combinedOutputTime =
- combinedOutputTime == null
- ? outputTime : outputTimeFn.combine(combinedOutputTime, outputTime);
- }
- return combinedOutputTime;
- }
-
- /**
- * See {@link #outputAtEarliestInputTimestamp}.
- */
- private static class OutputAtEarliestInputTimestamp extends OutputTimeFn.Defaults<BoundedWindow> {
- @Override
- public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) {
- return inputTimestamp;
- }
-
- @Override
- public Instant combine(Instant outputTime, Instant otherOutputTime) {
- return Ordering.natural().min(outputTime, otherOutputTime);
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}. The result of any combine will be the earliest input timestamp.
- */
- @Override
- public boolean dependsOnlyOnEarliestInputTimestamp() {
- return true;
- }
- }
-
- /**
- * See {@link #outputAtLatestInputTimestamp}.
- */
- private static class OutputAtLatestInputTimestamp extends OutputTimeFn.Defaults<BoundedWindow> {
- @Override
- public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) {
- return inputTimestamp;
- }
-
- @Override
- public Instant combine(Instant outputTime, Instant otherOutputTime) {
- return Ordering.natural().max(outputTime, otherOutputTime);
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code false}.
- */
- @Override
- public boolean dependsOnlyOnEarliestInputTimestamp() {
- return false;
- }
- }
-
- private static class OutputAtEndOfWindow extends OutputTimeFn.DependsOnlyOnWindow<BoundedWindow> {
-
- /**
- *{@inheritDoc}
- *
- *@return {@code window.maxTimestamp()}.
- */
- @Override
- protected Instant assignOutputTime(BoundedWindow window) {
- return window.maxTimestamp();
- }
-
- @Override
- public String toString() {
- return getClass().getCanonicalName();
- }
- }
-
- public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) {
- if (outputTimeFn instanceof OutputAtEarliestInputTimestamp) {
- return RunnerApi.OutputTime.EARLIEST_IN_PANE;
- } else if (outputTimeFn instanceof OutputAtLatestInputTimestamp) {
- return RunnerApi.OutputTime.LATEST_IN_PANE;
- } else if (outputTimeFn instanceof OutputAtEndOfWindow) {
- return RunnerApi.OutputTime.END_OF_WINDOW;
- } else {
- throw new IllegalArgumentException(
- String.format(
- "Cannot convert %s to %s: %s",
- OutputTimeFn.class.getCanonicalName(),
- RunnerApi.OutputTime.class.getCanonicalName(),
- outputTimeFn));
- }
- }
-
- public static OutputTimeFn<?> fromProto(RunnerApi.OutputTime proto) {
- switch (proto) {
- case EARLIEST_IN_PANE:
- return OutputTimeFns.outputAtEarliestInputTimestamp();
- case LATEST_IN_PANE:
- return OutputTimeFns.outputAtLatestInputTimestamp();
- case END_OF_WINDOW:
- return OutputTimeFns.outputAtEndOfWindow();
- case UNRECOGNIZED:
- default:
- // Whether or not it is proto that cannot recognize it (due to the version of the
- // generated code we link to) or the switch hasn't been updated to handle it,
- // the situation is the same: we don't know what this OutputTime means
- throw new IllegalArgumentException(
- String.format(
- "Cannot convert unknown %s to %s: %s",
- RunnerApi.OutputTime.class.getCanonicalName(),
- OutputTimeFn.class.getCanonicalName(),
- proto));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java
new file mode 100644
index 0000000..39fe8a9
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.joda.time.Instant;
+
+/**
+ * Policies for combining timestamps that occur within a window.
+ */
+@Experimental(Experimental.Kind.OUTPUT_TIME)
+public enum TimestampCombiner {
+ /**
+ * The policy of taking the earliest of a set of timestamps.
+ *
+ * <p>When used in windowed aggregations, the timestamps of non-late inputs will be combined after
+ * they are shifted by the {@link WindowFn} (to allow downstream watermark progress).
+ *
+ * <p>If data arrives late, it has no effect on the output timestamp.
+ */
+ EARLIEST {
+ @Override
+ public Instant combine(Iterable<? extends Instant> timestamps) {
+ return Ordering.natural().min(timestamps);
+ }
+
+ @Override
+ public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
+ return combine(mergingTimestamps);
+ }
+
+ @Override
+ public boolean dependsOnlyOnEarliestTimestamp() {
+ return true;
+ }
+
+ @Override
+ public boolean dependsOnlyOnWindow() {
+ return false;
+ }
+ },
+
+ /**
+ * The policy of taking the latest of a set of timestamps.
+ *
+ * <p>When used in windowed aggregations, the timestamps of non-late inputs will be combined after
+ * they are shifted by the {@link WindowFn} (to allow downstream watermark progress).
+ *
+ * <p>If data arrives late, it has no effect on the output timestamp.
+ */
+ LATEST {
+ @Override
+ public Instant combine(Iterable<? extends Instant> timestamps) {
+ return Ordering.natural().max(timestamps);
+ }
+
+ @Override
+ public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
+ return combine(mergingTimestamps);
+ }
+
+ @Override
+ public boolean dependsOnlyOnEarliestTimestamp() {
+ return false;
+ }
+
+ @Override
+ public boolean dependsOnlyOnWindow() {
+ return false;
+ }
+ },
+
+ /**
+ * The policy of using the end of the window, regardless of input timestamps.
+ *
+ * <p>When used in windowed aggregations, the timestamps of non-late inputs will be combined after
+ * they are shifted by the {@link WindowFn} (to allow downstream watermark progress).
+ *
+ * <p>If data arrives late, it has no effect on the output timestamp.
+ */
+ END_OF_WINDOW {
+ @Override
+ public Instant combine(Iterable<? extends Instant> timestamps) {
+ checkArgument(Iterables.size(timestamps) > 0);
+ return Iterables.get(timestamps, 0);
+ }
+
+ @Override
+ public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
+ return intoWindow.maxTimestamp();
+ }
+
+ @Override
+ public boolean dependsOnlyOnEarliestTimestamp() {
+ return false;
+ }
+
+ @Override
+ public boolean dependsOnlyOnWindow() {
+ return true;
+ }
+ };
+
+ /**
+ * Combines the given times, which must be from the same window and must have been passed through
+ * {@link #merge}.
+ *
+ * <ul>
+ * <li>{@code combine} must be commutative: {@code combine(a, b).equals(combine(b, a))}.
+ * <li>{@code combine} must be associative: {@code combine(a, combine(b,
+ * c)).equals(combine(combine(a, b), c))}.
+ * </ul>
+ */
+ public abstract Instant combine(Iterable<? extends Instant> timestamps);
+
+ /**
+ * Merges the given timestamps, which may have originated in separate windows, into the context of
+ * the result window.
+ */
+ public abstract Instant merge(
+ BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps);
+
+ /**
+ * Shorthand for {@link #merge} with just one element, to place it into the context of
+ * a window.
+ *
+ * <p>For example, the {@link #END_OF_WINDOW} policy moves the timestamp to the end of the window.
+ */
+ public final Instant assign(BoundedWindow intoWindow, Instant timestamp) {
+ return merge(intoWindow, Collections.singleton(timestamp));
+ }
+
+ /**
+ * Varargs variant of {@link #combine}.
+ */
+ public final Instant combine(Instant... timestamps) {
+ return combine(Arrays.asList(timestamps));
+ }
+
+ /**
+ * Varargs variant of {@link #merge}.
+ */
+ public final Instant merge(BoundedWindow intoWindow, Instant... timestamps) {
+ return merge(intoWindow, Arrays.asList(timestamps));
+ }
+
+ /**
+ * Returns {@code true} if the result of combination of many output timestamps actually depends
+ * only on the earliest.
+ *
+ * <p>This may allow optimizations when it is very efficient to retrieve the earliest timestamp
+ * to be combined.
+ */
+ public abstract boolean dependsOnlyOnEarliestTimestamp();
+
+ /**
+ * Returns {@code true} if the result does not depend on what outputs were combined but only
+ * on the window they are in. The canonical example is if all timestamps are sure to
+ * be the end of the window.
+ *
+ * <p>This may allow optimizations, since it is typically very efficient to retrieve the window
+ * and combining output timestamps is not necessary.
+ */
+ public abstract boolean dependsOnlyOnWindow();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 1000ff7..cb7b430 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -193,7 +193,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
@Nullable abstract AccumulationMode getAccumulationMode();
@Nullable abstract Duration getAllowedLateness();
@Nullable abstract ClosingBehavior getClosingBehavior();
- @Nullable abstract OutputTimeFn<?> getOutputTimeFn();
+ @Nullable abstract TimestampCombiner getTimestampCombiner();
abstract Builder<T> toBuilder();
@@ -204,7 +204,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
abstract Builder<T> setAccumulationMode(AccumulationMode mode);
abstract Builder<T> setAllowedLateness(Duration allowedLateness);
abstract Builder<T> setClosingBehavior(ClosingBehavior closingBehavior);
- abstract Builder<T> setOutputTimeFn(OutputTimeFn<?> outputTimeFn);
+ abstract Builder<T> setTimestampCombiner(TimestampCombiner timestampCombiner);
abstract Window<T> build();
}
@@ -273,12 +273,12 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
}
/**
- * <b><i>(Experimental)</i></b> Override the default {@link OutputTimeFn}, to control
+ * <b><i>(Experimental)</i></b> Override the default {@link TimestampCombiner}, to control
* the output timestamp of values output from a {@link GroupByKey} operation.
*/
@Experimental(Kind.OUTPUT_TIME)
- public Window<T> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) {
- return toBuilder().setOutputTimeFn(outputTimeFn).build();
+ public Window<T> withTimestampCombiner(TimestampCombiner timestampCombiner) {
+ return toBuilder().setTimestampCombiner(timestampCombiner).build();
}
/**
@@ -300,8 +300,6 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
* Get the output strategy of this {@link Window Window PTransform}. For internal use
* only.
*/
- // Rawtype cast of OutputTimeFn cannot be eliminated with intermediate variable, as it is
- // casting between wildcards
public WindowingStrategy<?, ?> getOutputStrategyInternal(
WindowingStrategy<?, ?> inputStrategy) {
WindowingStrategy<?, ?> result = inputStrategy;
@@ -320,8 +318,8 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
if (getClosingBehavior() != null) {
result = result.withClosingBehavior(getClosingBehavior());
}
- if (getOutputTimeFn() != null) {
- result = result.withOutputTimeFn(getOutputTimeFn());
+ if (getTimestampCombiner() != null) {
+ result = result.withTimestampCombiner(getTimestampCombiner());
}
return result;
}
@@ -411,9 +409,9 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
.withLabel("Window Closing Behavior"));
}
- if (getOutputTimeFn() != null) {
- builder.add(DisplayData.item("outputTimeFn", getOutputTimeFn().getClass())
- .withLabel("Output Time Function"));
+ if (getTimestampCombiner() != null) {
+ builder.add(DisplayData.item("timestampCombiner", getTimestampCombiner().toString())
+ .withLabel("Timestamp Combiner"));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index 0c27c4f..706e039 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -22,7 +22,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -57,13 +57,14 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
// If the input has already had its windows merged, then the GBK that performed the merge
// will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
// here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
- // The OutputTimeFn is set to ensure the GroupByKey does not shift elements forwards in time.
+ // The TimestampCombiner is set to ensure the GroupByKey does not shift elements forwards in
+ // time.
// Because this outputs as fast as possible, this should not hold the watermark.
Window<KV<K, V>> rewindow =
Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
.triggering(new ReshuffleTrigger<>())
.discardingFiredPanes()
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
return input.apply(rewindow)