You are viewing a plain-text version of this message; the link to its canonical (HTML) version was not preserved.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/27 03:32:54 UTC
[2/4] beam git commit: Revert "Replace OutputTimeFn UDF with TimestampCombiner enum"
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
index eac465c..2967f2c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
@@ -26,9 +26,8 @@ import java.util.List;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -54,9 +53,8 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou
Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
@SuppressWarnings("unchecked")
- TimestampCombiner timestampCombiner =
- (TimestampCombiner) windowingStrategy.getTimestampCombiner();
- WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn();
+ OutputTimeFn<? super BoundedWindow> outputTimeFn =
+ (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
// get all elements so that we can sort them, has to fit into
// memory
@@ -90,19 +88,18 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou
// create accumulator using the first elements key
WindowedValue<KV<K, InputT>> currentValue = iterator.next();
K key = currentValue.getValue().getKey();
- W currentWindow = (W) Iterables.getOnlyElement(currentValue.getWindows());
+ BoundedWindow currentWindow = Iterables.getOnlyElement(currentValue.getWindows());
InputT firstValue = currentValue.getValue().getValue();
AccumT accumulator = flinkCombiner.firstInput(
key, firstValue, options, sideInputReader, currentValue.getWindows());
- // we use this to keep track of the timestamps assigned by the TimestampCombiner
+ // we use this to keep track of the timestamps assigned by the OutputTimeFn
Instant windowTimestamp =
- timestampCombiner.assign(
- currentWindow, windowFn.getOutputTime(currentValue.getTimestamp(), currentWindow));
+ outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);
while (iterator.hasNext()) {
WindowedValue<KV<K, InputT>> nextValue = iterator.next();
- W nextWindow = (W) Iterables.getOnlyElement(nextValue.getWindows());
+ BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
if (currentWindow.equals(nextWindow)) {
// continue accumulating and merge windows
@@ -111,12 +108,9 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou
accumulator = flinkCombiner.addInput(key, accumulator, value,
options, sideInputReader, currentValue.getWindows());
- windowTimestamp =
- timestampCombiner.combine(
- windowTimestamp,
- timestampCombiner.assign(
- currentWindow,
- windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow)));
+ windowTimestamp = outputTimeFn.combine(
+ windowTimestamp,
+ outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
} else {
// emit the value that we currently have
@@ -133,9 +127,7 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou
InputT value = nextValue.getValue().getValue();
accumulator = flinkCombiner.firstInput(key, value,
options, sideInputReader, currentValue.getWindows());
- windowTimestamp =
- timestampCombiner.assign(
- currentWindow, windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow));
+ windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index d015c38..3203446 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CombineContextFactory;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -176,9 +176,9 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
- TimestampCombiner timestampCombiner) {
+ public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+ StateTag<? super K, WatermarkHoldState<W>> address,
+ OutputTimeFn<? super W> outputTimeFn) {
throw new UnsupportedOperationException(
String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
index 2dd7c96..24b340e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
@@ -38,7 +38,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -186,9 +186,9 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
- TimestampCombiner timestampCombiner) {
+ public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+ StateTag<? super K, WatermarkHoldState<W>> address,
+ OutputTimeFn<? super W> outputTimeFn) {
throw new UnsupportedOperationException(
String.format("%s is not supported", CombiningState.class.getSimpleName()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
index 17ea62a..2bf0bf1 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
@@ -146,9 +146,9 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
- TimestampCombiner timestampCombiner) {
+ public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+ StateTag<? super K, WatermarkHoldState<W>> address,
+ OutputTimeFn<? super W> outputTimeFn) {
throw new UnsupportedOperationException(
String.format("%s is not supported", CombiningState.class.getSimpleName()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index 878c914..4f961e5 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.CombineContextFactory;
import org.apache.beam.sdk.util.state.BagState;
@@ -185,12 +185,12 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
- TimestampCombiner timestampCombiner) {
+ public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+ StateTag<? super K, WatermarkHoldState<W>> address,
+ OutputTimeFn<? super W> outputTimeFn) {
return new FlinkWatermarkHoldState<>(
- flinkStateBackend, FlinkStateInternals.this, address, namespace, timestampCombiner);
+ flinkStateBackend, FlinkStateInternals.this, address, namespace, outputTimeFn);
}
});
}
@@ -912,9 +912,9 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
}
private static class FlinkWatermarkHoldState<K, W extends BoundedWindow>
- implements WatermarkHoldState {
- private final StateTag<? super K, WatermarkHoldState> address;
- private final TimestampCombiner timestampCombiner;
+ implements WatermarkHoldState<W> {
+ private final StateTag<? super K, WatermarkHoldState<W>> address;
+ private final OutputTimeFn<? super W> outputTimeFn;
private final StateNamespace namespace;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
private final FlinkStateInternals<K> flinkStateInternals;
@@ -923,11 +923,11 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
public FlinkWatermarkHoldState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
FlinkStateInternals<K> flinkStateInternals,
- StateTag<? super K, WatermarkHoldState> address,
+ StateTag<? super K, WatermarkHoldState<W>> address,
StateNamespace namespace,
- TimestampCombiner timestampCombiner) {
+ OutputTimeFn<? super W> outputTimeFn) {
this.address = address;
- this.timestampCombiner = timestampCombiner;
+ this.outputTimeFn = outputTimeFn;
this.namespace = namespace;
this.flinkStateBackend = flinkStateBackend;
this.flinkStateInternals = flinkStateInternals;
@@ -937,12 +937,12 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
}
@Override
- public TimestampCombiner getTimestampCombiner() {
- return timestampCombiner;
+ public OutputTimeFn<? super W> getOutputTimeFn() {
+ return outputTimeFn;
}
@Override
- public WatermarkHoldState readLater() {
+ public WatermarkHoldState<W> readLater() {
return this;
}
@@ -983,7 +983,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
state.update(value);
flinkStateInternals.watermarkHolds.put(namespace.stringKey(), value);
} else {
- Instant combined = timestampCombiner.combine(current, value);
+ Instant combined = outputTimeFn.combine(current, value);
state.update(combined);
flinkStateInternals.watermarkHolds.put(namespace.stringKey(), combined);
}
@@ -1035,7 +1035,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
if (!address.equals(that.address)) {
return false;
}
- if (!timestampCombiner.equals(that.timestampCombiner)) {
+ if (!outputTimeFn.equals(that.outputTimeFn)) {
return false;
}
return namespace.equals(that.namespace);
@@ -1045,7 +1045,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
@Override
public int hashCode() {
int result = address.hashCode();
- result = 31 * result + timestampCombiner.hashCode();
+ result = 31 * result + outputTimeFn.hashCode();
result = 31 * result + namespace.hashCode();
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 17c43bf..d140271 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -77,12 +77,14 @@ public class FlinkStateInternalsTest {
"sumInteger", VarIntCoder.of(), Sum.ofIntegers());
private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
- StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
- StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
- StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
+ private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+ WATERMARK_EARLIEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
+ private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+ WATERMARK_LATEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
+ private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
+ StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
FlinkStateInternals<String> underTest;
@@ -272,7 +274,7 @@ public class FlinkStateInternalsTest {
@Test
public void testWatermarkEarliestState() throws Exception {
- WatermarkHoldState value =
+ WatermarkHoldState<BoundedWindow> value =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
// State instances are cached, but depend on the namespace.
@@ -296,7 +298,7 @@ public class FlinkStateInternalsTest {
@Test
public void testWatermarkLatestState() throws Exception {
- WatermarkHoldState value =
+ WatermarkHoldState<BoundedWindow> value =
underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
// State instances are cached, but depend on the namespace.
@@ -320,7 +322,7 @@ public class FlinkStateInternalsTest {
@Test
public void testWatermarkEndOfWindowState() throws Exception {
- WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+ WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
// State instances are cached, but depend on the namespace.
assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
@@ -337,7 +339,7 @@ public class FlinkStateInternalsTest {
@Test
public void testWatermarkStateIsEmpty() throws Exception {
- WatermarkHoldState value =
+ WatermarkHoldState<BoundedWindow> value =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
assertThat(value.isEmpty().read(), Matchers.is(true));
@@ -351,9 +353,9 @@ public class FlinkStateInternalsTest {
@Test
public void testMergeEarliestWatermarkIntoSource() throws Exception {
- WatermarkHoldState value1 =
+ WatermarkHoldState<BoundedWindow> value1 =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
- WatermarkHoldState value2 =
+ WatermarkHoldState<BoundedWindow> value2 =
underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
value1.add(new Instant(3000));
@@ -370,11 +372,11 @@ public class FlinkStateInternalsTest {
@Test
public void testMergeLatestWatermarkIntoSource() throws Exception {
- WatermarkHoldState value1 =
+ WatermarkHoldState<BoundedWindow> value1 =
underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
- WatermarkHoldState value2 =
+ WatermarkHoldState<BoundedWindow> value2 =
underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
- WatermarkHoldState value3 =
+ WatermarkHoldState<BoundedWindow> value3 =
underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
value1.add(new Instant(3000));
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
index c967521..725e9d3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -166,10 +166,10 @@ class SparkStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
- TimestampCombiner timestampCombiner) {
- return new SparkWatermarkHoldState(namespace, address, timestampCombiner);
+ public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+ StateTag<? super K, WatermarkHoldState<W>> address,
+ OutputTimeFn<? super W> outputTimeFn) {
+ return new SparkWatermarkHoldState<>(namespace, address, outputTimeFn);
}
}
@@ -250,21 +250,21 @@ class SparkStateInternals<K> implements StateInternals<K> {
}
}
- private class SparkWatermarkHoldState extends AbstractState<Instant>
- implements WatermarkHoldState {
+ private class SparkWatermarkHoldState<W extends BoundedWindow>
+ extends AbstractState<Instant> implements WatermarkHoldState<W> {
- private final TimestampCombiner timestampCombiner;
+ private final OutputTimeFn<? super W> outputTimeFn;
public SparkWatermarkHoldState(
StateNamespace namespace,
- StateTag<?, WatermarkHoldState> address,
- TimestampCombiner timestampCombiner) {
+ StateTag<?, WatermarkHoldState<W>> address,
+ OutputTimeFn<? super W> outputTimeFn) {
super(namespace, address, InstantCoder.of());
- this.timestampCombiner = timestampCombiner;
+ this.outputTimeFn = outputTimeFn;
}
@Override
- public SparkWatermarkHoldState readLater() {
+ public SparkWatermarkHoldState<W> readLater() {
return this;
}
@@ -276,10 +276,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
@Override
public void add(Instant outputTime) {
Instant combined = read();
- combined =
- (combined == null)
- ? outputTime
- : getTimestampCombiner().combine(combined, outputTime);
+ combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime);
writeValue(combined);
}
@@ -298,8 +295,8 @@ class SparkStateInternals<K> implements StateInternals<K> {
}
@Override
- public TimestampCombiner getTimestampCombiner() {
- return timestampCombiner;
+ public OutputTimeFn<? super W> getOutputTimeFn() {
+ return outputTimeFn;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
index 7d06d6b..fa1c3fc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
@@ -50,7 +50,7 @@ import org.apache.beam.sdk.values.TupleTag;
public class SparkAbstractCombineFn implements Serializable {
protected final SparkRuntimeContext runtimeContext;
protected final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
- protected final WindowingStrategy<?, BoundedWindow> windowingStrategy;
+ protected final WindowingStrategy<?, ?> windowingStrategy;
public SparkAbstractCombineFn(
@@ -59,7 +59,7 @@ public class SparkAbstractCombineFn implements Serializable {
WindowingStrategy<?, ?> windowingStrategy) {
this.runtimeContext = runtimeContext;
this.sideInputs = sideInputs;
- this.windowingStrategy = (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
+ this.windowingStrategy = windowingStrategy;
}
// each Spark task should get it's own copy of this SparkKeyedCombineFn, and since Spark tasks
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
index 7d026c6..23f5d20 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
@@ -29,9 +29,8 @@ import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
@@ -71,8 +70,9 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
// sort exploded inputs.
Iterable<WindowedValue<InputT>> sortedInputs = sortByWindows(input.explodeWindows());
- TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
- WindowFn<?, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+ @SuppressWarnings("unchecked")
+ OutputTimeFn<? super BoundedWindow> outputTimeFn =
+ (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
//--- inputs iterator, by window order.
final Iterator<WindowedValue<InputT>> iterator = sortedInputs.iterator();
@@ -84,13 +84,9 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
accumulator = combineFn.addInput(accumulator, currentInput.getValue(),
ctxtForInput(currentInput));
- // keep track of the timestamps assigned by the TimestampCombiner.
+ // keep track of the timestamps assigned by the OutputTimeFn.
Instant windowTimestamp =
- timestampCombiner.assign(
- currentWindow,
- windowingStrategy
- .getWindowFn()
- .getOutputTime(currentInput.getTimestamp(), currentWindow));
+ outputTimeFn.assignOutputTime(currentInput.getTimestamp(), currentWindow);
// accumulate the next windows, or output.
List<WindowedValue<AccumT>> output = Lists.newArrayList();
@@ -113,13 +109,8 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
// keep accumulating and carry on ;-)
accumulator = combineFn.addInput(accumulator, nextValue.getValue(),
ctxtForInput(nextValue));
- windowTimestamp =
- timestampCombiner.merge(
- currentWindow,
- windowTimestamp,
- windowingStrategy
- .getWindowFn()
- .getOutputTime(nextValue.getTimestamp(), currentWindow));
+ windowTimestamp = outputTimeFn.combine(windowTimestamp,
+ outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
} else {
// moving to the next window, first add the current accumulation to output
// and initialize the accumulator.
@@ -130,8 +121,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
accumulator = combineFn.addInput(accumulator, nextValue.getValue(),
ctxtForInput(nextValue));
currentWindow = nextWindow;
- windowTimestamp = timestampCombiner.assign(currentWindow,
- windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow));
+ windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
}
}
@@ -172,7 +162,8 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
Iterable<WindowedValue<AccumT>> sortedAccumulators = sortByWindows(accumulators);
@SuppressWarnings("unchecked")
- TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
+ OutputTimeFn<? super BoundedWindow> outputTimeFn =
+ (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
//--- accumulators iterator, by window order.
final Iterator<WindowedValue<AccumT>> iterator = sortedAccumulators.iterator();
@@ -183,7 +174,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
List<AccumT> currentWindowAccumulators = Lists.newArrayList();
currentWindowAccumulators.add(currentValue.getValue());
- // keep track of the timestamps assigned by the TimestampCombiner,
+ // keep track of the timestamps assigned by the OutputTimeFn,
// in createCombiner we already merge the timestamps assigned
// to individual elements, here we will just merge them.
List<Instant> windowTimestamps = Lists.newArrayList();
@@ -215,7 +206,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
// add the current accumulation to the output and initialize the accumulation.
// merge the timestamps of all accumulators to merge.
- Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
+ Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
// merge accumulators.
// transforming a KV<K, Iterable<AccumT>> into a KV<K, Iterable<AccumT>>.
@@ -240,7 +231,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
}
// merge the last chunk of accumulators.
- Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
+ Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators);
WindowedValue<Iterable<AccumT>> preMergeWindowedValue = WindowedValue.of(
accumsToMerge, mergedTimestamp, currentWindow, PaneInfo.NO_FIRING);
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
index 66c03bc..b5d243f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
@@ -29,9 +29,8 @@ import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
@@ -73,8 +72,9 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
// sort exploded inputs.
Iterable<WindowedValue<KV<K, InputT>>> sortedInputs = sortByWindows(wkvi.explodeWindows());
- TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
- WindowFn<?, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+ @SuppressWarnings("unchecked")
+ OutputTimeFn<? super BoundedWindow> outputTimeFn =
+ (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
//--- inputs iterator, by window order.
final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInputs.iterator();
@@ -87,13 +87,9 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
accumulator = combineFn.addInput(key, accumulator, currentInput.getValue().getValue(),
ctxtForInput(currentInput));
- // keep track of the timestamps assigned by the TimestampCombiner.
+ // keep track of the timestamps assigned by the OutputTimeFn.
Instant windowTimestamp =
- timestampCombiner.assign(
- currentWindow,
- windowingStrategy
- .getWindowFn()
- .getOutputTime(currentInput.getTimestamp(), currentWindow));
+ outputTimeFn.assignOutputTime(currentInput.getTimestamp(), currentWindow);
// accumulate the next windows, or output.
List<WindowedValue<KV<K, AccumT>>> output = Lists.newArrayList();
@@ -116,12 +112,8 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
// keep accumulating and carry on ;-)
accumulator = combineFn.addInput(key, accumulator, nextValue.getValue().getValue(),
ctxtForInput(nextValue));
- windowTimestamp =
- timestampCombiner.combine(
- windowTimestamp,
- timestampCombiner.assign(
- currentWindow,
- windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow)));
+ windowTimestamp = outputTimeFn.combine(windowTimestamp,
+ outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
} else {
// moving to the next window, first add the current accumulation to output
// and initialize the accumulator.
@@ -132,9 +124,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
accumulator = combineFn.addInput(key, accumulator, nextValue.getValue().getValue(),
ctxtForInput(nextValue));
currentWindow = nextWindow;
- windowTimestamp =
- timestampCombiner.assign(
- currentWindow, windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow));
+ windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
}
}
@@ -180,7 +170,8 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
Iterable<WindowedValue<KV<K, AccumT>>> sortedAccumulators = sortByWindows(accumulators);
@SuppressWarnings("unchecked")
- TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
+ OutputTimeFn<? super BoundedWindow> outputTimeFn =
+ (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
//--- accumulators iterator, by window order.
final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedAccumulators.iterator();
@@ -192,7 +183,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
List<AccumT> currentWindowAccumulators = Lists.newArrayList();
currentWindowAccumulators.add(currentValue.getValue().getValue());
- // keep track of the timestamps assigned by the TimestampCombiner,
+ // keep track of the timestamps assigned by the OutputTimeFn,
// in createCombiner we already merge the timestamps assigned
// to individual elements, here we will just merge them.
List<Instant> windowTimestamps = Lists.newArrayList();
@@ -224,7 +215,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
// add the current accumulation to the output and initialize the accumulation.
// merge the timestamps of all accumulators to merge.
- Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
+ Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
// merge accumulators.
// transforming a KV<K, Iterable<AccumT>> into a KV<K, Iterable<AccumT>>.
@@ -250,7 +241,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
}
// merge the last chunk of accumulators.
- Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
+ Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators);
WindowedValue<KV<K, Iterable<AccumT>>> preMergeWindowedValue = WindowedValue.of(
KV.of(key, accumsToMerge), mergedTimestamp, currentWindow, PaneInfo.NO_FIRING);
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 58b5a84..6c46453 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -135,6 +135,11 @@
<dependencies>
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-common-runner-api</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
index e8c2f8d..63e7903 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
@@ -17,14 +17,11 @@
*/
package org.apache.beam.sdk.testing;
-import static com.google.common.base.Preconditions.checkArgument;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -38,7 +35,8 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
@@ -254,19 +252,20 @@ public class WindowFnTestUtils {
/**
* Verifies that later-ending merged windows from any of the timestamps hold up output of
- * earlier-ending windows, using the provided {@link WindowFn} and {@link TimestampCombiner}.
+ * earlier-ending windows, using the provided {@link WindowFn} and {@link OutputTimeFn}.
*
* <p>Given a list of lists of timestamps, where each list is expected to merge into a single
* window with end times in ascending order, assigns and merges windows for each list (as though
- * each were a separate key/user session). Then combines each timestamp in the list according to
- * the provided {@link TimestampCombiner}.
+ * each were a separate key/user session). Then maps each timestamp in the list according to
+ * {@link OutputTimeFn#assignOutputTime outputTimeFn.assignOutputTime()} and
+ * {@link OutputTimeFn#combine outputTimeFn.combine()}.
*
* <p>Verifies that a overlapping windows do not hold each other up via the watermark.
*/
public static <T, W extends IntervalWindow>
void validateGetOutputTimestamps(
WindowFn<T, W> windowFn,
- TimestampCombiner timestampCombiner,
+ OutputTimeFn<? super W> outputTimeFn,
List<List<Long>> timestampsPerWindow) throws Exception {
// Assign windows to each timestamp, then merge them, storing the merged windows in
@@ -301,11 +300,10 @@ public class WindowFnTestUtils {
List<Instant> outputInstants = new ArrayList<>();
for (long inputTimestamp : timestampsForWindow) {
- outputInstants.add(
- assignOutputTime(timestampCombiner, new Instant(inputTimestamp), window));
+ outputInstants.add(outputTimeFn.assignOutputTime(new Instant(inputTimestamp), window));
}
- combinedOutputTimestamps.add(combineOutputTimes(timestampCombiner, outputInstants));
+ combinedOutputTimestamps.add(OutputTimeFns.combineOutputTimes(outputTimeFn, outputInstants));
}
// Consider windows in increasing order of max timestamp; ensure the output timestamp is after
@@ -323,37 +321,4 @@ public class WindowFnTestUtils {
earlierEndingWindow = window;
}
}
-
- private static Instant assignOutputTime(
- TimestampCombiner timestampCombiner, Instant inputTimestamp, BoundedWindow window) {
- switch (timestampCombiner) {
- case EARLIEST:
- case LATEST:
- return inputTimestamp;
- case END_OF_WINDOW:
- return window.maxTimestamp();
- default:
- throw new IllegalArgumentException(
- String.format("Unknown %s: %s", TimestampCombiner.class, timestampCombiner));
- }
- }
-
- private static Instant combineOutputTimes(
- TimestampCombiner timestampCombiner, Iterable<Instant> outputInstants) {
- checkArgument(
- !Iterables.isEmpty(outputInstants),
- "Cannot combine zero instants with %s",
- timestampCombiner);
- switch(timestampCombiner) {
- case EARLIEST:
- return Ordering.natural().min(outputInstants);
- case LATEST:
- return Ordering.natural().max(outputInstants);
- case END_OF_WINDOW:
- return outputInstants.iterator().next();
- default:
- throw new IllegalArgumentException(
- String.format("Unknown %s: %s", TimestampCombiner.class, timestampCombiner));
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index d9c4c9f..cc92102 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -25,7 +25,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -98,7 +97,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
* for details on the estimation.
*
* <p>The timestamp for each emitted pane is determined by the
- * {@link Window#withTimestampCombiner(TimestampCombiner)} windowing operation}.
+ * {@link Window#withOutputTimeFn windowing operation}.
* The output {@code PCollection} will have the same {@link WindowFn}
* as the input.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
new file mode 100644
index 0000000..0efd278
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import com.google.common.collect.Ordering;
+import java.io.Serializable;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.joda.time.Instant;
+
+/**
+ * <b><i>(Experimental)</i></b> A function from timestamps of input values to the timestamp for a
+ * computed value.
+ *
+ * <p>The function is represented via three components:
+ * <ol>
+ * <li>{@link #assignOutputTime} calculates an output timestamp for any input
+ * value in a particular window.</li>
+ * <li>The output timestamps for all non-late input values within a window are combined
+ * according to {@link #combine combine()}, a commutative and associative operation on
+ * the output timestamps.</li>
+ * <li>The output timestamp when windows merge is provided by {@link #merge merge()}.</li>
+ * </ol>
+ *
+ * <p>This abstract class should not be subclassed directly, by design: it may grow
+ * in consumer-compatible ways that require mutually-exclusive default implementations. To
+ * create a concrete subclass, extend {@link OutputTimeFn.Defaults} or
+ * {@link OutputTimeFn.DependsOnlyOnWindow}. Note that as long as this class remains
+ * experimental, we may also choose to change it in arbitrary backwards-incompatible ways.
+ *
+ * @param <W> the type of window. Contravariant: methods accepting any subtype of
+ * {@code OutputTimeFn<W>} should use the parameter type {@code OutputTimeFn<? super W>}.
+ */
+@Experimental(Experimental.Kind.OUTPUT_TIME)
+public abstract class OutputTimeFn<W extends BoundedWindow> implements Serializable {
+
+ protected OutputTimeFn() { }
+
+ /**
+ * Returns the output timestamp to use for data depending on the given
+ * {@code inputTimestamp} in the specified {@code window}.
+ *
+ * <p>The result of this method must be between {@code inputTimestamp} and
+ * {@code window.maxTimestamp()} (inclusive on both sides).
+ *
+ * <p>This function must be monotonic across input timestamps. Specifically, if {@code A < B},
+ * then {@code assignOutputTime(A, window) <= assignOutputTime(B, window)}.
+ *
+ * <p>For a {@link WindowFn} that doesn't produce overlapping windows, this can (and typically
+ * should) just return {@code inputTimestamp}. In the presence of overlapping windows, it is
+ * suggested that the result in later overlapping windows is past the end of earlier windows
+ * so that the later windows don't prevent the watermark from
+ * progressing past the end of the earlier window.
+ *
+ * <p>See the overview of {@link OutputTimeFn} for the consistency properties required
+ * between {@link #assignOutputTime}, {@link #combine}, and {@link #merge}.
+ */
+ public abstract Instant assignOutputTime(Instant inputTimestamp, W window);
+
+ /**
+ * Combines the given output times, which must be from the same window, into an output time
+ * for a computed value.
+ *
+ * <ul>
+ * <li>{@code combine} must be commutative: {@code combine(a, b).equals(combine(b, a))}.</li>
+ * <li>{@code combine} must be associative:
+ * {@code combine(a, combine(b, c)).equals(combine(combine(a, b), c))}.</li>
+ * </ul>
+ */
+ public abstract Instant combine(Instant outputTime, Instant otherOutputTime);
+
+ /**
+ * Merges the given output times, presumed to be combined output times for windows that
+ * are merging, into an output time for the {@code intoWindow}.
+ *
+ * <p>When windows {@code w1} and {@code w2} merge to become a new window {@code w1plus2},
+ * then {@link #merge} must be implemented such that the output time is the same as
+ * if all timestamps were assigned in {@code w1plus2}. Formally:
+ *
+ * <p>{@code fn.merge(w1plus2, [fn.assignOutputTime(t1, w1), fn.assignOutputTime(t2, w2)])}
+ *
+ * <p>must be equal to
+ *
+ * <p>{@code fn.combine(fn.assignOutputTime(t1, w1plus2), fn.assignOutputTime(t2, w1plus2))}
+ *
+ * <p>If the assigned time depends only on the window, the correct implementation of
+ * {@link #merge merge()} necessarily returns the result of
+ * {@link #assignOutputTime assignOutputTime(t1, w1plus2)}
+ * (which equals {@link #assignOutputTime assignOutputTime(t2, w1plus2)}).
+ * Defaults for this case are provided by {@link DependsOnlyOnWindow}.
+ *
+ * <p>For many other {@link OutputTimeFn} implementations, such as taking the earliest or latest
+ * timestamp, this will be the same as {@link #combine combine()}. Defaults for this
+ * case are provided by {@link Defaults}.
+ */
+ public abstract Instant merge(W intoWindow, Iterable<? extends Instant> mergingTimestamps);
+
+ /**
+ * Returns {@code true} if the result of combination of many output timestamps actually depends
+ * only on the earliest.
+ *
+ * <p>This may allow optimizations when it is very efficient to retrieve the earliest timestamp
+ * to be combined.
+ */
+ public abstract boolean dependsOnlyOnEarliestInputTimestamp();
+
+ /**
+ * Returns {@code true} if the result does not depend on what outputs were combined but only on
+ * the window they are in. The canonical example is if all timestamps are sure to
+ * be the end of the window.
+ *
+ * <p>This may allow optimizations, since it is typically very efficient to retrieve the window
+ * and combining output timestamps is not necessary.
+ *
+ * <p>If the assigned output time for an implementation depends only on the window, consider
+ * extending {@link DependsOnlyOnWindow}, which returns {@code true} here and also provides
+ * a framework for easily implementing a correct {@link #merge}, {@link #combine} and
+ * {@link #assignOutputTime}.
+ */
+ public abstract boolean dependsOnlyOnWindow();
+
+ /**
+ * <b><i>(Experimental)</i></b> Default method implementations for {@link OutputTimeFn} where the
+ * output time depends on the input element timestamps and possibly the window.
+ *
+ * <p>To complete an implementation, override {@link #assignOutputTime}, at a minimum.
+ *
+ * <p>By default, {@link #combine} and {@link #merge} return the earliest timestamp of their
+ * inputs.
+ */
+ public abstract static class Defaults<W extends BoundedWindow> extends OutputTimeFn<W> {
+
+ protected Defaults() {
+ super();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return the earlier of the two timestamps.
+ */
+ @Override
+ public Instant combine(Instant outputTimestamp, Instant otherOutputTimestamp) {
+ return Ordering.natural().min(outputTimestamp, otherOutputTimestamp);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return the result of folding {@link #combine combine()} over {@code mergingTimestamps},
+ * by default.
+ */
+ @Override
+ public Instant merge(W resultWindow, Iterable<? extends Instant> mergingTimestamps) {
+ return OutputTimeFns.combineOutputTimes(this, mergingTimestamps);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return {@code false} by default. An {@link OutputTimeFn} that is known to depend only on the
+ * window should extend {@link OutputTimeFn.DependsOnlyOnWindow}.
+ */
+ @Override
+ public boolean dependsOnlyOnWindow() {
+ return false;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return {@code false} by default; subclasses may override this to return {@code true}.
+ */
+ @Override
+ public boolean dependsOnlyOnEarliestInputTimestamp() {
+ return false;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return {@code true} if the two {@link OutputTimeFn} instances have the same class, by
+ * default.
+ */
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+
+ return this.getClass().equals(other.getClass());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getClass());
+ }
+ }
+
+ /**
+ * <b><i>(Experimental)</i></b> Default method implementations for {@link OutputTimeFn} when the
+ * output time depends only on the window.
+ *
+ * <p>To complete an implementation, override {@link #assignOutputTime(BoundedWindow)}.
+ */
+ public abstract static class DependsOnlyOnWindow<W extends BoundedWindow>
+ extends OutputTimeFn<W> {
+
+ protected DependsOnlyOnWindow() {
+ super();
+ }
+
+ /**
+ * Returns the output timestamp to use for data in the specified {@code window}.
+ *
+ * <p>Note that the result of this method must be between the maximum possible input timestamp
+ * in {@code window} and {@code window.maxTimestamp()} (inclusive on both sides).
+ *
+ * <p>For example, using {@code Sessions.withGapDuration(gapDuration)}, we know that all input
+ * timestamps must lie at least {@code gapDuration} from the end of the session, so
+ * {@code window.maxTimestamp() - gapDuration} is an acceptable assigned timestamp.
+ *
+ * @see #assignOutputTime(Instant, BoundedWindow)
+ */
+ protected abstract Instant assignOutputTime(W window);
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return the result of {@link #assignOutputTime(BoundedWindow) assignOutputTime(window)}.
+ */
+ @Override
+ public final Instant assignOutputTime(Instant timestamp, W window) {
+ return assignOutputTime(window);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return the same timestamp as both argument timestamps, which are necessarily equal.
+ */
+ @Override
+ public final Instant combine(Instant outputTimestamp, Instant otherOutputTimestamp) {
+ return outputTimestamp;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return the result of
+ * {@link #assignOutputTime(BoundedWindow) assignOutputTime(resultWindow)}.
+ */
+ @Override
+ public final Instant merge(W resultWindow, Iterable<? extends Instant> mergingTimestamps) {
+ return assignOutputTime(resultWindow);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return {@code true}.
+ */
+ @Override
+ public final boolean dependsOnlyOnWindow() {
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return {@code true}. Since the output time depends only on the window, it can
+ * certainly be ascertained given a single input timestamp.
+ */
+ @Override
+ public final boolean dependsOnlyOnEarliestInputTimestamp() {
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return {@code true} if the two {@link OutputTimeFn} instances have the same class, by
+ * default.
+ */
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+
+ return this.getClass().equals(other.getClass());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getClass());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java
new file mode 100644
index 0000000..b5d67fa
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.joda.time.Instant;
+
+/**
+ * <b><i>(Experimental)</i></b> Static utility methods and provided implementations for
+ * {@link OutputTimeFn}, and conversion of the provided ones to and from {@link RunnerApi.OutputTime}.
+ */
+@Experimental(Experimental.Kind.OUTPUT_TIME)
+public class OutputTimeFns {
+ /**
+ * The policy of outputting at the earliest of the input timestamps for non-late input data
+ * that led to a computed value.
+ *
+ * <p>For example, suppose <i>v</i><sub>1</sub> through <i>v</i><sub>n</sub> are all on-time
+ * elements being aggregated via some function {@code f} into
+ * {@code f}(<i>v</i><sub>1</sub>, ..., <i>v</i><sub>n</sub>). When emitted, the output
+ * timestamp of the result will be the earliest of the event time timestamps.
+ *
+ * <p>If data arrives late, it has no effect on the output timestamp.
+ */
+ public static OutputTimeFn<BoundedWindow> outputAtEarliestInputTimestamp() {
+ return new OutputAtEarliestInputTimestamp();
+ }
+
+ /**
+ * The policy of outputting at the latest of the input timestamps
+ * for non-late input data that led to a computed value.
+ *
+ * <p>For example, suppose <i>v</i><sub>1</sub> through <i>v</i><sub>n</sub> are all on-time
+ * elements being aggregated via some function {@code f} into
+ * {@code f}(<i>v</i><sub>1</sub>, ..., <i>v</i><sub>n</sub>). When emitted, the output
+ * timestamp of the result will be the latest of the event time timestamps.
+ *
+ * <p>If data arrives late, it has no effect on the output timestamp.
+ */
+ public static OutputTimeFn<BoundedWindow> outputAtLatestInputTimestamp() {
+ return new OutputAtLatestInputTimestamp();
+ }
+
+ /**
+ * The policy of outputting with timestamps at the end of the window.
+ *
+ * <p>Note that this output timestamp depends only on the window. See
+ * {@link OutputTimeFn#dependsOnlyOnWindow()}.
+ *
+ * <p>When windows merge, instead of using {@link OutputTimeFn#combine} to obtain an output
+ * timestamp for the results in the new window, it is mandatory to obtain a new output
+ * timestamp from {@link OutputTimeFn#assignOutputTime} with the new window and an arbitrary
+ * timestamp (because it is guaranteed that the timestamp is irrelevant).
+ *
+ * <p>For non-merging window functions, this {@link OutputTimeFn} works transparently.
+ */
+ public static OutputTimeFn<BoundedWindow> outputAtEndOfWindow() {
+ return new OutputAtEndOfWindow();
+ }
+
+ /**
+ * Applies the given {@link OutputTimeFn} to the given output times, obtaining
+ * the output time for a value computed. See {@link OutputTimeFn#combine} for
+ * a full specification.
+ *
+ * @throws IllegalArgumentException if {@code outputTimes} is empty.
+ */
+ public static Instant combineOutputTimes(
+ OutputTimeFn<?> outputTimeFn, Iterable<? extends Instant> outputTimes) {
+ checkArgument(
+ !Iterables.isEmpty(outputTimes),
+ "Collection of output times must not be empty in %s.combineOutputTimes",
+ OutputTimeFns.class.getName());
+ // Left-fold the times with combine(); the first element seeds the accumulator.
+ @Nullable
+ Instant combinedOutputTime = null;
+ for (Instant outputTime : outputTimes) {
+ combinedOutputTime =
+ combinedOutputTime == null
+ ? outputTime : outputTimeFn.combine(combinedOutputTime, outputTime);
+ }
+ return combinedOutputTime;
+ }
+
+ /**
+ * See {@link #outputAtEarliestInputTimestamp}.
+ */
+ private static class OutputAtEarliestInputTimestamp extends OutputTimeFn.Defaults<BoundedWindow> {
+ @Override
+ public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) {
+ return inputTimestamp;
+ }
+
+ @Override
+ public Instant combine(Instant outputTime, Instant otherOutputTime) {
+ return Ordering.natural().min(outputTime, otherOutputTime);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return {@code true}. The result of any combine will be the earliest input timestamp.
+ */
+ @Override
+ public boolean dependsOnlyOnEarliestInputTimestamp() {
+ return true;
+ }
+ }
+
+ /**
+ * See {@link #outputAtLatestInputTimestamp}.
+ */
+ private static class OutputAtLatestInputTimestamp extends OutputTimeFn.Defaults<BoundedWindow> {
+ @Override
+ public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) {
+ return inputTimestamp;
+ }
+
+ @Override
+ public Instant combine(Instant outputTime, Instant otherOutputTime) {
+ return Ordering.natural().max(outputTime, otherOutputTime);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return {@code false}. The latest timestamp, not the earliest, determines the result.
+ */
+ @Override
+ public boolean dependsOnlyOnEarliestInputTimestamp() {
+ return false;
+ }
+ }
+
+ private static class OutputAtEndOfWindow extends OutputTimeFn.DependsOnlyOnWindow<BoundedWindow> {
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return {@code window.maxTimestamp()}.
+ */
+ @Override
+ protected Instant assignOutputTime(BoundedWindow window) {
+ return window.maxTimestamp();
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getCanonicalName();
+ }
+ }
+
+ public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) {
+ if (outputTimeFn instanceof OutputAtEarliestInputTimestamp) {
+ return RunnerApi.OutputTime.EARLIEST_IN_PANE;
+ } else if (outputTimeFn instanceof OutputAtLatestInputTimestamp) {
+ return RunnerApi.OutputTime.LATEST_IN_PANE;
+ } else if (outputTimeFn instanceof OutputAtEndOfWindow) {
+ return RunnerApi.OutputTime.END_OF_WINDOW;
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot convert %s to %s: %s",
+ OutputTimeFn.class.getCanonicalName(),
+ RunnerApi.OutputTime.class.getCanonicalName(),
+ outputTimeFn));
+ }
+ }
+
+ public static OutputTimeFn<?> fromProto(RunnerApi.OutputTime proto) {
+ switch (proto) {
+ case EARLIEST_IN_PANE:
+ return OutputTimeFns.outputAtEarliestInputTimestamp();
+ case LATEST_IN_PANE:
+ return OutputTimeFns.outputAtLatestInputTimestamp();
+ case END_OF_WINDOW:
+ return OutputTimeFns.outputAtEndOfWindow();
+ case UNRECOGNIZED:
+ default:
+ // Whether proto cannot recognize the value (due to the version of the generated code we
+ // link to) or this switch has not been updated to handle it, the situation is the same:
+ // we don't know what this OutputTime means.
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot convert unknown %s to %s: %s",
+ RunnerApi.OutputTime.class.getCanonicalName(),
+ OutputTimeFn.class.getCanonicalName(),
+ proto));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java
deleted file mode 100644
index 39fe8a9..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-import java.util.Arrays;
-import java.util.Collections;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.joda.time.Instant;
-
-/**
- * Policies for combining timestamps that occur within a window.
- */
-@Experimental(Experimental.Kind.OUTPUT_TIME)
-public enum TimestampCombiner {
- /**
- * The policy of taking at the earliest of a set of timestamps.
- *
- * <p>When used in windowed aggregations, the timestamps of non-late inputs will be combined after
- * they are shifted by the {@link WindowFn} (to allow downstream watermark progress).
- *
- * <p>If data arrives late, it has no effect on the output timestamp.
- */
- EARLIEST {
- @Override
- public Instant combine(Iterable<? extends Instant> timestamps) {
- return Ordering.natural().min(timestamps);
- }
-
- @Override
- public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
- return combine(mergingTimestamps);
- }
-
- @Override
- public boolean dependsOnlyOnEarliestTimestamp() {
- return true;
- }
-
- @Override
- public boolean dependsOnlyOnWindow() {
- return false;
- }
- },
-
- /**
- * The policy of taking the latest of a set of timestamps.
- *
- * <p>When used in windowed aggregations, the timestamps of non-late inputs will be combined after
- * they are shifted by the {@link WindowFn} (to allow downstream watermark progress).
- *
- * <p>If data arrives late, it has no effect on the output timestamp.
- */
- LATEST {
- @Override
- public Instant combine(Iterable<? extends Instant> timestamps) {
- return Ordering.natural().max(timestamps);
- }
-
- @Override
- public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
- return combine(mergingTimestamps);
- }
-
- @Override
- public boolean dependsOnlyOnEarliestTimestamp() {
- return false;
- }
-
- @Override
- public boolean dependsOnlyOnWindow() {
- return false;
- }
- },
-
- /**
- * The policy of using the end of the window, regardless of input timestamps.
- *
- * <p>When used in windowed aggregations, the timestamps of non-late inputs will be combined after
- * they are shifted by the {@link WindowFn} (to allow downstream watermark progress).
- *
- * <p>If data arrives late, it has no effect on the output timestamp.
- */
- END_OF_WINDOW {
- @Override
- public Instant combine(Iterable<? extends Instant> timestamps) {
- checkArgument(Iterables.size(timestamps) > 0);
- return Iterables.get(timestamps, 0);
- }
-
- @Override
- public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
- return intoWindow.maxTimestamp();
- }
-
- @Override
- public boolean dependsOnlyOnEarliestTimestamp() {
- return false;
- }
-
- @Override
- public boolean dependsOnlyOnWindow() {
- return true;
- }
- };
-
- /**
- * Combines the given times, which must be from the same window and must have been passed through
- * {@link #merge}.
- *
- * <ul>
- * <li>{@code combine} must be commutative: {@code combine(a, b).equals(combine(b, a))}.
- * <li>{@code combine} must be associative: {@code combine(a, combine(b,
- * c)).equals(combine(combine(a, b), c))}.
- * </ul>
- */
- public abstract Instant combine(Iterable<? extends Instant> timestamps);
-
- /**
- * Merges the given timestamps, which may have originated in separate windows, into the context of
- * the result window.
- */
- public abstract Instant merge(
- BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps);
-
- /**
- * Shorthand for {@link #merge} with just one element, to place it into the context of
- * a window.
- *
- * <p>For example, the {@link #END_OF_WINDOW} policy moves the timestamp to the end of the window.
- */
- public final Instant assign(BoundedWindow intoWindow, Instant timestamp) {
- return merge(intoWindow, Collections.singleton(timestamp));
- }
-
- /**
- * Varargs variant of {@link #combine}.
- */
- public final Instant combine(Instant... timestamps) {
- return combine(Arrays.asList(timestamps));
- }
-
- /**
- * Varargs variant of {@link #merge}.
- */
- public final Instant merge(BoundedWindow intoWindow, Instant... timestamps) {
- return merge(intoWindow, Arrays.asList(timestamps));
- }
-
- /**
- * Returns {@code true} if the result of combination of many output timestamps actually depends
- * only on the earliest.
- *
- * <p>This may allow optimizations when it is very efficient to retrieve the earliest timestamp
- * to be combined.
- */
- public abstract boolean dependsOnlyOnEarliestTimestamp();
-
- /**
- * Returns {@code true} if the result does not depend on what outputs were combined but only
- * the window they are in. The canonical example is if all timestamps are sure to
- * be the end of the window.
- *
- * <p>This may allow optimizations, since it is typically very efficient to retrieve the window
- * and combining output timestamps is not necessary.
- */
- public abstract boolean dependsOnlyOnWindow();
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index cb7b430..1000ff7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -193,7 +193,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
@Nullable abstract AccumulationMode getAccumulationMode();
@Nullable abstract Duration getAllowedLateness();
@Nullable abstract ClosingBehavior getClosingBehavior();
- @Nullable abstract TimestampCombiner getTimestampCombiner();
+ @Nullable abstract OutputTimeFn<?> getOutputTimeFn();
abstract Builder<T> toBuilder();
@@ -204,7 +204,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
abstract Builder<T> setAccumulationMode(AccumulationMode mode);
abstract Builder<T> setAllowedLateness(Duration allowedLateness);
abstract Builder<T> setClosingBehavior(ClosingBehavior closingBehavior);
- abstract Builder<T> setTimestampCombiner(TimestampCombiner timestampCombiner);
+ abstract Builder<T> setOutputTimeFn(OutputTimeFn<?> outputTimeFn);
abstract Window<T> build();
}
@@ -273,12 +273,12 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
}
/**
- * <b><i>(Experimental)</i></b> Override the default {@link TimestampCombiner}, to control
+ * <b><i>(Experimental)</i></b> Override the default {@link OutputTimeFn}, to control
* the output timestamp of values output from a {@link GroupByKey} operation.
*/
@Experimental(Kind.OUTPUT_TIME)
- public Window<T> withTimestampCombiner(TimestampCombiner timestampCombiner) {
- return toBuilder().setTimestampCombiner(timestampCombiner).build();
+ public Window<T> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) {
+ return toBuilder().setOutputTimeFn(outputTimeFn).build();
}
/**
@@ -300,6 +300,8 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
* Get the output strategy of this {@link Window Window PTransform}. For internal use
* only.
*/
+ // Rawtype cast of OutputTimeFn cannot be eliminated with intermediate variable, as it is
+ // casting between wildcards
public WindowingStrategy<?, ?> getOutputStrategyInternal(
WindowingStrategy<?, ?> inputStrategy) {
WindowingStrategy<?, ?> result = inputStrategy;
@@ -318,8 +320,8 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
if (getClosingBehavior() != null) {
result = result.withClosingBehavior(getClosingBehavior());
}
- if (getTimestampCombiner() != null) {
- result = result.withTimestampCombiner(getTimestampCombiner());
+ if (getOutputTimeFn() != null) {
+ result = result.withOutputTimeFn(getOutputTimeFn());
}
return result;
}
@@ -409,9 +411,9 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
.withLabel("Window Closing Behavior"));
}
- if (getTimestampCombiner() != null) {
- builder.add(DisplayData.item("timestampCombiner", getTimestampCombiner().toString())
- .withLabel("Timestamp Combiner"));
+ if (getOutputTimeFn() != null) {
+ builder.add(DisplayData.item("outputTimeFn", getOutputTimeFn().getClass())
+ .withLabel("Output Time Function"));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index 706e039..0c27c4f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -22,7 +22,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -57,14 +57,13 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
// If the input has already had its windows merged, then the GBK that performed the merge
// will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
// here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
- // The TimestampCombiner is set to ensure the GroupByKey does not shift elements forwards in
- // time.
+ // The OutputTimeFn is set to ensure the GroupByKey does not shift elements forwards in time.
// Because this outputs as fast as possible, this should not hold the watermark.
Window<KV<K, V>> rewindow =
Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
.triggering(new ReshuffleTrigger<>())
.discardingFiredPanes()
- .withTimestampCombiner(TimestampCombiner.EARLIEST)
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
return input.apply(rewindow)