You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2020/02/09 07:55:39 UTC
[beam] branch master updated: Merge pull request #10627:[BEAM-2535]
Support outputTimestamp and watermark holds in processing timers.
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a005fd7 Merge pull request #10627:[BEAM-2535] Support outputTimestamp and watermark holds in processing timers.
a005fd7 is described below
commit a005fd765a762183ca88df90f261f6d4a20cf3e0
Author: Rehman <re...@gmail.com>
AuthorDate: Sun Feb 9 12:55:27 2020 +0500
Merge pull request #10627:[BEAM-2535] Support outputTimestamp and watermark holds in processing timers.
---
.../apache/beam/runners/core/SimpleDoFnRunner.java | 80 +++++++++++++-----
.../beam/runners/direct/WatermarkManager.java | 47 +++++++++--
.../beam/runners/direct/WatermarkManagerTest.java | 5 +-
.../org/apache/beam/sdk/transforms/ParDoTest.java | 97 +++++++++++++++++++++-
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 1 -
5 files changed, 199 insertions(+), 31 deletions(-)
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index aaae986..98d8e04 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -762,7 +762,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
try {
TimerSpec spec = (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn);
return new TimerInternalsTimer(
- window(), getNamespace(), timerId, spec, stepContext.timerInternals());
+ window(), getNamespace(), timerId, spec, timestamp(), stepContext.timerInternals());
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
@@ -774,7 +774,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
TimerSpec spec =
(TimerSpec) signature.timerFamilyDeclarations().get(timerFamilyId).field().get(fn);
return new TimerInternalsTimerMap(
- timerFamilyId, window(), getNamespace(), spec, stepContext.timerInternals());
+ timerFamilyId,
+ window(),
+ getNamespace(),
+ spec,
+ timestamp(),
+ stepContext.timerInternals());
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
@@ -949,7 +954,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
try {
TimerSpec spec = (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn);
return new TimerInternalsTimer(
- window, getNamespace(), timerId, spec, stepContext.timerInternals());
+ window, getNamespace(), timerId, spec, timestamp(), stepContext.timerInternals());
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
@@ -961,7 +966,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
TimerSpec spec =
(TimerSpec) signature.timerFamilyDeclarations().get(timerFamilyId).field().get(fn);
return new TimerInternalsTimerMap(
- timerFamilyId, window(), getNamespace(), spec, stepContext.timerInternals());
+ timerFamilyId,
+ window(),
+ getNamespace(),
+ spec,
+ timestamp(),
+ stepContext.timerInternals());
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
@@ -1006,6 +1016,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
private final TimerSpec spec;
private Instant target;
private Instant outputTimestamp;
+ private final Instant elementInputTimestamp;
private Duration period = Duration.ZERO;
private Duration offset = Duration.ZERO;
@@ -1014,12 +1025,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
StateNamespace namespace,
String timerId,
TimerSpec spec,
+ Instant elementInputTimestamp,
TimerInternals timerInternals) {
this.window = window;
this.namespace = namespace;
this.timerId = timerId;
this.timerFamilyId = "";
this.spec = spec;
+ this.elementInputTimestamp = elementInputTimestamp;
this.timerInternals = timerInternals;
}
@@ -1029,12 +1042,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
String timerId,
String timerFamilyId,
TimerSpec spec,
+ Instant elementInputTimestamp,
TimerInternals timerInternals) {
this.window = window;
this.namespace = namespace;
this.timerId = timerId;
this.timerFamilyId = timerFamilyId;
this.spec = spec;
+ this.elementInputTimestamp = elementInputTimestamp;
this.timerInternals = timerInternals;
}
@@ -1111,24 +1126,35 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
* </ul>
*/
private void setAndVerifyOutputTimestamp() {
- // Output timestamp is currently not supported in processing time timers.
- if (outputTimestamp != null && !TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
- throw new IllegalStateException("Cannot set outputTimestamp in processing time domain.");
+
+ if (outputTimestamp != null) {
+ checkArgument(
+ !outputTimestamp.isBefore(elementInputTimestamp),
+ "output timestamp %s should be after input message timestamp or output timestamp of firing timers %s",
+ outputTimestamp,
+ elementInputTimestamp);
}
+
// Output timestamp is set to the delivery time if not initialized by an user.
- if (outputTimestamp == null) {
+ if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
outputTimestamp = target;
}
-
- if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
- Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
- checkArgument(
- !target.isAfter(windowExpiry),
- "Attempted to set event time timer that outputs for %s but that is"
- + " after the expiration of window %s",
- target,
- windowExpiry);
+ // For processing timers
+ if (outputTimestamp == null) {
+ // For processing timers output timestamp will be:
+ // 1) timestamp of input element
+ // OR
+ // 2) output timestamp of firing timer.
+ outputTimestamp = elementInputTimestamp;
}
+
+ Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
+ checkArgument(
+ !target.isAfter(windowExpiry),
+ "Attempted to set event time timer that outputs for %s but that is"
+ + " after the expiration of window %s",
+ target,
+ windowExpiry);
}
/**
@@ -1163,6 +1189,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
private final BoundedWindow window;
private final StateNamespace namespace;
private final TimerSpec spec;
+ private final Instant elementInputTimestamp;
private final String timerFamilyId;
public TimerInternalsTimerMap(
@@ -1170,10 +1197,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
BoundedWindow window,
StateNamespace namespace,
TimerSpec spec,
+ Instant elementInputTimestamp,
TimerInternals timerInternals) {
this.window = window;
this.namespace = namespace;
this.spec = spec;
+ this.elementInputTimestamp = elementInputTimestamp;
this.timerInternals = timerInternals;
this.timerFamilyId = timerFamilyId;
}
@@ -1181,7 +1210,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
@Override
public void set(String timerId, Instant absoluteTime) {
Timer timer =
- new TimerInternalsTimer(window, namespace, timerId, timerFamilyId, spec, timerInternals);
+ new TimerInternalsTimer(
+ window,
+ namespace,
+ timerId,
+ timerFamilyId,
+ spec,
+ elementInputTimestamp,
+ timerInternals);
timer.set(absoluteTime);
timers.put(timerId, timer);
}
@@ -1191,7 +1227,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
if (timers.get(timerId) == null) {
Timer timer =
new TimerInternalsTimer(
- window, namespace, timerId, timerFamilyId, spec, timerInternals);
+ window,
+ namespace,
+ timerId,
+ timerFamilyId,
+ spec,
+ elementInputTimestamp,
+ timerInternals);
timers.put(timerId, timer);
}
return timers.get(timerId);
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 265ebb1..35cfd23 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -327,10 +327,19 @@ public class WatermarkManager<ExecutableT, CollectionT> {
if (pendingTimers.isEmpty()) {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
} else {
- return pendingTimers.firstEntry().getElement().getOutputTimestamp();
+ return getMinimumOutputTimestamp(pendingTimers);
}
}
+ private Instant getMinimumOutputTimestamp(SortedMultiset<TimerData> timers) {
+ Instant minimumOutputTimestamp = timers.firstEntry().getElement().getOutputTimestamp();
+ for (TimerData timerData : timers) {
+ minimumOutputTimestamp =
+ INSTANT_ORDERING.min(timerData.getOutputTimestamp(), minimumOutputTimestamp);
+ }
+ return minimumOutputTimestamp;
+ }
+
@VisibleForTesting
synchronized void updateTimers(TimerUpdate update) {
NavigableSet<TimerData> keyTimers =
@@ -597,20 +606,29 @@ public class WatermarkManager<ExecutableT, CollectionT> {
Instant earliest = THE_END_OF_TIME.get();
for (NavigableSet<TimerData> timers : processingTimers.values()) {
if (!timers.isEmpty()) {
- earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest);
+ earliest = INSTANT_ORDERING.min(getMinimumOutputTimestamp(timers), earliest);
}
}
for (NavigableSet<TimerData> timers : synchronizedProcessingTimers.values()) {
if (!timers.isEmpty()) {
- earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest);
+ earliest = INSTANT_ORDERING.min(getMinimumOutputTimestamp(timers), earliest);
}
}
if (!pendingTimers.isEmpty()) {
- earliest = INSTANT_ORDERING.min(pendingTimers.first().getTimestamp(), earliest);
+ earliest = INSTANT_ORDERING.min(getMinimumOutputTimestamp(pendingTimers), earliest);
}
return earliest;
}
+ private Instant getMinimumOutputTimestamp(NavigableSet<TimerData> timers) {
+ Instant minimumOutputTimestamp = timers.first().getOutputTimestamp();
+ for (TimerData timerData : timers) {
+ minimumOutputTimestamp =
+ INSTANT_ORDERING.min(timerData.getOutputTimestamp(), minimumOutputTimestamp);
+ }
+ return minimumOutputTimestamp;
+ }
+
private synchronized void updateTimers(TimerUpdate update) {
Map<TimeDomain, NavigableSet<TimerData>> timerMap = timerMap(update.key);
Table<StateNamespace, String, TimerData> existingTimersForKey =
@@ -738,15 +756,25 @@ public class WatermarkManager<ExecutableT, CollectionT> {
private final String name;
private final SynchronizedProcessingTimeInputWatermark inputWm;
+ private final PerKeyHolds holds;
private AtomicReference<Instant> latestRefresh;
public SynchronizedProcessingTimeOutputWatermark(
String name, SynchronizedProcessingTimeInputWatermark inputWm) {
this.name = name;
this.inputWm = inputWm;
+ holds = new PerKeyHolds();
this.latestRefresh = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
}
+ public synchronized void updateHold(Object key, Instant newHold) {
+ if (newHold == null) {
+ holds.removeHold(key);
+ } else {
+ holds.updateHold(key, newHold);
+ }
+ }
+
@Override
public String getName() {
return name;
@@ -780,7 +808,8 @@ public class WatermarkManager<ExecutableT, CollectionT> {
// downstream timers to.
Instant oldRefresh = latestRefresh.get();
Instant newTimestamp =
- INSTANT_ORDERING.min(inputWm.get(), inputWm.getEarliestTimerTimestamp());
+ INSTANT_ORDERING.min(
+ inputWm.get(), holds.getMinHold(), inputWm.getEarliestTimerTimestamp());
latestRefresh.set(newTimestamp);
return updateAndTrace(getName(), oldRefresh, newTimestamp);
}
@@ -788,6 +817,7 @@ public class WatermarkManager<ExecutableT, CollectionT> {
@Override
public synchronized String toString() {
return MoreObjects.toStringHelper(SynchronizedProcessingTimeOutputWatermark.class)
+ .add("holds", holds)
.add("latestRefresh", latestRefresh)
.toString();
}
@@ -1133,6 +1163,9 @@ public class WatermarkManager<ExecutableT, CollectionT> {
TransformWatermarks transformWms = transformToWatermarks.get(executable);
transformWms.setEventTimeHold(
inputBundle == null ? null : inputBundle.getKey(), pending.getEarliestHold());
+
+ transformWms.setSynchronizedProcessingTimeHold(
+ inputBundle == null ? null : inputBundle.getKey(), pending.getEarliestHold());
}
/**
@@ -1438,6 +1471,10 @@ public class WatermarkManager<ExecutableT, CollectionT> {
outputWatermark.updateHold(key, newHold);
}
+ private void setSynchronizedProcessingTimeHold(Object key, Instant newHold) {
+ synchronizedProcessingOutputWatermark.updateHold(key, newHold);
+ }
+
private void removePending(Bundle<?, ?> bundle) {
inputWatermark.removePending(bundle);
synchronizedProcessingInputWatermark.removePending(bundle);
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 5e9cfc2..eef43c7 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -1052,10 +1052,9 @@ public class WatermarkManagerTest implements Serializable {
Collections.emptyList(),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
- assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
clock.set(new Instant(Long.MAX_VALUE));
- assertThat(filteredWms.getSynchronizedProcessingOutputTime(), equalTo(new Instant(4096)));
+
assertThat(
filteredDoubledWms.getSynchronizedProcessingOutputTime(),
not(greaterThan(new Instant(4096))));
@@ -1161,7 +1160,7 @@ public class WatermarkManagerTest implements Serializable {
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
- assertThat(downstreamWms.getSynchronizedProcessingInputTime(), not(lessThan(clock.now())));
+ assertThat(downstreamWms.getSynchronizedProcessingInputTime(), not(greaterThan(clock.now())));
}
@Test
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 65f7003..6bcb1fe 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -97,6 +97,7 @@ import org.apache.beam.sdk.testing.UsesSideInputsWithDifferentCoders;
import org.apache.beam.sdk.testing.UsesStatefulParDo;
import org.apache.beam.sdk.testing.UsesStrictTimerOrdering;
import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.UsesTestStreamWithOutputTimestamp;
import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime;
import org.apache.beam.sdk.testing.UsesTimerMap;
import org.apache.beam.sdk.testing.UsesTimersInParDo;
@@ -3936,8 +3937,12 @@ public class ParDoTest implements Serializable {
@ProcessElement
public void processElement(
- @TimerId(timerId) Timer timer, OutputReceiver<KV<String, Long>> o) {
- timer.withOutputTimestamp(new Instant(5)).set(new Instant(10));
+ @TimerId(timerId) Timer timer,
+ @Timestamp Instant timestamp,
+ OutputReceiver<KV<String, Long>> o) {
+ timer
+ .withOutputTimestamp(timestamp.plus(Duration.millis(5)))
+ .set(timestamp.plus(Duration.millis(10)));
// Output a message. This will cause the next DoFn to set a timer as well.
o.output(KV.of("foo", 100L));
}
@@ -3958,6 +3963,7 @@ public class ParDoTest implements Serializable {
@ProcessElement
public void processElement(
@TimerId(timerId) Timer timer,
+ @Timestamp Instant timestamp,
@StateId("timerFired") ValueState<Boolean> timerFiredState) {
Boolean timerFired = timerFiredState.read();
assertTrue(timerFired == null || !timerFired);
@@ -3966,7 +3972,7 @@ public class ParDoTest implements Serializable {
// DoFn timer's watermark hold. This timer should not fire until the previous timer
// fires and removes
// the watermark hold.
- timer.set(new Instant(8));
+ timer.set(timestamp.plus(Duration.millis(8)));
}
@OnTimer(timerId)
@@ -3996,6 +4002,91 @@ public class ParDoTest implements Serializable {
pipeline.run();
}
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ UsesTimersInParDo.class,
+ UsesTestStreamWithProcessingTime.class,
+ UsesTestStreamWithOutputTimestamp.class
+ })
+ public void testOutputTimestampWithProcessingTime() {
+ final String timerId = "foo";
+ DoFn<KV<String, Integer>, KV<String, Integer>> fn1 =
+ new DoFn<KV<String, Integer>, KV<String, Integer>>() {
+
+ @TimerId(timerId)
+ private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @ProcessElement
+ public void processElement(
+ @TimerId(timerId) Timer timer,
+ @Timestamp Instant timestamp,
+ OutputReceiver<KV<String, Integer>> o) {
+ timer
+ .withOutputTimestamp(timestamp.plus(Duration.standardSeconds(5)))
+ .offset(Duration.standardSeconds(10))
+ .setRelative();
+ // Output a message. This will cause the next DoFn to set a timer as well.
+ o.output(KV.of("foo", 100));
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(OnTimerContext c, BoundedWindow w) {}
+ };
+
+ DoFn<KV<String, Integer>, Integer> fn2 =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @StateId("timerFired")
+ final StateSpec<ValueState<Boolean>> timerFiredState = StateSpecs.value();
+
+ @ProcessElement
+ public void processElement(
+ @TimerId(timerId) Timer timer,
+ @StateId("timerFired") ValueState<Boolean> timerFiredState) {
+ Boolean timerFired = timerFiredState.read();
+ assertTrue(timerFired == null || !timerFired);
+ // Set a timer to 8. This is earlier than the previous DoFn's timer, but after the
+ // previous
+ // DoFn timer's watermark hold. This timer should not fire until the previous timer
+ // fires and removes
+ // the watermark hold.
+ timer.set(new Instant(8));
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(
+ @StateId("timerFired") ValueState<Boolean> timerFiredState,
+ OutputReceiver<Integer> o) {
+ timerFiredState.write(true);
+ o.output(100);
+ }
+ };
+
+ TestStream<KV<String, Integer>> stream =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+ .advanceProcessingTime(Duration.standardSeconds(1))
+ // Cause fn2 to set a timer.
+ .addElements(KV.of("key", 1))
+ // Normally this would cause fn2's timer to expire, but it shouldn't here because of
+ // the output timestamp.
+ .advanceProcessingTime(Duration.standardSeconds(9))
+ .advanceWatermarkTo(new Instant(11))
+ // If the timer fired, then this would cause fn2 to fail with an assertion error.
+ .addElements(KV.of("key", 1))
+ .advanceProcessingTime(Duration.standardSeconds(100))
+ .advanceWatermarkToInfinity();
+ PCollection<Integer> output =
+ pipeline.apply(stream).apply("first", ParDo.of(fn1)).apply("second", ParDo.of(fn2));
+
+ PAssert.that(output).containsInAnyOrder(100); // result output
+ pipeline.run();
+ }
+
private static class TwoTimerTest extends PTransform<PBegin, PDone> {
private static PTransform<PBegin, PDone> of(
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 70d0d0a..35e08a0 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -916,7 +916,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, OutputT> {
this.currentOutputTimestamp = outputTime;
return this;
}
-
/**
* For event time timers the target time should be prior to window GC time. So it returns
* min(time to set, GC Time of window).