You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/04/13 19:57:59 UTC
[beam] branch master updated: [BEAM-9562] Fix output timestamp to
be inferred from scheduled time when in the event time domain.
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 009578e [BEAM-9562] Fix output timestamp to be inferred from scheduled time when in the event time domain.
new a7f444f Merge pull request #11402 from lukecwik/timers
009578e is described below
commit 009578e374523f5acd8d24543ef1ceec30542a95
Author: Luke Cwik <lc...@google.com>
AuthorDate: Mon Apr 13 11:37:46 2020 -0700
[BEAM-9562] Fix output timestamp to be inferred from scheduled time when in the event time domain.
---
.../main/java/org/apache/beam/sdk/state/Timer.java | 22 ++++++++
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 64 +++++++++++++++++-----
2 files changed, 73 insertions(+), 13 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
index f944469..437df4d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
@@ -50,12 +50,34 @@ public interface Timer {
* <p>For {@link TimeDomain#PROCESSING_TIME}, the behavior is unpredictable, since processing
* time timers are ignored after a window has expired. Instead, it is recommended to use {@link
* #setRelative()}.
+ *
+ * <p>If the {@link #withOutputTimestamp output timestamp} has not been explicitly set then the
+ * default output timestamp per {@link TimeDomain} is:
+ *
+ * <ul>
+ * <li>{@link TimeDomain#EVENT_TIME}: the firing time of this new timer.
+ * <li>{@link TimeDomain#PROCESSING_TIME}: current element's timestamp or current timer's output
+ * timestamp.
+ * <li>{@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME}: current element's timestamp or current
+ * timer's output timestamp.
+ * </ul>
*/
void set(Instant absoluteTime);
/**
* Sets the timer relative to the current time, according to any offset and alignment specified.
* Using {@link #offset(Duration)} and {@link #align(Duration)}.
+ *
+ * <p>If the {@link #withOutputTimestamp output timestamp} has not been explicitly set then the
+ * default output timestamp per {@link TimeDomain} is:
+ *
+ * <ul>
+ * <li>{@link TimeDomain#EVENT_TIME}: the firing time of this new timer.
+ * <li>{@link TimeDomain#PROCESSING_TIME}: current element's timestamp or current timer's output
+ * timestamp.
+ * <li>{@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME}: current element's timestamp or current
+ * timer's output timestamp.
+ * </ul>
*/
void setRelative();
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 9d3193e..13b9c57 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -885,10 +885,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
private final TimeDomain timeDomain;
private final Duration allowedLateness;
private final Instant fireTimestamp;
- private Instant holdTimestamp;
+ private final Instant elementTimestampOrTimerHoldTimestamp;
private final BoundedWindow boundedWindow;
private final PaneInfo paneInfo;
+ private Instant outputTimestamp;
private Duration period = Duration.ZERO;
private Duration offset = Duration.ZERO;
@@ -897,13 +898,13 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
K userKey,
String dynamicTimerTag,
BoundedWindow boundedWindow,
- Instant initialHoldTimestamp,
+ Instant elementTimestampOrTimerHoldTimestamp,
Instant elementTimestampOrTimerFireTimestamp,
PaneInfo paneInfo) {
this.timerId = timerId;
this.userKey = userKey;
this.dynamicTimerTag = dynamicTimerTag;
- this.holdTimestamp = initialHoldTimestamp;
+ this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
this.boundedWindow = boundedWindow;
this.paneInfo = paneInfo;
@@ -990,15 +991,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
@Override
public org.apache.beam.sdk.state.Timer withOutputTimestamp(Instant outputTime) {
- Instant windowExpiry = LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);
- checkArgument(
- !outputTime.isAfter(windowExpiry),
- "Attempted to set timer with output timestamp %s but that is after"
- + " the expiration of window %s",
- outputTime,
- windowExpiry);
-
- this.holdTimestamp = outputTime;
+ this.outputTimestamp = outputTime;
return this;
}
/**
@@ -1016,6 +1009,51 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
}
private void output(Instant scheduledTime) {
+ if (outputTimestamp != null) {
+ checkArgument(
+ !outputTimestamp.isBefore(elementTimestampOrTimerHoldTimestamp),
+ "output timestamp %s should be after input message timestamp or output timestamp of firing timers %s",
+ outputTimestamp,
+ elementTimestampOrTimerHoldTimestamp);
+ }
+
+ // Output timestamp is set to the delivery time if not initialized by a user.
+ if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(timeDomain)) {
+ outputTimestamp = scheduledTime;
+ }
+
+ // Remaining case: a timer outside the event-time domain with no user-supplied output timestamp.
+ if (outputTimestamp == null) {
+ // For processing timers output timestamp will be:
+ // 1) timestamp of input element
+ // OR
+ // 2) hold timestamp of firing timer.
+ outputTimestamp = elementTimestampOrTimerHoldTimestamp;
+ }
+
+ Instant windowExpiry = LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);
+ if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
+ checkArgument(
+ !outputTimestamp.isAfter(scheduledTime),
+ "Attempted to set an event-time timer with an output timestamp of %s that is"
+ + " after the timer firing timestamp %s",
+ outputTimestamp,
+ scheduledTime);
+ checkArgument(
+ !scheduledTime.isAfter(windowExpiry),
+ "Attempted to set an event-time timer with a firing timestamp of %s that is"
+ + " after the expiration of window %s",
+ scheduledTime,
+ windowExpiry);
+ } else {
+ checkArgument(
+ !outputTimestamp.isAfter(windowExpiry),
+ "Attempted to set a processing-time timer with an output timestamp of %s that is"
+ + " after the expiration of window %s",
+ outputTimestamp,
+ windowExpiry);
+ }
+
TimerHandler<K> consumer = (TimerHandler) timerHandlers.get(timerId);
try {
consumer.accept(
@@ -1024,7 +1062,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
dynamicTimerTag,
Collections.singletonList(boundedWindow),
scheduledTime,
- holdTimestamp,
+ outputTimestamp,
paneInfo));
} catch (Throwable t) {
throw UserCodeException.wrap(t);