You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2020/03/30 00:28:53 UTC

[beam] branch release-2.20.0 updated: [BEAM-9557] Fix timer window boundary checking (#11252)

This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch release-2.20.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.20.0 by this push:
     new ef4b21b  [BEAM-9557] Fix timer window boundary checking (#11252)
ef4b21b is described below

commit ef4b21bb499379e956c554e4c9a1429135d39715
Author: Rui Wang <am...@users.noreply.github.com>
AuthorDate: Sat Mar 28 17:11:36 2020 -0700

    [BEAM-9557] Fix timer window boundary checking (#11252)
---
 .../apache/beam/runners/core/SimpleDoFnRunner.java | 27 +++++++++++++++++-----
 1 file changed, 21 insertions(+), 6 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 71efa12..fa5c695 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -1187,12 +1187,27 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       }
 
       Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
-      checkArgument(
-          !target.isAfter(windowExpiry),
-          "Attempted to set event time timer that outputs for %s but that is"
-              + " after the expiration of window %s",
-          target,
-          windowExpiry);
+      if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
+        checkArgument(
+            !outputTimestamp.isAfter(target),
+            "Attempted to set an event time timer with an output timestamp of %s that is"
+                + " after the timer firing timestamp %s",
+            outputTimestamp,
+            target);
+        checkArgument(
+            !target.isAfter(windowExpiry),
+            "Attempted to set an event time timer with a firing timestamp of %s that is"
+                + " after the expiration of window %s",
+            target,
+            windowExpiry);
+      } else {
+        checkArgument(
+            !outputTimestamp.isAfter(windowExpiry),
+            "Attempted to set a processing-time timer with an output timestamp of %s that is"
+                + " after the expiration of window %s",
+            outputTimestamp,
+            windowExpiry);
+      }
     }
 
     /**