You are viewing a plain-text version of this content. The canonical link for it (the original "here" hyperlink) is not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/04/13 19:57:59 UTC

[beam] branch master updated: [BEAM-9562] Fix output timestamp to be inferred from scheduled time when in the event time domain.

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 009578e  [BEAM-9562] Fix output timestamp to be inferred from scheduled time when in the event time domain.
     new a7f444f  Merge pull request #11402 from lukecwik/timers
009578e is described below

commit 009578e374523f5acd8d24543ef1ceec30542a95
Author: Luke Cwik <lc...@google.com>
AuthorDate: Mon Apr 13 11:37:46 2020 -0700

    [BEAM-9562] Fix output timestamp to be inferred from scheduled time when in the event time domain.
---
 .../main/java/org/apache/beam/sdk/state/Timer.java | 22 ++++++++
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    | 64 +++++++++++++++++-----
 2 files changed, 73 insertions(+), 13 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
index f944469..437df4d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
@@ -50,12 +50,34 @@ public interface Timer {
    * <p>For {@link TimeDomain#PROCESSING_TIME}, the behavior is be unpredictable, since processing
    * time timers are ignored after a window has expired. Instead, it is recommended to use {@link
    * #setRelative()}.
+   *
+   * <p>If the {@link #withOutputTimestamp output timestamp} has not been explicitly set then the
+   * default output timestamp per {@link TimeDomain} is:
+   *
+   * <ul>
+   *   <li>{@link TimeDomain#EVENT_TIME}: the firing time of this new timer.
+   *   <li>{@link TimeDomain#PROCESSING_TIME}: current element's timestamp or current timer's output
+   *       timestamp.
+   *   <li>{@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME}: current element's timestamp or current
+   *       timer's output timestamp.
+   * </ul>
    */
   void set(Instant absoluteTime);
 
   /**
    * Sets the timer relative to the current time, according to any offset and alignment specified.
    * Using {@link #offset(Duration)} and {@link #align(Duration)}.
+   *
+   * <p>If the {@link #withOutputTimestamp output timestamp} has not been explicitly set then the
+   * default output timestamp per {@link TimeDomain} is:
+   *
+   * <ul>
+   *   <li>{@link TimeDomain#EVENT_TIME}: the firing time of this new timer.
+   *   <li>{@link TimeDomain#PROCESSING_TIME}: current element's timestamp or current timer's output
+   *       timestamp.
+   *   <li>{@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME}: current element's timestamp or current
+   *       timer's output timestamp.
+   * </ul>
    */
   void setRelative();
 
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 9d3193e..13b9c57 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -885,10 +885,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
     private final TimeDomain timeDomain;
     private final Duration allowedLateness;
     private final Instant fireTimestamp;
-    private Instant holdTimestamp;
+    private final Instant elementTimestampOrTimerHoldTimestamp;
     private final BoundedWindow boundedWindow;
     private final PaneInfo paneInfo;
 
+    private Instant outputTimestamp;
     private Duration period = Duration.ZERO;
     private Duration offset = Duration.ZERO;
 
@@ -897,13 +898,13 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
         K userKey,
         String dynamicTimerTag,
         BoundedWindow boundedWindow,
-        Instant initialHoldTimestamp,
+        Instant elementTimestampOrTimerHoldTimestamp,
         Instant elementTimestampOrTimerFireTimestamp,
         PaneInfo paneInfo) {
       this.timerId = timerId;
       this.userKey = userKey;
       this.dynamicTimerTag = dynamicTimerTag;
-      this.holdTimestamp = initialHoldTimestamp;
+      this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
       this.boundedWindow = boundedWindow;
       this.paneInfo = paneInfo;
 
@@ -990,15 +991,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
     @Override
     public org.apache.beam.sdk.state.Timer withOutputTimestamp(Instant outputTime) {
-      Instant windowExpiry = LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);
-      checkArgument(
-          !outputTime.isAfter(windowExpiry),
-          "Attempted to set timer with output timestamp %s but that is after"
-              + " the expiration of window %s",
-          outputTime,
-          windowExpiry);
-
-      this.holdTimestamp = outputTime;
+      this.outputTimestamp = outputTime;
       return this;
     }
     /**
@@ -1016,6 +1009,51 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
     }
 
     private void output(Instant scheduledTime) {
+      if (outputTimestamp != null) {
+        checkArgument(
+            !outputTimestamp.isBefore(elementTimestampOrTimerHoldTimestamp),
+            "output timestamp %s should be after input message timestamp or output timestamp of firing timers %s",
+            outputTimestamp,
+            elementTimestampOrTimerHoldTimestamp);
+      }
+
+      // Output timestamp is set to the delivery time if not initialized by an user.
+      if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(timeDomain)) {
+        outputTimestamp = scheduledTime;
+      }
+
+      // For processing timers
+      if (outputTimestamp == null) {
+        // For processing timers output timestamp will be:
+        // 1) timestamp of input element
+        // OR
+        // 2) hold timestamp of firing timer.
+        outputTimestamp = elementTimestampOrTimerHoldTimestamp;
+      }
+
+      Instant windowExpiry = LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);
+      if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
+        checkArgument(
+            !outputTimestamp.isAfter(scheduledTime),
+            "Attempted to set an event-time timer with an output timestamp of %s that is"
+                + " after the timer firing timestamp %s",
+            outputTimestamp,
+            scheduledTime);
+        checkArgument(
+            !scheduledTime.isAfter(windowExpiry),
+            "Attempted to set an event-time timer with a firing timestamp of %s that is"
+                + " after the expiration of window %s",
+            scheduledTime,
+            windowExpiry);
+      } else {
+        checkArgument(
+            !outputTimestamp.isAfter(windowExpiry),
+            "Attempted to set a processing-time timer with an output timestamp of %s that is"
+                + " after the expiration of window %s",
+            outputTimestamp,
+            windowExpiry);
+      }
+
       TimerHandler<K> consumer = (TimerHandler) timerHandlers.get(timerId);
       try {
         consumer.accept(
@@ -1024,7 +1062,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
                 dynamicTimerTag,
                 Collections.singletonList(boundedWindow),
                 scheduledTime,
-                holdTimestamp,
+                outputTimestamp,
                 paneInfo));
       } catch (Throwable t) {
         throw UserCodeException.wrap(t);