You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/26 22:38:23 UTC
[3/4] beam git commit: Replace OutputTimeFn UDF with TimestampCombiner enum
Replace OutputTimeFn UDF with TimestampCombiner enum
This also removes the last dependency from SDK core to Runner API proto.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f38e4271
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f38e4271
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f38e4271
Branch: refs/heads/master
Commit: f38e4271334fced94e8dc1dc97f47b60fa810586
Parents: f23dd67
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jan 26 19:56:06 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Apr 26 15:08:29 2017 -0700
----------------------------------------------------------------------
.../beam/examples/complete/game/GameStats.java | 4 +-
.../translation/utils/ApexStateInternals.java | 26 +-
.../translation/GroupByKeyTranslatorTest.java | 10 +-
.../utils/ApexStateInternalsTest.java | 33 +-
.../core/construction/WindowingStrategies.java | 52 ++-
.../construction/WindowingStrategiesTest.java | 6 +-
.../runners/core/InMemoryStateInternals.java | 32 +-
.../beam/runners/core/ReduceFnRunner.java | 4 +-
.../beam/runners/core/SplittableParDo.java | 8 +-
.../apache/beam/runners/core/StateMerging.java | 32 +-
.../org/apache/beam/runners/core/StateTag.java | 11 +-
.../org/apache/beam/runners/core/StateTags.java | 16 +-
.../core/TestInMemoryStateInternals.java | 2 +-
.../apache/beam/runners/core/WatermarkHold.java | 45 +--
.../core/GroupAlsoByWindowsProperties.java | 20 +-
.../core/InMemoryStateInternalsTest.java | 34 +-
.../beam/runners/core/ReduceFnRunnerTest.java | 38 +--
.../beam/runners/core/ReduceFnTester.java | 13 +-
.../apache/beam/runners/core/StateTagTest.java | 16 +-
.../CopyOnAccessInMemoryStateInternals.java | 24 +-
.../direct/ParDoMultiOverrideFactory.java | 6 +-
.../CopyOnAccessInMemoryStateInternalsTest.java | 54 ++--
.../functions/HashingFlinkCombineRunner.java | 19 +-
.../functions/SortingFlinkCombineRunner.java | 30 +-
.../state/FlinkBroadcastStateInternals.java | 8 +-
.../state/FlinkKeyGroupStateInternals.java | 8 +-
.../state/FlinkSplitStateInternals.java | 8 +-
.../streaming/state/FlinkStateInternals.java | 34 +-
.../streaming/FlinkStateInternalsTest.java | 34 +-
.../spark/stateful/SparkStateInternals.java | 33 +-
.../translation/SparkAbstractCombineFn.java | 4 +-
.../spark/translation/SparkGlobalCombineFn.java | 37 ++-
.../spark/translation/SparkKeyedCombineFn.java | 37 ++-
sdks/java/core/pom.xml | 5 -
.../beam/sdk/testing/WindowFnTestUtils.java | 53 +++-
.../apache/beam/sdk/transforms/GroupByKey.java | 3 +-
.../sdk/transforms/windowing/OutputTimeFn.java | 314 -------------------
.../sdk/transforms/windowing/OutputTimeFns.java | 212 -------------
.../transforms/windowing/TimestampCombiner.java | 186 +++++++++++
.../beam/sdk/transforms/windowing/Window.java | 22 +-
.../org/apache/beam/sdk/util/Reshuffle.java | 7 +-
.../apache/beam/sdk/util/WindowingStrategy.java | 176 +++--------
.../apache/beam/sdk/util/state/StateBinder.java | 12 +-
.../apache/beam/sdk/util/state/StateSpecs.java | 23 +-
.../beam/sdk/util/state/WatermarkHoldState.java | 19 +-
.../org/apache/beam/SdkCoreApiSurfaceTest.java | 1 -
.../beam/sdk/transforms/GroupByKeyTest.java | 10 +-
.../sdk/transforms/join/CoGroupByKeyTest.java | 6 +-
.../transforms/windowing/OutputTimeFnsTest.java | 51 ---
.../sdk/transforms/windowing/SessionsTest.java | 6 +-
.../sdk/transforms/windowing/WindowTest.java | 23 +-
.../sdk/transforms/windowing/WindowingTest.java | 2 +-
.../org/apache/beam/GcpCoreApiSurfaceTest.java | 1 -
53 files changed, 740 insertions(+), 1130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index b6c05be..e0048b7 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -43,8 +43,8 @@ import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -313,7 +313,7 @@ public class GameStats extends LeaderBoard {
userEvents
.apply("WindowIntoSessions", Window.<KV<String, Integer>>into(
Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
- .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
+ .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
// For this use, we care only about the existence of the session, not any particular
// information aggregated over it, so the following is an efficient way to do that.
.apply(Combine.perKey(x -> 0))
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index cfc57cd..ec8f666 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -45,7 +45,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.BagState;
@@ -150,10 +150,10 @@ public class ApexStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
- return new ApexWatermarkHoldState<>(namespace, address, outputTimeFn);
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
+ return new ApexWatermarkHoldState<>(namespace, address, timestampCombiner);
}
@Override
@@ -269,16 +269,16 @@ public class ApexStateInternals<K> implements StateInternals<K> {
}
private final class ApexWatermarkHoldState<W extends BoundedWindow>
- extends AbstractState<Instant> implements WatermarkHoldState<W> {
+ extends AbstractState<Instant> implements WatermarkHoldState {
- private final OutputTimeFn<? super W> outputTimeFn;
+ private final TimestampCombiner timestampCombiner;
public ApexWatermarkHoldState(
StateNamespace namespace,
- StateTag<?, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
+ StateTag<?, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
super(namespace, address, InstantCoder.of());
- this.outputTimeFn = outputTimeFn;
+ this.timestampCombiner = timestampCombiner;
}
@Override
@@ -294,7 +294,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
@Override
public void add(Instant outputTime) {
Instant combined = read();
- combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime);
+ combined = (combined == null) ? outputTime : timestampCombiner.combine(combined, outputTime);
writeValue(combined);
}
@@ -313,8 +313,8 @@ public class ApexStateInternals<K> implements StateInternals<K> {
}
@Override
- public OutputTimeFn<? super W> getOutputTimeFn() {
- return outputTimeFn;
+ public TimestampCombiner getTimestampCombiner() {
+ return timestampCombiner;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
index 193de71..9c61b47 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
@@ -43,7 +43,7 @@ import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
@@ -83,12 +83,12 @@ public class GroupByKeyTranslatorTest {
);
p.apply(Read.from(new TestSource(data, new Instant(5000))))
- .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
- .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()))
+ .apply(
+ Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
+ .withTimestampCombiner(TimestampCombiner.LATEST))
.apply(Count.<String>perElement())
.apply(ParDo.of(new KeyedByTimestamp<KV<String, Long>>()))
- .apply(ParDo.of(new EmbeddedCollector()))
- ;
+ .apply(ParDo.of(new EmbeddedCollector()));
ApexRunnerResult result = (ApexRunnerResult) p.run();
result.getApexDAG();
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 7160e45..225b654 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -36,7 +36,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.GroupingState;
@@ -65,14 +65,13 @@ public class ApexStateInternalsTest {
"sumInteger", VarIntCoder.of(), Sum.ofIntegers());
private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+ private static final StateTag<Object, WatermarkHoldState>
WATERMARK_EARLIEST_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
- WATERMARK_LATEST_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
+ private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
+ private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
private ApexStateInternals<String> underTest;
@@ -227,7 +226,7 @@ public class ApexStateInternalsTest {
@Test
public void testWatermarkEarliestState() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
+ WatermarkHoldState value =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
// State instances are cached, but depend on the namespace.
@@ -251,7 +250,7 @@ public class ApexStateInternalsTest {
@Test
public void testWatermarkLatestState() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
+ WatermarkHoldState value =
underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
// State instances are cached, but depend on the namespace.
@@ -275,7 +274,7 @@ public class ApexStateInternalsTest {
@Test
public void testWatermarkEndOfWindowState() throws Exception {
- WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+ WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
// State instances are cached, but depend on the namespace.
assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
@@ -292,7 +291,7 @@ public class ApexStateInternalsTest {
@Test
public void testWatermarkStateIsEmpty() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
+ WatermarkHoldState value =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
assertThat(value.isEmpty().read(), Matchers.is(true));
@@ -306,9 +305,9 @@ public class ApexStateInternalsTest {
@Test
public void testMergeEarliestWatermarkIntoSource() throws Exception {
- WatermarkHoldState<BoundedWindow> value1 =
+ WatermarkHoldState value1 =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
- WatermarkHoldState<BoundedWindow> value2 =
+ WatermarkHoldState value2 =
underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
value1.add(new Instant(3000));
@@ -325,11 +324,11 @@ public class ApexStateInternalsTest {
@Test
public void testMergeLatestWatermarkIntoSource() throws Exception {
- WatermarkHoldState<BoundedWindow> value1 =
+ WatermarkHoldState value1 =
underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
- WatermarkHoldState<BoundedWindow> value2 =
+ WatermarkHoldState value2 =
underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
- WatermarkHoldState<BoundedWindow> value3 =
+ WatermarkHoldState value3 =
underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
value1.add(new Instant(3000));
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
index 3d7deef..0c400db 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
@@ -28,16 +28,15 @@ import java.io.Serializable;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.WindowingStrategy.CombineWindowFnOutputTimes;
import org.joda.time.Duration;
/** Utilities for working with {@link WindowingStrategy WindowingStrategies}. */
@@ -115,11 +114,42 @@ public class WindowingStrategies implements Serializable {
}
}
- public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) {
- if (outputTimeFn instanceof WindowingStrategy.CombineWindowFnOutputTimes) {
- return toProto(((CombineWindowFnOutputTimes<?>) outputTimeFn).getOutputTimeFn());
- } else {
- return OutputTimeFns.toProto(outputTimeFn);
+ public static RunnerApi.OutputTime toProto(TimestampCombiner timestampCombiner) {
+ switch(timestampCombiner) {
+ case EARLIEST:
+ return OutputTime.EARLIEST_IN_PANE;
+ case END_OF_WINDOW:
+ return OutputTime.END_OF_WINDOW;
+ case LATEST:
+ return OutputTime.LATEST_IN_PANE;
+ default:
+ throw new IllegalArgumentException(
+ String.format(
+ "Unknown %s: %s",
+ TimestampCombiner.class.getSimpleName(),
+ timestampCombiner));
+ }
+ }
+
+ public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime proto) {
+ switch (proto) {
+ case EARLIEST_IN_PANE:
+ return TimestampCombiner.EARLIEST;
+ case END_OF_WINDOW:
+ return TimestampCombiner.END_OF_WINDOW;
+ case LATEST_IN_PANE:
+ return TimestampCombiner.LATEST;
+ case UNRECOGNIZED:
+ default:
+ // Whether proto itself cannot recognize the value (due to the version of the
+ // generated code we link to) or this switch hasn't been updated to handle it,
+ // the situation is the same: we don't know what this OutputTime means.
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot convert unknown %s to %s: %s",
+ RunnerApi.OutputTime.class.getCanonicalName(),
+ OutputTime.class.getCanonicalName(),
+ proto));
}
}
@@ -177,7 +207,7 @@ public class WindowingStrategies implements Serializable {
RunnerApi.WindowingStrategy.Builder windowingStrategyProto =
RunnerApi.WindowingStrategy.newBuilder()
- .setOutputTime(toProto(windowingStrategy.getOutputTimeFn()))
+ .setOutputTime(toProto(windowingStrategy.getTimestampCombiner()))
.setAccumulationMode(toProto(windowingStrategy.getMode()))
.setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
.setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
@@ -229,7 +259,7 @@ public class WindowingStrategies implements Serializable {
"WindowFn");
WindowFn<?, ?> windowFn = (WindowFn<?, ?>) deserializedWindowFn;
- OutputTimeFn<?> outputTimeFn = OutputTimeFns.fromProto(proto.getOutputTime());
+ TimestampCombiner timestampCombiner = timestampCombinerFromProto(proto.getOutputTime());
AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode());
Trigger trigger = Triggers.fromProto(proto.getTrigger());
ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
@@ -239,7 +269,7 @@ public class WindowingStrategies implements Serializable {
.withAllowedLateness(allowedLateness)
.withMode(accumulationMode)
.withTrigger(trigger)
- .withOutputTimeFn(outputTimeFn)
+ .withTimestampCombiner(timestampCombiner)
.withClosingBehavior(closingBehavior);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
index 62bba8e..78ac61c 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
@@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -68,14 +68,14 @@ public class WindowingStrategiesTest {
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withTrigger(REPRESENTATIVE_TRIGGER)
.withAllowedLateness(Duration.millis(71))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())),
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)),
toProtoAndBackSpec(
WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
.withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)
.withMode(AccumulationMode.DISCARDING_FIRED_PANES)
.withTrigger(REPRESENTATIVE_TRIGGER)
.withAllowedLateness(Duration.millis(93))
- .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp())));
+ .withTimestampCombiner(TimestampCombiner.LATEST)));
}
@Parameter(0)
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
index 55b7fc2..9fb8e3f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -156,10 +156,10 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
- return new InMemoryWatermarkHold<W>(outputTimeFn);
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
+ return new InMemoryWatermarkHold<W>(timestampCombiner);
}
@Override
@@ -233,19 +233,19 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
* An {@link InMemoryState} implementation of {@link WatermarkHoldState}.
*/
public static final class InMemoryWatermarkHold<W extends BoundedWindow>
- implements WatermarkHoldState<W>, InMemoryState<InMemoryWatermarkHold<W>> {
+ implements WatermarkHoldState, InMemoryState<InMemoryWatermarkHold<W>> {
- private final OutputTimeFn<? super W> outputTimeFn;
+ private final TimestampCombiner timestampCombiner;
@Nullable
private Instant combinedHold = null;
- public InMemoryWatermarkHold(OutputTimeFn<? super W> outputTimeFn) {
- this.outputTimeFn = outputTimeFn;
+ public InMemoryWatermarkHold(TimestampCombiner timestampCombiner) {
+ this.timestampCombiner = timestampCombiner;
}
@Override
- public InMemoryWatermarkHold<W> readLater() {
+ public InMemoryWatermarkHold readLater() {
return this;
}
@@ -263,8 +263,10 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
@Override
public void add(Instant outputTime) {
- combinedHold = combinedHold == null ? outputTime
- : outputTimeFn.combine(combinedHold, outputTime);
+ combinedHold =
+ combinedHold == null
+ ? outputTime
+ : timestampCombiner.combine(combinedHold, outputTime);
}
@Override
@@ -287,8 +289,8 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
}
@Override
- public OutputTimeFn<? super W> getOutputTimeFn() {
- return outputTimeFn;
+ public TimestampCombiner getTimestampCombiner() {
+ return timestampCombiner;
}
@Override
@@ -299,7 +301,7 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
@Override
public InMemoryWatermarkHold<W> copy() {
InMemoryWatermarkHold<W> that =
- new InMemoryWatermarkHold<>(outputTimeFn);
+ new InMemoryWatermarkHold<>(timestampCombiner);
that.combinedHold = this.combinedHold;
return that;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 34db752..4c70c97 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -47,9 +47,9 @@ import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SideInputReader;
@@ -171,7 +171,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
* <ul>
* <li>State: Bag of hold timestamps.
* <li>State style: RENAMED
- * <li>Merging: Depending on {@link OutputTimeFn}, may need to be recalculated on merging.
+ * <li>Merging: Depending on {@link TimestampCombiner}, may need to be recalculated on merging.
* When a pane fires it may be necessary to add (back) an end-of-window or garbage collection
* hold.
* <li>Lifetime: Cleared when a pane fires or when the window is garbage collected.
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 31d89ee..5273e86 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -45,7 +45,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -355,10 +355,10 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
* the input watermark when the first {@link DoFn.ProcessElement} call for this element
* completes.
*/
- private static final StateTag<Object, WatermarkHoldState<GlobalWindow>> watermarkHoldTag =
+ private static final StateTag<Object, WatermarkHoldState> watermarkHoldTag =
StateTags.makeSystemTagInternal(
StateTags.<GlobalWindow>watermarkStateInternal(
- "hold", OutputTimeFns.outputAtLatestInputTimestamp()));
+ "hold", TimestampCombiner.LATEST));
/**
* The state cell containing a copy of the element. Written during the first {@link
@@ -480,7 +480,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
stateInternals.state(stateNamespace, elementTag);
ValueState<RestrictionT> restrictionState =
stateInternals.state(stateNamespace, restrictionTag);
- WatermarkHoldState<GlobalWindow> holdState =
+ WatermarkHoldState holdState =
stateInternals.state(stateNamespace, watermarkHoldTag);
ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction;
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
index 3410850..ce37fd3 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -218,24 +218,24 @@ public class StateMerging {
*/
public static <K, W extends BoundedWindow> void prefetchWatermarks(
MergingStateAccessor<K, W> context,
- StateTag<? super K, WatermarkHoldState<W>> address) {
- Map<W, WatermarkHoldState<W>> map = context.accessInEachMergingWindow(address);
- WatermarkHoldState<W> result = context.access(address);
+ StateTag<? super K, WatermarkHoldState> address) {
+ Map<W, WatermarkHoldState> map = context.accessInEachMergingWindow(address);
+ WatermarkHoldState result = context.access(address);
if (map.isEmpty()) {
// Nothing to prefetch.
return;
}
if (map.size() == 1 && map.values().contains(result)
- && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) {
+ && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) {
// Nothing to change.
return;
}
- if (result.getOutputTimeFn().dependsOnlyOnWindow()) {
+ if (result.getTimestampCombiner().dependsOnlyOnWindow()) {
// No need to read existing holds.
return;
}
// Prefetch.
- for (WatermarkHoldState<W> source : map.values()) {
+ for (WatermarkHoldState source : map.values()) {
prefetchRead(source);
}
}
@@ -250,7 +250,7 @@ public class StateMerging {
*/
public static <K, W extends BoundedWindow> void mergeWatermarks(
MergingStateAccessor<K, W> context,
- StateTag<? super K, WatermarkHoldState<W>> address,
+ StateTag<? super K, WatermarkHoldState> address,
W mergeResult) {
mergeWatermarks(
context.accessInEachMergingWindow(address).values(), context.access(address), mergeResult);
@@ -261,31 +261,31 @@ public class StateMerging {
* into {@code result}, where the final merge result window is {@code mergeResult}.
*/
public static <W extends BoundedWindow> void mergeWatermarks(
- Collection<WatermarkHoldState<W>> sources, WatermarkHoldState<W> result,
+ Collection<WatermarkHoldState> sources, WatermarkHoldState result,
W resultWindow) {
if (sources.isEmpty()) {
// Nothing to merge.
return;
}
if (sources.size() == 1 && sources.contains(result)
- && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) {
+ && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) {
// Nothing to merge.
return;
}
- if (result.getOutputTimeFn().dependsOnlyOnWindow()) {
+ if (result.getTimestampCombiner().dependsOnlyOnWindow()) {
// Clear sources.
- for (WatermarkHoldState<W> source : sources) {
+ for (WatermarkHoldState source : sources) {
source.clear();
}
// Update directly from window-derived hold.
- Instant hold = result.getOutputTimeFn().assignOutputTime(
- BoundedWindow.TIMESTAMP_MIN_VALUE, resultWindow);
+ Instant hold =
+ result.getTimestampCombiner().assign(resultWindow, BoundedWindow.TIMESTAMP_MIN_VALUE);
checkState(hold.isAfter(BoundedWindow.TIMESTAMP_MIN_VALUE));
result.add(hold);
} else {
// Prefetch.
List<ReadableState<Instant>> futures = new ArrayList<>(sources.size());
- for (WatermarkHoldState<W> source : sources) {
+ for (WatermarkHoldState source : sources) {
futures.add(source);
}
// Read.
@@ -297,12 +297,12 @@ public class StateMerging {
}
}
// Clear sources.
- for (WatermarkHoldState<W> source : sources) {
+ for (WatermarkHoldState source : sources) {
source.clear();
}
if (!outputTimesToMerge.isEmpty()) {
// Merge and update.
- result.add(result.getOutputTimeFn().merge(resultWindow, outputTimesToMerge));
+ result.add(result.getTimestampCombiner().merge(resultWindow, outputTimesToMerge));
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
index 12c59ad..a5d262a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
@@ -115,11 +115,10 @@ public interface StateTag<K, StateT extends State> extends Serializable {
/**
* Bind to a watermark {@link StateSpec}.
*
- * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to
- * the returned {@link WatermarkHoldState} are to be combined.
+ * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps
+ * added to the returned {@link WatermarkHoldState} are to be combined.
*/
- <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> spec,
- OutputTimeFn<? super W> outputTimeFn);
+ <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> spec, TimestampCombiner timestampCombiner);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index 3a45569..2b3f4b8 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
@@ -110,11 +110,11 @@ public class StateTags {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
String id,
- StateSpec<? super K, WatermarkHoldState<W>> spec,
- OutputTimeFn<? super W> outputTimeFn) {
- return binder.bindWatermark(tagForSpec(id, spec), outputTimeFn);
+ StateSpec<? super K, WatermarkHoldState> spec,
+ TimestampCombiner timestampCombiner) {
+ return binder.bindWatermark(tagForSpec(id, spec), timestampCombiner);
}
};
}
@@ -228,10 +228,10 @@ public class StateTags {
/**
* Create a state tag for holding the watermark.
*/
- public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState<W>>
- watermarkStateInternal(String id, OutputTimeFn<? super W> outputTimeFn) {
+ public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState>
+ watermarkStateInternal(String id, TimestampCombiner timestampCombiner) {
return new SimpleStateTag<>(
- new StructuredId(id), StateSpecs.watermarkStateInternal(outputTimeFn));
+ new StructuredId(id), StateSpecs.watermarkStateInternal(timestampCombiner));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
index 0321a33..1dfb85f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
@@ -52,7 +52,7 @@ public class TestInMemoryStateInternals<K> extends InMemoryStateInternals<K> {
Instant minimum = null;
for (State storage : inMemoryState.values()) {
if (storage instanceof WatermarkHoldState) {
- Instant hold = ((WatermarkHoldState<?>) storage).read();
+ Instant hold = ((WatermarkHoldState) storage).read();
if (minimum == null || (hold != null && hold.isBefore(minimum))) {
minimum = hold;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index d3c4bc7..9bb9c62 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -23,9 +23,8 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -55,37 +54,38 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
* used for elements.
*/
public static <W extends BoundedWindow>
- StateTag<Object, WatermarkHoldState<W>> watermarkHoldTagForOutputTimeFn(
- OutputTimeFn<? super W> outputTimeFn) {
- return StateTags.<Object, WatermarkHoldState<W>>makeSystemTagInternal(
- StateTags.<W>watermarkStateInternal("hold", outputTimeFn));
+ StateTag<Object, WatermarkHoldState> watermarkHoldTagForTimestampCombiner(
+ TimestampCombiner timestampCombiner) {
+ return StateTags.<Object, WatermarkHoldState>makeSystemTagInternal(
+ StateTags.<W>watermarkStateInternal("hold", timestampCombiner));
}
/**
* Tag for state containing end-of-window and garbage collection output watermark holds.
- * (We can't piggy-back on the data hold state since the outputTimeFn may be
- * {@link OutputTimeFns#outputAtLatestInputTimestamp()}, in which case every pane will
+ * (We can't piggy-back on the data hold state since the timestampCombiner may be
+ * {@link TimestampCombiner#LATEST}, in which case every pane
* would take the end-of-window time as its element time.)
*/
@VisibleForTesting
- public static final StateTag<Object, WatermarkHoldState<BoundedWindow>> EXTRA_HOLD_TAG =
+ public static final StateTag<Object, WatermarkHoldState> EXTRA_HOLD_TAG =
StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal(
- "extra", OutputTimeFns.outputAtEarliestInputTimestamp()));
+ "extra", TimestampCombiner.EARLIEST));
private final TimerInternals timerInternals;
private final WindowingStrategy<?, W> windowingStrategy;
- private final StateTag<Object, WatermarkHoldState<W>> elementHoldTag;
+ private final StateTag<Object, WatermarkHoldState> elementHoldTag;
public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) {
this.timerInternals = timerInternals;
this.windowingStrategy = windowingStrategy;
- this.elementHoldTag = watermarkHoldTagForOutputTimeFn(windowingStrategy.getOutputTimeFn());
+ this.elementHoldTag =
+ watermarkHoldTagForTimestampCombiner(windowingStrategy.getTimestampCombiner());
}
/**
* Add a hold to prevent the output watermark progressing beyond the (possibly adjusted) timestamp
- * of the element in {@code context}. We allow the actual hold time to be shifted later by
- * {@link OutputTimeFn#assignOutputTime}, but no further than the end of the window. The hold will
+ * of the element in {@code context}. We allow the actual hold time to be shifted later by the
+ * {@link TimestampCombiner}, but no further than the end of the window. The hold will
* remain until cleared by {@link #extractAndRelease}. Return the timestamp at which the hold
* was placed, or {@literal null} if no hold was placed.
*
@@ -199,15 +199,18 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
* strategy's output time function.
*/
private Instant shift(Instant timestamp, W window) {
- Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window);
+ Instant shifted =
+ windowingStrategy
+ .getTimestampCombiner()
+ .assign(window, windowingStrategy.getWindowFn().getOutputTime(timestamp, window));
checkState(!shifted.isBefore(timestamp),
- "OutputTimeFn moved element from %s to earlier time %s for window %s",
+ "TimestampCombiner moved element from %s to earlier time %s for window %s",
BoundedWindow.formatTimestamp(timestamp),
BoundedWindow.formatTimestamp(shifted),
window);
checkState(timestamp.isAfter(window.maxTimestamp())
|| !shifted.isAfter(window.maxTimestamp()),
- "OutputTimeFn moved element from %s to %s which is beyond end of "
+ "TimestampCombiner moved element from %s to %s which is beyond end of "
+ "window %s",
timestamp, shifted, window);
@@ -217,7 +220,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
/**
* Attempt to add an 'element hold'. Return the {@link Instant} at which the hold was
* added (ie the element timestamp plus any forward shift requested by the
- * {@link WindowingStrategy#getOutputTimeFn}), or {@literal null} if no hold was added.
+ * {@link WindowingStrategy#getTimestampCombiner}), or {@literal null} if no hold was added.
* The hold is only added if both:
* <ol>
* <li>The backend will be able to respect it. In other words the output watermark cannot
@@ -450,7 +453,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
* Return (a future for) the earliest hold for {@code context}. Clear all the holds after
* reading, but add/restore an end-of-window or garbage collection hold if required.
*
- * <p>The returned timestamp is the output timestamp according to the {@link OutputTimeFn}
+ * <p>The returned timestamp is the output timestamp according to the {@link TimestampCombiner}
* from the windowing strategy of this {@link WatermarkHold}, combined across all the non-late
* elements in the current pane. If there is no such value the timestamp is the end
* of the window.
@@ -462,8 +465,8 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
+ "outputWatermark:{}",
context.key(), context.window(), timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
- final WatermarkHoldState<W> elementHoldState = context.state().access(elementHoldTag);
- final WatermarkHoldState<BoundedWindow> extraHoldState = context.state().access(EXTRA_HOLD_TAG);
+ final WatermarkHoldState elementHoldState = context.state().access(elementHoldTag);
+ final WatermarkHoldState extraHoldState = context.state().access(EXTRA_HOLD_TAG);
return new ReadableState<OldAndNewHolds>() {
@Override
public ReadableState<OldAndNewHolds> readLater() {
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index d0a8923..81ac5fa 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -43,10 +43,10 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
@@ -149,7 +149,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
+ .withTimestampCombiner(TimestampCombiner.EARLIEST);
List<WindowedValue<KV<String, Iterable<String>>>> result =
runGABW(
@@ -200,7 +200,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
+ .withTimestampCombiner(TimestampCombiner.EARLIEST);
List<WindowedValue<KV<String, Long>>> result =
runGABW(
@@ -348,7 +348,7 @@ public class GroupAlsoByWindowsProperties {
/**
* Tests that for a simple sequence of elements on the same key, the given GABW implementation
* correctly groups them according to fixed windows and also sets the output timestamp according
- * to the policy {@link OutputTimeFns#outputAtEndOfWindow()}.
+ * to the policy {@link TimestampCombiner#END_OF_WINDOW}.
*/
public static void groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp(
GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
@@ -356,7 +356,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
+ .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW);
List<WindowedValue<KV<String, Iterable<String>>>> result =
runGABW(
@@ -386,7 +386,7 @@ public class GroupAlsoByWindowsProperties {
/**
* Tests that for a simple sequence of elements on the same key, the given GABW implementation
* correctly groups them according to fixed windows and also sets the output timestamp according
- * to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}.
+ * to the policy {@link TimestampCombiner#LATEST}.
*/
public static void groupsElementsIntoFixedWindowsWithLatestTimestamp(
GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
@@ -394,7 +394,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
+ .withTimestampCombiner(TimestampCombiner.LATEST);
List<WindowedValue<KV<String, Iterable<String>>>> result =
runGABW(
@@ -431,7 +431,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
+ .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW);
List<WindowedValue<KV<String, Iterable<String>>>> result =
runGABW(
@@ -468,7 +468,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
+ .withTimestampCombiner(TimestampCombiner.LATEST);
BoundedWindow unmergedWindow = window(15, 25);
List<WindowedValue<KV<String, Iterable<String>>>> result =
@@ -508,7 +508,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
+ .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW);
BoundedWindow secondWindow = window(15, 25);
List<WindowedValue<KV<String, Long>>> result =
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index 34ddae6..6248401 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.GroupingState;
@@ -71,14 +71,12 @@ public class InMemoryStateInternalsTest {
StateTags.set("stringSet", StringUtf8Coder.of());
private static final StateTag<Object, MapState<String, Integer>> STRING_MAP_ADDR =
StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
- WATERMARK_EARLIEST_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
- WATERMARK_LATEST_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
+ private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
+ private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
+ private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
InMemoryStateInternals<String> underTest = InMemoryStateInternals.forKey("dummyKey");
@@ -442,7 +440,7 @@ public class InMemoryStateInternalsTest {
@Test
public void testWatermarkEarliestState() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
+ WatermarkHoldState value =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
// State instances are cached, but depend on the namespace.
@@ -466,7 +464,7 @@ public class InMemoryStateInternalsTest {
@Test
public void testWatermarkLatestState() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
+ WatermarkHoldState value =
underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
// State instances are cached, but depend on the namespace.
@@ -490,7 +488,7 @@ public class InMemoryStateInternalsTest {
@Test
public void testWatermarkEndOfWindowState() throws Exception {
- WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+ WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
// State instances are cached, but depend on the namespace.
assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
@@ -507,7 +505,7 @@ public class InMemoryStateInternalsTest {
@Test
public void testWatermarkStateIsEmpty() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
+ WatermarkHoldState value =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
assertThat(value.isEmpty().read(), Matchers.is(true));
@@ -521,9 +519,9 @@ public class InMemoryStateInternalsTest {
@Test
public void testMergeEarliestWatermarkIntoSource() throws Exception {
- WatermarkHoldState<BoundedWindow> value1 =
+ WatermarkHoldState value1 =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
- WatermarkHoldState<BoundedWindow> value2 =
+ WatermarkHoldState value2 =
underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
value1.add(new Instant(3000));
@@ -540,11 +538,11 @@ public class InMemoryStateInternalsTest {
@Test
public void testMergeLatestWatermarkIntoSource() throws Exception {
- WatermarkHoldState<BoundedWindow> value1 =
+ WatermarkHoldState value1 =
underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
- WatermarkHoldState<BoundedWindow> value2 =
+ WatermarkHoldState value2 =
underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
- WatermarkHoldState<BoundedWindow> value3 =
+ WatermarkHoldState value3 =
underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
value1.add(new Instant(3000));
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 0d4d992..44bc538 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -56,12 +56,12 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Never;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -210,7 +210,7 @@ public class ReduceFnRunnerTest {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
.withMode(AccumulationMode.DISCARDING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100));
@@ -284,7 +284,7 @@ public class ReduceFnRunnerTest {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) windowFn)
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
.withTrigger(AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()))
.withMode(AccumulationMode.DISCARDING_FIRED_PANES)
.withAllowedLateness(allowedLateness);
@@ -315,7 +315,7 @@ public class ReduceFnRunnerTest {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100));
@@ -615,7 +615,7 @@ public class ReduceFnRunnerTest {
AfterWatermark.pastEndOfWindow())))
.withMode(AccumulationMode.DISCARDING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
.withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
tester.advanceInputWatermark(new Instant(0));
@@ -668,7 +668,7 @@ public class ReduceFnRunnerTest {
AfterWatermark.pastEndOfWindow())))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
.withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY));
tester.advanceInputWatermark(new Instant(0));
@@ -695,7 +695,7 @@ public class ReduceFnRunnerTest {
AfterWatermark.pastEndOfWindow())))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
.withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
tester.advanceInputWatermark(new Instant(0));
@@ -724,7 +724,7 @@ public class ReduceFnRunnerTest {
AfterWatermark.pastEndOfWindow())))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
.withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
tester.advanceInputWatermark(new Instant(0));
@@ -1195,7 +1195,7 @@ public class ReduceFnRunnerTest {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
.withTrigger(
AfterEach.<IntervalWindow>inOrder(
Repeatedly.forever(
@@ -1251,16 +1251,16 @@ public class ReduceFnRunnerTest {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
- .withTrigger(AfterEach.<IntervalWindow>inOrder(
- Repeatedly
- .forever(
- AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
- new Duration(5)))
- .orFinally(AfterWatermark.pastEndOfWindow()),
- Repeatedly.forever(
- AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
- new Duration(25)))))
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
+ .withTrigger(
+ AfterEach.<IntervalWindow>inOrder(
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(new Duration(5)))
+ .orFinally(AfterWatermark.pastEndOfWindow()),
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(new Duration(25)))))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100));
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 549fd8a..b5b5492 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -58,8 +58,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -161,7 +161,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
throws Exception {
WindowingStrategy<?, W> strategy =
WindowingStrategy.of(windowFn)
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
.withMode(mode)
.withAllowedLateness(allowedDataLateness)
.withClosingBehavior(closingBehavior);
@@ -329,8 +329,10 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
assertHasOnlyGlobalAndAllowedTags(
ImmutableSet.copyOf(expectedWindows),
ImmutableSet.<StateTag<? super String, ?>>of(
- TriggerStateMachineRunner.FINISHED_BITS_TAG, PaneInfoTracker.PANE_INFO_TAG,
- WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()),
+ TriggerStateMachineRunner.FINISHED_BITS_TAG,
+ PaneInfoTracker.PANE_INFO_TAG,
+ WatermarkHold.watermarkHoldTagForTimestampCombiner(
+ objectStrategy.getTimestampCombiner()),
WatermarkHold.EXTRA_HOLD_TAG));
}
@@ -345,7 +347,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
ImmutableSet.copyOf(expectedWindows),
ImmutableSet.<StateTag<? super String, ?>>of(
PaneInfoTracker.PANE_INFO_TAG,
- WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()),
+ WatermarkHold.watermarkHoldTagForTimestampCombiner(
+ objectStrategy.getTimestampCombiner()),
WatermarkHold.EXTRA_HOLD_TAG));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
index 5f5d92d..10dcb62 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -97,15 +97,11 @@ public class StateTagTest {
@Test
public void testWatermarkBagEquality() {
- StateTag<?, ?> foo1 = StateTags.watermarkStateInternal(
- "foo", OutputTimeFns.outputAtEarliestInputTimestamp());
- StateTag<?, ?> foo2 = StateTags.watermarkStateInternal(
- "foo", OutputTimeFns.outputAtEarliestInputTimestamp());
- StateTag<?, ?> bar = StateTags.watermarkStateInternal(
- "bar", OutputTimeFns.outputAtEarliestInputTimestamp());
-
- StateTag<?, ?> bar2 = StateTags.watermarkStateInternal(
- "bar", OutputTimeFns.outputAtLatestInputTimestamp());
+ StateTag<?, ?> foo1 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+ StateTag<?, ?> foo2 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+ StateTag<?, ?> bar = StateTags.watermarkStateInternal("bar", TimestampCombiner.EARLIEST);
+
+ StateTag<?, ?> bar2 = StateTags.watermarkStateInternal("bar", TimestampCombiner.LATEST);
// Same id, same fn.
assertEquals(foo1, foo2);
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
index 0665812..068b37f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
@@ -43,7 +43,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -213,7 +213,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
Instant earliest = BoundedWindow.TIMESTAMP_MAX_VALUE;
for (State existingState : this.values()) {
if (existingState instanceof WatermarkHoldState) {
- Instant hold = ((WatermarkHoldState<?>) existingState).read();
+ Instant hold = ((WatermarkHoldState) existingState).read();
if (hold != null && hold.isBefore(earliest)) {
earliest = hold;
}
@@ -276,18 +276,18 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
return new StateBinder<K>() {
@Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
if (containedInUnderlying(namespace, address)) {
@SuppressWarnings("unchecked")
- InMemoryState<? extends WatermarkHoldState<W>> existingState =
- (InMemoryState<? extends WatermarkHoldState<W>>)
+ InMemoryState<? extends WatermarkHoldState> existingState =
+ (InMemoryState<? extends WatermarkHoldState>)
underlying.get().get(namespace, address, c);
return existingState.copy();
} else {
return new InMemoryWatermarkHold<>(
- outputTimeFn);
+ timestampCombiner);
}
}
@@ -419,7 +419,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
State state =
readTo.get(namespace, existingState.getKey(), StateContexts.nullContext());
if (state instanceof WatermarkHoldState) {
- Instant hold = ((WatermarkHoldState<?>) state).read();
+ Instant hold = ((WatermarkHoldState) state).read();
if (hold != null && hold.isBefore(earliestHold)) {
earliestHold = hold;
}
@@ -434,9 +434,9 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
return new StateBinder<K>() {
@Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
+ public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ StateTag<? super K, WatermarkHoldState> address,
+ TimestampCombiner timestampCombiner) {
return underlying.get(namespace, address, c);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index b08aa8e..322c995 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -40,8 +40,8 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -135,14 +135,14 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
// to alter the flow of data. This entails:
// - trigger as fast as possible
// - maintain the full timestamps of elements
- // - ensure this GBK holds to the minimum of those timestamps (via OutputTimeFn)
+ // - ensure this GBK holds to the minimum of those timestamps (via TimestampCombiner)
// - discard past panes as it is "just a stream" of elements
.apply(
Window.<KV<K, WindowedValue<KV<K, InputT>>>>configure()
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.discardingFiredPanes()
.withAllowedLateness(inputWindowingStrategy.getAllowedLateness())
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
+ .withTimestampCombiner(TimestampCombiner.EARLIEST))
// A full GBK to group by key _and_ window
.apply("Group by key", GroupByKey.<K, WindowedValue<KV<K, InputT>>>create())
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 68c6613..f0aeece 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -43,8 +43,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.GroupingState;
@@ -289,13 +288,12 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- OutputTimeFn<BoundedWindow> outputTimeFn =
- OutputTimeFns.outputAtEarliestInputTimestamp();
+ TimestampCombiner timestampCombiner = TimestampCombiner.EARLIEST;
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag =
- StateTags.watermarkStateInternal("wmstate", outputTimeFn);
- WatermarkHoldState<?> underlyingValue = underlying.state(namespace, stateTag);
+ StateTag<Object, WatermarkHoldState> stateTag =
+ StateTags.watermarkStateInternal("wmstate", timestampCombiner);
+ WatermarkHoldState underlyingValue = underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), nullValue());
underlyingValue.add(new Instant(250L));
@@ -303,7 +301,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
- WatermarkHoldState<BoundedWindow> copyOnAccessState = internals.state(namespace, stateTag);
+ WatermarkHoldState copyOnAccessState = internals.state(namespace, stateTag);
assertThat(copyOnAccessState.read(), equalTo(new Instant(250L)));
copyOnAccessState.add(new Instant(100L));
@@ -313,7 +311,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
copyOnAccessState.add(new Instant(500L));
assertThat(copyOnAccessState.read(), equalTo(new Instant(100L)));
- WatermarkHoldState<BoundedWindow> reReadUnderlyingValue =
+ WatermarkHoldState reReadUnderlyingValue =
underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
}
@@ -514,15 +512,15 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
- StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
- StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
- WatermarkHoldState<BoundedWindow> firstHold =
+ StateTag<Object, WatermarkHoldState> firstHoldAddress =
+ StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+ WatermarkHoldState firstHold =
internals.state(StateNamespaces.window(null, first), firstHoldAddress);
firstHold.add(new Instant(22L));
- StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
- StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
- WatermarkHoldState<BoundedWindow> secondHold =
+ StateTag<Object, WatermarkHoldState> secondHoldAddress =
+ StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+ WatermarkHoldState secondHold =
internals.state(StateNamespaces.window(null, second), secondHoldAddress);
secondHold.add(new Instant(2L));
@@ -546,18 +544,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
};
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
- StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
- StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
- WatermarkHoldState<BoundedWindow> firstHold =
+ StateTag<Object, WatermarkHoldState> firstHoldAddress =
+ StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+ WatermarkHoldState firstHold =
underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
firstHold.add(new Instant(22L));
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
- StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
- StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
- WatermarkHoldState<BoundedWindow> secondHold =
+ StateTag<Object, WatermarkHoldState> secondHoldAddress =
+ StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+ WatermarkHoldState secondHold =
internals.state(StateNamespaces.window(null, second), secondHoldAddress);
secondHold.add(new Instant(244L));
@@ -583,18 +581,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
};
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
- StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
- StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
- WatermarkHoldState<BoundedWindow> firstHold =
+ StateTag<Object, WatermarkHoldState> firstHoldAddress =
+ StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+ WatermarkHoldState firstHold =
underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
firstHold.add(new Instant(224L));
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
- StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
- StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
- WatermarkHoldState<BoundedWindow> secondHold =
+ StateTag<Object, WatermarkHoldState> secondHoldAddress =
+ StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+ WatermarkHoldState secondHold =
internals.state(StateNamespaces.window(null, second), secondHoldAddress);
secondHold.add(new Instant(24L));
@@ -610,7 +608,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
internals
.state(
StateNamespaces.global(),
- StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()))
+ StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST))
.add(new Instant(1234L));
thrown.expect(IllegalStateException.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/f38e4271/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
index b904bfe..7ee2f69 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
@@ -29,8 +29,8 @@ import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
@@ -60,8 +60,8 @@ public class HashingFlinkCombineRunner<
@SuppressWarnings("unchecked")
- OutputTimeFn<? super BoundedWindow> outputTimeFn =
- (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+ TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
+ WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn();
// Flink Iterable can be iterated over only once.
List<WindowedValue<KV<K, InputT>>> inputs = new ArrayList<>();
@@ -87,14 +87,21 @@ public class HashingFlinkCombineRunner<
AccumT accumT = flinkCombiner.firstInput(key, currentValue.getValue().getValue(),
options, sideInputReader, singletonW);
Instant windowTimestamp =
- outputTimeFn.assignOutputTime(currentValue.getTimestamp(), mergedWindow);
+ timestampCombiner.assign(
+ mergedWindow, windowFn.getOutputTime(currentValue.getTimestamp(), mergedWindow));
accumAndInstant = new Tuple2<>(accumT, windowTimestamp);
mapState.put(mergedWindow, accumAndInstant);
} else {
accumAndInstant.f0 = flinkCombiner.addInput(key, accumAndInstant.f0,
currentValue.getValue().getValue(), options, sideInputReader, singletonW);
- accumAndInstant.f1 = outputTimeFn.combine(accumAndInstant.f1,
- outputTimeFn.assignOutputTime(currentValue.getTimestamp(), mergedWindow));
+ accumAndInstant.f1 =
+ timestampCombiner.combine(
+ accumAndInstant.f1,
+ timestampCombiner.assign(
+ mergedWindow,
+ windowingStrategy
+ .getWindowFn()
+ .getOutputTime(currentValue.getTimestamp(), mergedWindow)));
}
}
if (iterator.hasNext()) {