You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/27 03:32:55 UTC
[3/4] beam git commit: Revert "Replace OutputTimeFn UDF with
TimestampCombiner enum"
Revert "Replace OutputTimeFn UDF with TimestampCombiner enum"
This reverts commit f38e4271334fced94e8dc1dc97f47b60fa810586.
It will require a synchronous Dataflow worker container bump.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/83d41fcc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/83d41fcc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/83d41fcc
Branch: refs/heads/master
Commit: 83d41fcce0c7b123459e5d26ab9938de49f48dab
Parents: bb12a56
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Apr 26 19:33:55 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Apr 26 19:33:55 2017 -0700
----------------------------------------------------------------------
.../beam/examples/complete/game/GameStats.java | 4 +-
.../translation/utils/ApexStateInternals.java | 26 +-
.../translation/GroupByKeyTranslatorTest.java | 10 +-
.../utils/ApexStateInternalsTest.java | 33 +-
.../core/construction/WindowingStrategies.java | 52 +--
.../construction/WindowingStrategiesTest.java | 6 +-
.../runners/core/InMemoryStateInternals.java | 32 +-
.../beam/runners/core/ReduceFnRunner.java | 4 +-
.../beam/runners/core/SplittableParDo.java | 8 +-
.../apache/beam/runners/core/StateMerging.java | 32 +-
.../org/apache/beam/runners/core/StateTag.java | 11 +-
.../org/apache/beam/runners/core/StateTags.java | 16 +-
.../core/TestInMemoryStateInternals.java | 2 +-
.../apache/beam/runners/core/WatermarkHold.java | 45 ++-
.../core/GroupAlsoByWindowsProperties.java | 20 +-
.../core/InMemoryStateInternalsTest.java | 34 +-
.../beam/runners/core/ReduceFnRunnerTest.java | 38 +--
.../beam/runners/core/ReduceFnTester.java | 13 +-
.../apache/beam/runners/core/StateTagTest.java | 16 +-
.../CopyOnAccessInMemoryStateInternals.java | 24 +-
.../direct/ParDoMultiOverrideFactory.java | 6 +-
.../CopyOnAccessInMemoryStateInternalsTest.java | 54 ++--
.../functions/HashingFlinkCombineRunner.java | 19 +-
.../functions/SortingFlinkCombineRunner.java | 30 +-
.../state/FlinkBroadcastStateInternals.java | 8 +-
.../state/FlinkKeyGroupStateInternals.java | 8 +-
.../state/FlinkSplitStateInternals.java | 8 +-
.../streaming/state/FlinkStateInternals.java | 34 +-
.../streaming/FlinkStateInternalsTest.java | 34 +-
.../spark/stateful/SparkStateInternals.java | 33 +-
.../translation/SparkAbstractCombineFn.java | 4 +-
.../spark/translation/SparkGlobalCombineFn.java | 37 +--
.../spark/translation/SparkKeyedCombineFn.java | 37 +--
sdks/java/core/pom.xml | 5 +
.../beam/sdk/testing/WindowFnTestUtils.java | 53 +---
.../apache/beam/sdk/transforms/GroupByKey.java | 3 +-
.../sdk/transforms/windowing/OutputTimeFn.java | 314 +++++++++++++++++++
.../sdk/transforms/windowing/OutputTimeFns.java | 212 +++++++++++++
.../transforms/windowing/TimestampCombiner.java | 186 -----------
.../beam/sdk/transforms/windowing/Window.java | 22 +-
.../org/apache/beam/sdk/util/Reshuffle.java | 7 +-
.../apache/beam/sdk/util/WindowingStrategy.java | 176 ++++++++---
.../apache/beam/sdk/util/state/StateBinder.java | 12 +-
.../apache/beam/sdk/util/state/StateSpecs.java | 23 +-
.../beam/sdk/util/state/WatermarkHoldState.java | 19 +-
.../org/apache/beam/SdkCoreApiSurfaceTest.java | 1 +
.../beam/sdk/transforms/GroupByKeyTest.java | 10 +-
.../sdk/transforms/join/CoGroupByKeyTest.java | 6 +-
.../transforms/windowing/OutputTimeFnsTest.java | 51 +++
.../sdk/transforms/windowing/SessionsTest.java | 6 +-
.../sdk/transforms/windowing/WindowTest.java | 23 +-
.../sdk/transforms/windowing/WindowingTest.java | 2 +-
.../org/apache/beam/GcpCoreApiSurfaceTest.java | 1 +
53 files changed, 1130 insertions(+), 740 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index e0048b7..b6c05be 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -43,8 +43,8 @@ import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -313,7 +313,7 @@ public class GameStats extends LeaderBoard {
userEvents
.apply("WindowIntoSessions", Window.<KV<String, Integer>>into(
Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
- .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
+ .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
// For this use, we care only about the existence of the session, not any particular
// information aggregated over it, so the following is an efficient way to do that.
.apply(Combine.perKey(x -> 0))
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index ec8f666..cfc57cd 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -45,7 +45,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.BagState;
@@ -150,10 +150,10 @@ public class ApexStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
- TimestampCombiner timestampCombiner) {
- return new ApexWatermarkHoldState<>(namespace, address, timestampCombiner);
+ public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+ StateTag<? super K, WatermarkHoldState<W>> address,
+ OutputTimeFn<? super W> outputTimeFn) {
+ return new ApexWatermarkHoldState<>(namespace, address, outputTimeFn);
}
@Override
@@ -269,16 +269,16 @@ public class ApexStateInternals<K> implements StateInternals<K> {
}
private final class ApexWatermarkHoldState<W extends BoundedWindow>
- extends AbstractState<Instant> implements WatermarkHoldState {
+ extends AbstractState<Instant> implements WatermarkHoldState<W> {
- private final TimestampCombiner timestampCombiner;
+ private final OutputTimeFn<? super W> outputTimeFn;
public ApexWatermarkHoldState(
StateNamespace namespace,
- StateTag<?, WatermarkHoldState> address,
- TimestampCombiner timestampCombiner) {
+ StateTag<?, WatermarkHoldState<W>> address,
+ OutputTimeFn<? super W> outputTimeFn) {
super(namespace, address, InstantCoder.of());
- this.timestampCombiner = timestampCombiner;
+ this.outputTimeFn = outputTimeFn;
}
@Override
@@ -294,7 +294,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
@Override
public void add(Instant outputTime) {
Instant combined = read();
- combined = (combined == null) ? outputTime : timestampCombiner.combine(combined, outputTime);
+ combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime);
writeValue(combined);
}
@@ -313,8 +313,8 @@ public class ApexStateInternals<K> implements StateInternals<K> {
}
@Override
- public TimestampCombiner getTimestampCombiner() {
- return timestampCombiner;
+ public OutputTimeFn<? super W> getOutputTimeFn() {
+ return outputTimeFn;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
index 9c61b47..193de71 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
@@ -43,7 +43,7 @@ import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
@@ -83,12 +83,12 @@ public class GroupByKeyTranslatorTest {
);
p.apply(Read.from(new TestSource(data, new Instant(5000))))
- .apply(
- Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
- .withTimestampCombiner(TimestampCombiner.LATEST))
+ .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
+ .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()))
.apply(Count.<String>perElement())
.apply(ParDo.of(new KeyedByTimestamp<KV<String, Long>>()))
- .apply(ParDo.of(new EmbeddedCollector()));
+ .apply(ParDo.of(new EmbeddedCollector()))
+ ;
ApexRunnerResult result = (ApexRunnerResult) p.run();
result.getApexDAG();
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 225b654..7160e45 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -36,7 +36,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.GroupingState;
@@ -65,13 +65,14 @@ public class ApexStateInternalsTest {
"sumInteger", VarIntCoder.of(), Sum.ofIntegers());
private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
- private static final StateTag<Object, WatermarkHoldState>
+ private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
WATERMARK_EARLIEST_ADDR =
- StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
- StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
- StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
+ StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
+ private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+ WATERMARK_LATEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
+ private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
+ StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
private ApexStateInternals<String> underTest;
@@ -226,7 +227,7 @@ public class ApexStateInternalsTest {
@Test
public void testWatermarkEarliestState() throws Exception {
- WatermarkHoldState value =
+ WatermarkHoldState<BoundedWindow> value =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
// State instances are cached, but depend on the namespace.
@@ -250,7 +251,7 @@ public class ApexStateInternalsTest {
@Test
public void testWatermarkLatestState() throws Exception {
- WatermarkHoldState value =
+ WatermarkHoldState<BoundedWindow> value =
underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
// State instances are cached, but depend on the namespace.
@@ -274,7 +275,7 @@ public class ApexStateInternalsTest {
@Test
public void testWatermarkEndOfWindowState() throws Exception {
- WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+ WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
// State instances are cached, but depend on the namespace.
assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
@@ -291,7 +292,7 @@ public class ApexStateInternalsTest {
@Test
public void testWatermarkStateIsEmpty() throws Exception {
- WatermarkHoldState value =
+ WatermarkHoldState<BoundedWindow> value =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
assertThat(value.isEmpty().read(), Matchers.is(true));
@@ -305,9 +306,9 @@ public class ApexStateInternalsTest {
@Test
public void testMergeEarliestWatermarkIntoSource() throws Exception {
- WatermarkHoldState value1 =
+ WatermarkHoldState<BoundedWindow> value1 =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
- WatermarkHoldState value2 =
+ WatermarkHoldState<BoundedWindow> value2 =
underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
value1.add(new Instant(3000));
@@ -324,11 +325,11 @@ public class ApexStateInternalsTest {
@Test
public void testMergeLatestWatermarkIntoSource() throws Exception {
- WatermarkHoldState value1 =
+ WatermarkHoldState<BoundedWindow> value1 =
underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
- WatermarkHoldState value2 =
+ WatermarkHoldState<BoundedWindow> value2 =
underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
- WatermarkHoldState value3 =
+ WatermarkHoldState<BoundedWindow> value3 =
underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
value1.add(new Instant(3000));
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
index 0c400db..3d7deef 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
@@ -28,15 +28,16 @@ import java.io.Serializable;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.WindowingStrategy.CombineWindowFnOutputTimes;
import org.joda.time.Duration;
/** Utilities for working with {@link WindowingStrategy WindowingStrategies}. */
@@ -114,42 +115,11 @@ public class WindowingStrategies implements Serializable {
}
}
- public static RunnerApi.OutputTime toProto(TimestampCombiner timestampCombiner) {
- switch(timestampCombiner) {
- case EARLIEST:
- return OutputTime.EARLIEST_IN_PANE;
- case END_OF_WINDOW:
- return OutputTime.END_OF_WINDOW;
- case LATEST:
- return OutputTime.LATEST_IN_PANE;
- default:
- throw new IllegalArgumentException(
- String.format(
- "Unknown %s: %s",
- TimestampCombiner.class.getSimpleName(),
- timestampCombiner));
- }
- }
-
- public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime proto) {
- switch (proto) {
- case EARLIEST_IN_PANE:
- return TimestampCombiner.EARLIEST;
- case END_OF_WINDOW:
- return TimestampCombiner.END_OF_WINDOW;
- case LATEST_IN_PANE:
- return TimestampCombiner.LATEST;
- case UNRECOGNIZED:
- default:
- // Whether or not it is proto that cannot recognize it (due to the version of the
- // generated code we link to) or the switch hasn't been updated to handle it,
- // the situation is the same: we don't know what this OutputTime means
- throw new IllegalArgumentException(
- String.format(
- "Cannot convert unknown %s to %s: %s",
- RunnerApi.OutputTime.class.getCanonicalName(),
- OutputTime.class.getCanonicalName(),
- proto));
+ public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) {
+ if (outputTimeFn instanceof WindowingStrategy.CombineWindowFnOutputTimes) {
+ return toProto(((CombineWindowFnOutputTimes<?>) outputTimeFn).getOutputTimeFn());
+ } else {
+ return OutputTimeFns.toProto(outputTimeFn);
}
}
@@ -207,7 +177,7 @@ public class WindowingStrategies implements Serializable {
RunnerApi.WindowingStrategy.Builder windowingStrategyProto =
RunnerApi.WindowingStrategy.newBuilder()
- .setOutputTime(toProto(windowingStrategy.getTimestampCombiner()))
+ .setOutputTime(toProto(windowingStrategy.getOutputTimeFn()))
.setAccumulationMode(toProto(windowingStrategy.getMode()))
.setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
.setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
@@ -259,7 +229,7 @@ public class WindowingStrategies implements Serializable {
"WindowFn");
WindowFn<?, ?> windowFn = (WindowFn<?, ?>) deserializedWindowFn;
- TimestampCombiner timestampCombiner = timestampCombinerFromProto(proto.getOutputTime());
+ OutputTimeFn<?> outputTimeFn = OutputTimeFns.fromProto(proto.getOutputTime());
AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode());
Trigger trigger = Triggers.fromProto(proto.getTrigger());
ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
@@ -269,7 +239,7 @@ public class WindowingStrategies implements Serializable {
.withAllowedLateness(allowedLateness)
.withMode(accumulationMode)
.withTrigger(trigger)
- .withTimestampCombiner(timestampCombiner)
+ .withOutputTimeFn(outputTimeFn)
.withClosingBehavior(closingBehavior);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
index 78ac61c..62bba8e 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
@@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -68,14 +68,14 @@ public class WindowingStrategiesTest {
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withTrigger(REPRESENTATIVE_TRIGGER)
.withAllowedLateness(Duration.millis(71))
- .withTimestampCombiner(TimestampCombiner.EARLIEST)),
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())),
toProtoAndBackSpec(
WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
.withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)
.withMode(AccumulationMode.DISCARDING_FIRED_PANES)
.withTrigger(REPRESENTATIVE_TRIGGER)
.withAllowedLateness(Duration.millis(93))
- .withTimestampCombiner(TimestampCombiner.LATEST)));
+ .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp())));
}
@Parameter(0)
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
index 9fb8e3f..55b7fc2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -156,10 +156,10 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
- TimestampCombiner timestampCombiner) {
- return new InMemoryWatermarkHold<W>(timestampCombiner);
+ public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+ StateTag<? super K, WatermarkHoldState<W>> address,
+ OutputTimeFn<? super W> outputTimeFn) {
+ return new InMemoryWatermarkHold<W>(outputTimeFn);
}
@Override
@@ -233,19 +233,19 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
* An {@link InMemoryState} implementation of {@link WatermarkHoldState}.
*/
public static final class InMemoryWatermarkHold<W extends BoundedWindow>
- implements WatermarkHoldState, InMemoryState<InMemoryWatermarkHold<W>> {
+ implements WatermarkHoldState<W>, InMemoryState<InMemoryWatermarkHold<W>> {
- private final TimestampCombiner timestampCombiner;
+ private final OutputTimeFn<? super W> outputTimeFn;
@Nullable
private Instant combinedHold = null;
- public InMemoryWatermarkHold(TimestampCombiner timestampCombiner) {
- this.timestampCombiner = timestampCombiner;
+ public InMemoryWatermarkHold(OutputTimeFn<? super W> outputTimeFn) {
+ this.outputTimeFn = outputTimeFn;
}
@Override
- public InMemoryWatermarkHold readLater() {
+ public InMemoryWatermarkHold<W> readLater() {
return this;
}
@@ -263,10 +263,8 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
@Override
public void add(Instant outputTime) {
- combinedHold =
- combinedHold == null
- ? outputTime
- : timestampCombiner.combine(combinedHold, outputTime);
+ combinedHold = combinedHold == null ? outputTime
+ : outputTimeFn.combine(combinedHold, outputTime);
}
@Override
@@ -289,8 +287,8 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
}
@Override
- public TimestampCombiner getTimestampCombiner() {
- return timestampCombiner;
+ public OutputTimeFn<? super W> getOutputTimeFn() {
+ return outputTimeFn;
}
@Override
@@ -301,7 +299,7 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
@Override
public InMemoryWatermarkHold<W> copy() {
InMemoryWatermarkHold<W> that =
- new InMemoryWatermarkHold<>(timestampCombiner);
+ new InMemoryWatermarkHold<>(outputTimeFn);
that.combinedHold = this.combinedHold;
return that;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 4c70c97..34db752 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -47,9 +47,9 @@ import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SideInputReader;
@@ -171,7 +171,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
* <ul>
* <li>State: Bag of hold timestamps.
* <li>State style: RENAMED
- * <li>Merging: Depending on {@link TimestampCombiner}, may need to be recalculated on merging.
+ * <li>Merging: Depending on {@link OutputTimeFn}, may need to be recalculated on merging.
* When a pane fires it may be necessary to add (back) an end-of-window or garbage collection
* hold.
* <li>Lifetime: Cleared when a pane fires or when the window is garbage collected.
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 5273e86..31d89ee 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -45,7 +45,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -355,10 +355,10 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
* the input watermark when the first {@link DoFn.ProcessElement} call for this element
* completes.
*/
- private static final StateTag<Object, WatermarkHoldState> watermarkHoldTag =
+ private static final StateTag<Object, WatermarkHoldState<GlobalWindow>> watermarkHoldTag =
StateTags.makeSystemTagInternal(
StateTags.<GlobalWindow>watermarkStateInternal(
- "hold", TimestampCombiner.LATEST));
+ "hold", OutputTimeFns.outputAtLatestInputTimestamp()));
/**
* The state cell containing a copy of the element. Written during the first {@link
@@ -480,7 +480,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
stateInternals.state(stateNamespace, elementTag);
ValueState<RestrictionT> restrictionState =
stateInternals.state(stateNamespace, restrictionTag);
- WatermarkHoldState holdState =
+ WatermarkHoldState<GlobalWindow> holdState =
stateInternals.state(stateNamespace, watermarkHoldTag);
ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction;
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
index ce37fd3..3410850 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -218,24 +218,24 @@ public class StateMerging {
*/
public static <K, W extends BoundedWindow> void prefetchWatermarks(
MergingStateAccessor<K, W> context,
- StateTag<? super K, WatermarkHoldState> address) {
- Map<W, WatermarkHoldState> map = context.accessInEachMergingWindow(address);
- WatermarkHoldState result = context.access(address);
+ StateTag<? super K, WatermarkHoldState<W>> address) {
+ Map<W, WatermarkHoldState<W>> map = context.accessInEachMergingWindow(address);
+ WatermarkHoldState<W> result = context.access(address);
if (map.isEmpty()) {
// Nothing to prefetch.
return;
}
if (map.size() == 1 && map.values().contains(result)
- && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) {
+ && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) {
// Nothing to change.
return;
}
- if (result.getTimestampCombiner().dependsOnlyOnWindow()) {
+ if (result.getOutputTimeFn().dependsOnlyOnWindow()) {
// No need to read existing holds.
return;
}
// Prefetch.
- for (WatermarkHoldState source : map.values()) {
+ for (WatermarkHoldState<W> source : map.values()) {
prefetchRead(source);
}
}
@@ -250,7 +250,7 @@ public class StateMerging {
*/
public static <K, W extends BoundedWindow> void mergeWatermarks(
MergingStateAccessor<K, W> context,
- StateTag<? super K, WatermarkHoldState> address,
+ StateTag<? super K, WatermarkHoldState<W>> address,
W mergeResult) {
mergeWatermarks(
context.accessInEachMergingWindow(address).values(), context.access(address), mergeResult);
@@ -261,31 +261,31 @@ public class StateMerging {
* into {@code result}, where the final merge result window is {@code mergeResult}.
*/
public static <W extends BoundedWindow> void mergeWatermarks(
- Collection<WatermarkHoldState> sources, WatermarkHoldState result,
+ Collection<WatermarkHoldState<W>> sources, WatermarkHoldState<W> result,
W resultWindow) {
if (sources.isEmpty()) {
// Nothing to merge.
return;
}
if (sources.size() == 1 && sources.contains(result)
- && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) {
+ && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) {
// Nothing to merge.
return;
}
- if (result.getTimestampCombiner().dependsOnlyOnWindow()) {
+ if (result.getOutputTimeFn().dependsOnlyOnWindow()) {
// Clear sources.
- for (WatermarkHoldState source : sources) {
+ for (WatermarkHoldState<W> source : sources) {
source.clear();
}
// Update directly from window-derived hold.
- Instant hold =
- result.getTimestampCombiner().assign(resultWindow, BoundedWindow.TIMESTAMP_MIN_VALUE);
+ Instant hold = result.getOutputTimeFn().assignOutputTime(
+ BoundedWindow.TIMESTAMP_MIN_VALUE, resultWindow);
checkState(hold.isAfter(BoundedWindow.TIMESTAMP_MIN_VALUE));
result.add(hold);
} else {
// Prefetch.
List<ReadableState<Instant>> futures = new ArrayList<>(sources.size());
- for (WatermarkHoldState source : sources) {
+ for (WatermarkHoldState<W> source : sources) {
futures.add(source);
}
// Read.
@@ -297,12 +297,12 @@ public class StateMerging {
}
}
// Clear sources.
- for (WatermarkHoldState source : sources) {
+ for (WatermarkHoldState<W> source : sources) {
source.clear();
}
if (!outputTimesToMerge.isEmpty()) {
// Merge and update.
- result.add(result.getTimestampCombiner().merge(resultWindow, outputTimesToMerge));
+ result.add(result.getOutputTimeFn().merge(resultWindow, outputTimesToMerge));
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
index a5d262a..12c59ad 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
@@ -115,10 +115,11 @@ public interface StateTag<K, StateT extends State> extends Serializable {
/**
* Bind to a watermark {@link StateSpec}.
*
- * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps
- * added to the returned {@link WatermarkHoldState} are to be combined.
+ * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to
+ * the returned {@link WatermarkHoldState} are to be combined.
*/
- <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> spec, TimestampCombiner timestampCombiner);
+ <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+ StateTag<? super K, WatermarkHoldState<W>> spec,
+ OutputTimeFn<? super W> outputTimeFn);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index 2b3f4b8..3a45569 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
@@ -110,11 +110,11 @@ public class StateTags {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
String id,
- StateSpec<? super K, WatermarkHoldState> spec,
- TimestampCombiner timestampCombiner) {
- return binder.bindWatermark(tagForSpec(id, spec), timestampCombiner);
+ StateSpec<? super K, WatermarkHoldState<W>> spec,
+ OutputTimeFn<? super W> outputTimeFn) {
+ return binder.bindWatermark(tagForSpec(id, spec), outputTimeFn);
}
};
}
@@ -228,10 +228,10 @@ public class StateTags {
/**
* Create a state tag for holding the watermark.
*/
- public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState>
- watermarkStateInternal(String id, TimestampCombiner timestampCombiner) {
+ public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState<W>>
+ watermarkStateInternal(String id, OutputTimeFn<? super W> outputTimeFn) {
return new SimpleStateTag<>(
- new StructuredId(id), StateSpecs.watermarkStateInternal(timestampCombiner));
+ new StructuredId(id), StateSpecs.watermarkStateInternal(outputTimeFn));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
index 1dfb85f..0321a33 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
@@ -52,7 +52,7 @@ public class TestInMemoryStateInternals<K> extends InMemoryStateInternals<K> {
Instant minimum = null;
for (State storage : inMemoryState.values()) {
if (storage instanceof WatermarkHoldState) {
- Instant hold = ((WatermarkHoldState) storage).read();
+ Instant hold = ((WatermarkHoldState<?>) storage).read();
if (minimum == null || (hold != null && hold.isBefore(minimum))) {
minimum = hold;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 9bb9c62..d3c4bc7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -23,8 +23,9 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -54,38 +55,37 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
* used for elements.
*/
public static <W extends BoundedWindow>
- StateTag<Object, WatermarkHoldState> watermarkHoldTagForTimestampCombiner(
- TimestampCombiner timestampCombiner) {
- return StateTags.<Object, WatermarkHoldState>makeSystemTagInternal(
- StateTags.<W>watermarkStateInternal("hold", timestampCombiner));
+ StateTag<Object, WatermarkHoldState<W>> watermarkHoldTagForOutputTimeFn(
+ OutputTimeFn<? super W> outputTimeFn) {
+ return StateTags.<Object, WatermarkHoldState<W>>makeSystemTagInternal(
+ StateTags.<W>watermarkStateInternal("hold", outputTimeFn));
}
/**
* Tag for state containing end-of-window and garbage collection output watermark holds.
- * (We can't piggy-back on the data hold state since the timestampCombiner may be
- * {@link TimestampCombiner#EARLIEST}, in which case every pane will
+ * (We can't piggy-back on the data hold state since the outputTimeFn may be
+ * {@link OutputTimeFns#outputAtLatestInputTimestamp()}, in which case every pane will
* would take the end-of-window time as its element time.)
*/
@VisibleForTesting
- public static final StateTag<Object, WatermarkHoldState> EXTRA_HOLD_TAG =
+ public static final StateTag<Object, WatermarkHoldState<BoundedWindow>> EXTRA_HOLD_TAG =
StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal(
- "extra", TimestampCombiner.EARLIEST));
+ "extra", OutputTimeFns.outputAtEarliestInputTimestamp()));
private final TimerInternals timerInternals;
private final WindowingStrategy<?, W> windowingStrategy;
- private final StateTag<Object, WatermarkHoldState> elementHoldTag;
+ private final StateTag<Object, WatermarkHoldState<W>> elementHoldTag;
public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) {
this.timerInternals = timerInternals;
this.windowingStrategy = windowingStrategy;
- this.elementHoldTag =
- watermarkHoldTagForTimestampCombiner(windowingStrategy.getTimestampCombiner());
+ this.elementHoldTag = watermarkHoldTagForOutputTimeFn(windowingStrategy.getOutputTimeFn());
}
/**
* Add a hold to prevent the output watermark progressing beyond the (possibly adjusted) timestamp
- * of the element in {@code context}. We allow the actual hold time to be shifted later by the
- * {@link TimestampCombiner}, but no further than the end of the window. The hold will
+ * of the element in {@code context}. We allow the actual hold time to be shifted later by
+ * {@link OutputTimeFn#assignOutputTime}, but no further than the end of the window. The hold will
* remain until cleared by {@link #extractAndRelease}. Return the timestamp at which the hold
* was placed, or {@literal null} if no hold was placed.
*
@@ -199,18 +199,15 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
* strategy's output time function.
*/
private Instant shift(Instant timestamp, W window) {
- Instant shifted =
- windowingStrategy
- .getTimestampCombiner()
- .assign(window, windowingStrategy.getWindowFn().getOutputTime(timestamp, window));
+ Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window);
checkState(!shifted.isBefore(timestamp),
- "TimestampCombiner moved element from %s to earlier time %s for window %s",
+ "OutputTimeFn moved element from %s to earlier time %s for window %s",
BoundedWindow.formatTimestamp(timestamp),
BoundedWindow.formatTimestamp(shifted),
window);
checkState(timestamp.isAfter(window.maxTimestamp())
|| !shifted.isAfter(window.maxTimestamp()),
- "TimestampCombiner moved element from %s to %s which is beyond end of "
+ "OutputTimeFn moved element from %s to %s which is beyond end of "
+ "window %s",
timestamp, shifted, window);
@@ -220,7 +217,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
/**
* Attempt to add an 'element hold'. Return the {@link Instant} at which the hold was
* added (ie the element timestamp plus any forward shift requested by the
- * {@link WindowingStrategy#getTimestampCombiner}), or {@literal null} if no hold was added.
+ * {@link WindowingStrategy#getOutputTimeFn}), or {@literal null} if no hold was added.
* The hold is only added if both:
* <ol>
* <li>The backend will be able to respect it. In other words the output watermark cannot
@@ -453,7 +450,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
* Return (a future for) the earliest hold for {@code context}. Clear all the holds after
* reading, but add/restore an end-of-window or garbage collection hold if required.
*
- * <p>The returned timestamp is the output timestamp according to the {@link TimestampCombiner}
+ * <p>The returned timestamp is the output timestamp according to the {@link OutputTimeFn}
* from the windowing strategy of this {@link WatermarkHold}, combined across all the non-late
* elements in the current pane. If there is no such value the timestamp is the end
* of the window.
@@ -465,8 +462,8 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
+ "outputWatermark:{}",
context.key(), context.window(), timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
- final WatermarkHoldState elementHoldState = context.state().access(elementHoldTag);
- final WatermarkHoldState extraHoldState = context.state().access(EXTRA_HOLD_TAG);
+ final WatermarkHoldState<W> elementHoldState = context.state().access(elementHoldTag);
+ final WatermarkHoldState<BoundedWindow> extraHoldState = context.state().access(EXTRA_HOLD_TAG);
return new ReadableState<OldAndNewHolds>() {
@Override
public ReadableState<OldAndNewHolds> readLater() {
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index 81ac5fa..d0a8923 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -43,10 +43,10 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
@@ -149,7 +149,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
- .withTimestampCombiner(TimestampCombiner.EARLIEST);
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
List<WindowedValue<KV<String, Iterable<String>>>> result =
runGABW(
@@ -200,7 +200,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
- .withTimestampCombiner(TimestampCombiner.EARLIEST);
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
List<WindowedValue<KV<String, Long>>> result =
runGABW(
@@ -348,7 +348,7 @@ public class GroupAlsoByWindowsProperties {
/**
* Tests that for a simple sequence of elements on the same key, the given GABW implementation
* correctly groups them according to fixed windows and also sets the output timestamp according
- * to the policy {@link TimestampCombiner#END_OF_WINDOW}.
+ * to the policy {@link OutputTimeFns#outputAtEndOfWindow()}.
*/
public static void groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp(
GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
@@ -356,7 +356,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
- .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW);
+ .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
List<WindowedValue<KV<String, Iterable<String>>>> result =
runGABW(
@@ -386,7 +386,7 @@ public class GroupAlsoByWindowsProperties {
/**
* Tests that for a simple sequence of elements on the same key, the given GABW implementation
* correctly groups them according to fixed windows and also sets the output timestamp according
- * to the policy {@link TimestampCombiner#LATEST}.
+ * to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}.
*/
public static void groupsElementsIntoFixedWindowsWithLatestTimestamp(
GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
@@ -394,7 +394,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
- .withTimestampCombiner(TimestampCombiner.LATEST);
+ .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
List<WindowedValue<KV<String, Iterable<String>>>> result =
runGABW(
@@ -431,7 +431,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
- .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW);
+ .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
List<WindowedValue<KV<String, Iterable<String>>>> result =
runGABW(
@@ -468,7 +468,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
- .withTimestampCombiner(TimestampCombiner.LATEST);
+ .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
BoundedWindow unmergedWindow = window(15, 25);
List<WindowedValue<KV<String, Iterable<String>>>> result =
@@ -508,7 +508,7 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
- .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW);
+ .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
BoundedWindow secondWindow = window(15, 25);
List<WindowedValue<KV<String, Long>>> result =
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index 6248401..34ddae6 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.GroupingState;
@@ -71,12 +71,14 @@ public class InMemoryStateInternalsTest {
StateTags.set("stringSet", StringUtf8Coder.of());
private static final StateTag<Object, MapState<String, Integer>> STRING_MAP_ADDR =
StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of());
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
- StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
- StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
- StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
+ private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+ WATERMARK_EARLIEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
+ private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+ WATERMARK_LATEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
+ private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
+ StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
InMemoryStateInternals<String> underTest = InMemoryStateInternals.forKey("dummyKey");
@@ -440,7 +442,7 @@ public class InMemoryStateInternalsTest {
@Test
public void testWatermarkEarliestState() throws Exception {
- WatermarkHoldState value =
+ WatermarkHoldState<BoundedWindow> value =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
// State instances are cached, but depend on the namespace.
@@ -464,7 +466,7 @@ public class InMemoryStateInternalsTest {
@Test
public void testWatermarkLatestState() throws Exception {
- WatermarkHoldState value =
+ WatermarkHoldState<BoundedWindow> value =
underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
// State instances are cached, but depend on the namespace.
@@ -488,7 +490,7 @@ public class InMemoryStateInternalsTest {
@Test
public void testWatermarkEndOfWindowState() throws Exception {
- WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+ WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
// State instances are cached, but depend on the namespace.
assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
@@ -505,7 +507,7 @@ public class InMemoryStateInternalsTest {
@Test
public void testWatermarkStateIsEmpty() throws Exception {
- WatermarkHoldState value =
+ WatermarkHoldState<BoundedWindow> value =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
assertThat(value.isEmpty().read(), Matchers.is(true));
@@ -519,9 +521,9 @@ public class InMemoryStateInternalsTest {
@Test
public void testMergeEarliestWatermarkIntoSource() throws Exception {
- WatermarkHoldState value1 =
+ WatermarkHoldState<BoundedWindow> value1 =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
- WatermarkHoldState value2 =
+ WatermarkHoldState<BoundedWindow> value2 =
underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
value1.add(new Instant(3000));
@@ -538,11 +540,11 @@ public class InMemoryStateInternalsTest {
@Test
public void testMergeLatestWatermarkIntoSource() throws Exception {
- WatermarkHoldState value1 =
+ WatermarkHoldState<BoundedWindow> value1 =
underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
- WatermarkHoldState value2 =
+ WatermarkHoldState<BoundedWindow> value2 =
underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
- WatermarkHoldState value3 =
+ WatermarkHoldState<BoundedWindow> value3 =
underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
value1.add(new Instant(3000));
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 44bc538..0d4d992 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -56,12 +56,12 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -210,7 +210,7 @@ public class ReduceFnRunnerTest {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
- .withTimestampCombiner(TimestampCombiner.EARLIEST)
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withMode(AccumulationMode.DISCARDING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100));
@@ -284,7 +284,7 @@ public class ReduceFnRunnerTest {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) windowFn)
- .withTimestampCombiner(TimestampCombiner.EARLIEST)
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withTrigger(AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()))
.withMode(AccumulationMode.DISCARDING_FIRED_PANES)
.withAllowedLateness(allowedLateness);
@@ -315,7 +315,7 @@ public class ReduceFnRunnerTest {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
- .withTimestampCombiner(TimestampCombiner.EARLIEST)
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100));
@@ -615,7 +615,7 @@ public class ReduceFnRunnerTest {
AfterWatermark.pastEndOfWindow())))
.withMode(AccumulationMode.DISCARDING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
- .withTimestampCombiner(TimestampCombiner.EARLIEST)
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
tester.advanceInputWatermark(new Instant(0));
@@ -668,7 +668,7 @@ public class ReduceFnRunnerTest {
AfterWatermark.pastEndOfWindow())))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
- .withTimestampCombiner(TimestampCombiner.EARLIEST)
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY));
tester.advanceInputWatermark(new Instant(0));
@@ -695,7 +695,7 @@ public class ReduceFnRunnerTest {
AfterWatermark.pastEndOfWindow())))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
- .withTimestampCombiner(TimestampCombiner.EARLIEST)
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
tester.advanceInputWatermark(new Instant(0));
@@ -724,7 +724,7 @@ public class ReduceFnRunnerTest {
AfterWatermark.pastEndOfWindow())))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
- .withTimestampCombiner(TimestampCombiner.EARLIEST)
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
tester.advanceInputWatermark(new Instant(0));
@@ -1195,7 +1195,7 @@ public class ReduceFnRunnerTest {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
- .withTimestampCombiner(TimestampCombiner.EARLIEST)
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withTrigger(
AfterEach.<IntervalWindow>inOrder(
Repeatedly.forever(
@@ -1251,16 +1251,16 @@ public class ReduceFnRunnerTest {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
- .withTimestampCombiner(TimestampCombiner.EARLIEST)
- .withTrigger(
- AfterEach.<IntervalWindow>inOrder(
- Repeatedly.forever(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(new Duration(5)))
- .orFinally(AfterWatermark.pastEndOfWindow()),
- Repeatedly.forever(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(new Duration(25)))))
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTrigger(AfterEach.<IntervalWindow>inOrder(
+ Repeatedly
+ .forever(
+ AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
+ new Duration(5)))
+ .orFinally(AfterWatermark.pastEndOfWindow()),
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
+ new Duration(25)))))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100));
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index b5b5492..549fd8a 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -58,8 +58,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -161,7 +161,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
throws Exception {
WindowingStrategy<?, W> strategy =
WindowingStrategy.of(windowFn)
- .withTimestampCombiner(TimestampCombiner.EARLIEST)
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withMode(mode)
.withAllowedLateness(allowedDataLateness)
.withClosingBehavior(closingBehavior);
@@ -329,10 +329,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
assertHasOnlyGlobalAndAllowedTags(
ImmutableSet.copyOf(expectedWindows),
ImmutableSet.<StateTag<? super String, ?>>of(
- TriggerStateMachineRunner.FINISHED_BITS_TAG,
- PaneInfoTracker.PANE_INFO_TAG,
- WatermarkHold.watermarkHoldTagForTimestampCombiner(
- objectStrategy.getTimestampCombiner()),
+ TriggerStateMachineRunner.FINISHED_BITS_TAG, PaneInfoTracker.PANE_INFO_TAG,
+ WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()),
WatermarkHold.EXTRA_HOLD_TAG));
}
@@ -347,8 +345,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
ImmutableSet.copyOf(expectedWindows),
ImmutableSet.<StateTag<? super String, ?>>of(
PaneInfoTracker.PANE_INFO_TAG,
- WatermarkHold.watermarkHoldTagForTimestampCombiner(
- objectStrategy.getTimestampCombiner()),
+ WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()),
WatermarkHold.EXTRA_HOLD_TAG));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
index 10dcb62..5f5d92d 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -97,11 +97,15 @@ public class StateTagTest {
@Test
public void testWatermarkBagEquality() {
- StateTag<?, ?> foo1 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
- StateTag<?, ?> foo2 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
- StateTag<?, ?> bar = StateTags.watermarkStateInternal("bar", TimestampCombiner.EARLIEST);
-
- StateTag<?, ?> bar2 = StateTags.watermarkStateInternal("bar", TimestampCombiner.LATEST);
+ StateTag<?, ?> foo1 = StateTags.watermarkStateInternal(
+ "foo", OutputTimeFns.outputAtEarliestInputTimestamp());
+ StateTag<?, ?> foo2 = StateTags.watermarkStateInternal(
+ "foo", OutputTimeFns.outputAtEarliestInputTimestamp());
+ StateTag<?, ?> bar = StateTags.watermarkStateInternal(
+ "bar", OutputTimeFns.outputAtEarliestInputTimestamp());
+
+ StateTag<?, ?> bar2 = StateTags.watermarkStateInternal(
+ "bar", OutputTimeFns.outputAtLatestInputTimestamp());
// Same id, same fn.
assertEquals(foo1, foo2);
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
index 068b37f..0665812 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
@@ -43,7 +43,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -213,7 +213,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
Instant earliest = BoundedWindow.TIMESTAMP_MAX_VALUE;
for (State existingState : this.values()) {
if (existingState instanceof WatermarkHoldState) {
- Instant hold = ((WatermarkHoldState) existingState).read();
+ Instant hold = ((WatermarkHoldState<?>) existingState).read();
if (hold != null && hold.isBefore(earliest)) {
earliest = hold;
}
@@ -276,18 +276,18 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
return new StateBinder<K>() {
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
- TimestampCombiner timestampCombiner) {
+ public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+ StateTag<? super K, WatermarkHoldState<W>> address,
+ OutputTimeFn<? super W> outputTimeFn) {
if (containedInUnderlying(namespace, address)) {
@SuppressWarnings("unchecked")
- InMemoryState<? extends WatermarkHoldState> existingState =
- (InMemoryState<? extends WatermarkHoldState>)
+ InMemoryState<? extends WatermarkHoldState<W>> existingState =
+ (InMemoryState<? extends WatermarkHoldState<W>>)
underlying.get().get(namespace, address, c);
return existingState.copy();
} else {
return new InMemoryWatermarkHold<>(
- timestampCombiner);
+ outputTimeFn);
}
}
@@ -419,7 +419,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
State state =
readTo.get(namespace, existingState.getKey(), StateContexts.nullContext());
if (state instanceof WatermarkHoldState) {
- Instant hold = ((WatermarkHoldState) state).read();
+ Instant hold = ((WatermarkHoldState<?>) state).read();
if (hold != null && hold.isBefore(earliestHold)) {
earliestHold = hold;
}
@@ -434,9 +434,9 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
return new StateBinder<K>() {
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
- TimestampCombiner timestampCombiner) {
+ public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+ StateTag<? super K, WatermarkHoldState<W>> address,
+ OutputTimeFn<? super W> outputTimeFn) {
return underlying.get(namespace, address, c);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 322c995..b08aa8e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -40,8 +40,8 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -135,14 +135,14 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
// to alter the flow of data. This entails:
// - trigger as fast as possible
// - maintain the full timestamps of elements
- // - ensure this GBK holds to the minimum of those timestamps (via TimestampCombiner)
+ // - ensure this GBK holds to the minimum of those timestamps (via OutputTimeFn)
// - discard past panes as it is "just a stream" of elements
.apply(
Window.<KV<K, WindowedValue<KV<K, InputT>>>>configure()
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.discardingFiredPanes()
.withAllowedLateness(inputWindowingStrategy.getAllowedLateness())
- .withTimestampCombiner(TimestampCombiner.EARLIEST))
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
// A full GBK to group by key _and_ window
.apply("Group by key", GroupByKey.<K, WindowedValue<KV<K, InputT>>>create())
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index f0aeece..68c6613 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -43,7 +43,8 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.GroupingState;
@@ -288,12 +289,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- TimestampCombiner timestampCombiner = TimestampCombiner.EARLIEST;
+ OutputTimeFn<BoundedWindow> outputTimeFn =
+ OutputTimeFns.outputAtEarliestInputTimestamp();
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, WatermarkHoldState> stateTag =
- StateTags.watermarkStateInternal("wmstate", timestampCombiner);
- WatermarkHoldState underlyingValue = underlying.state(namespace, stateTag);
+ StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag =
+ StateTags.watermarkStateInternal("wmstate", outputTimeFn);
+ WatermarkHoldState<?> underlyingValue = underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), nullValue());
underlyingValue.add(new Instant(250L));
@@ -301,7 +303,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
- WatermarkHoldState copyOnAccessState = internals.state(namespace, stateTag);
+ WatermarkHoldState<BoundedWindow> copyOnAccessState = internals.state(namespace, stateTag);
assertThat(copyOnAccessState.read(), equalTo(new Instant(250L)));
copyOnAccessState.add(new Instant(100L));
@@ -311,7 +313,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
copyOnAccessState.add(new Instant(500L));
assertThat(copyOnAccessState.read(), equalTo(new Instant(100L)));
- WatermarkHoldState reReadUnderlyingValue =
+ WatermarkHoldState<BoundedWindow> reReadUnderlyingValue =
underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
}
@@ -512,15 +514,15 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
- StateTag<Object, WatermarkHoldState> firstHoldAddress =
- StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
- WatermarkHoldState firstHold =
+ StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
+ StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
+ WatermarkHoldState<BoundedWindow> firstHold =
internals.state(StateNamespaces.window(null, first), firstHoldAddress);
firstHold.add(new Instant(22L));
- StateTag<Object, WatermarkHoldState> secondHoldAddress =
- StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
- WatermarkHoldState secondHold =
+ StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
+ StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
+ WatermarkHoldState<BoundedWindow> secondHold =
internals.state(StateNamespaces.window(null, second), secondHoldAddress);
secondHold.add(new Instant(2L));
@@ -544,18 +546,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
};
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
- StateTag<Object, WatermarkHoldState> firstHoldAddress =
- StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
- WatermarkHoldState firstHold =
+ StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
+ StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
+ WatermarkHoldState<BoundedWindow> firstHold =
underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
firstHold.add(new Instant(22L));
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
- StateTag<Object, WatermarkHoldState> secondHoldAddress =
- StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
- WatermarkHoldState secondHold =
+ StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
+ StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
+ WatermarkHoldState<BoundedWindow> secondHold =
internals.state(StateNamespaces.window(null, second), secondHoldAddress);
secondHold.add(new Instant(244L));
@@ -581,18 +583,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
};
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
- StateTag<Object, WatermarkHoldState> firstHoldAddress =
- StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
- WatermarkHoldState firstHold =
+ StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
+ StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
+ WatermarkHoldState<BoundedWindow> firstHold =
underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
firstHold.add(new Instant(224L));
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
- StateTag<Object, WatermarkHoldState> secondHoldAddress =
- StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
- WatermarkHoldState secondHold =
+ StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
+ StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
+ WatermarkHoldState<BoundedWindow> secondHold =
internals.state(StateNamespaces.window(null, second), secondHoldAddress);
secondHold.add(new Instant(24L));
@@ -608,7 +610,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
internals
.state(
StateNamespaces.global(),
- StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST))
+ StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()))
.add(new Instant(1234L));
thrown.expect(IllegalStateException.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
index 7ee2f69..b904bfe 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
@@ -29,8 +29,8 @@ import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
@@ -60,8 +60,8 @@ public class HashingFlinkCombineRunner<
@SuppressWarnings("unchecked")
- TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
- WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn();
+ OutputTimeFn<? super BoundedWindow> outputTimeFn =
+ (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
// Flink Iterable can be iterated over only once.
List<WindowedValue<KV<K, InputT>>> inputs = new ArrayList<>();
@@ -87,21 +87,14 @@ public class HashingFlinkCombineRunner<
AccumT accumT = flinkCombiner.firstInput(key, currentValue.getValue().getValue(),
options, sideInputReader, singletonW);
Instant windowTimestamp =
- timestampCombiner.assign(
- mergedWindow, windowFn.getOutputTime(currentValue.getTimestamp(), mergedWindow));
+ outputTimeFn.assignOutputTime(currentValue.getTimestamp(), mergedWindow);
accumAndInstant = new Tuple2<>(accumT, windowTimestamp);
mapState.put(mergedWindow, accumAndInstant);
} else {
accumAndInstant.f0 = flinkCombiner.addInput(key, accumAndInstant.f0,
currentValue.getValue().getValue(), options, sideInputReader, singletonW);
- accumAndInstant.f1 =
- timestampCombiner.combine(
- accumAndInstant.f1,
- timestampCombiner.assign(
- mergedWindow,
- windowingStrategy
- .getWindowFn()
- .getOutputTime(currentValue.getTimestamp(), mergedWindow)));
+ accumAndInstant.f1 = outputTimeFn.combine(accumAndInstant.f1,
+ outputTimeFn.assignOutputTime(currentValue.getTimestamp(), mergedWindow));
}
}
if (iterator.hasNext()) {