You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2020/03/29 00:11:53 UTC

[beam] branch master updated: [BEAM-9557] Fix timer window boundary checking (#11252)

This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b58ba45  [BEAM-9557] Fix timer window boundary checking (#11252)
b58ba45 is described below

commit b58ba45a6b79341ad1a1a663efa7a18a413f05a7
Author: Rui Wang <am...@users.noreply.github.com>
AuthorDate: Sat Mar 28 17:11:36 2020 -0700

    [BEAM-9557] Fix timer window boundary checking (#11252)
---
 .../apache/beam/runners/core/SimpleDoFnRunner.java | 27 +++++++++++++++++-----
 1 file changed, 21 insertions(+), 6 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 75f9e14..0b98a51 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -987,12 +987,27 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       }
 
       Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
-      checkArgument(
-          !target.isAfter(windowExpiry),
-          "Attempted to set event time timer that outputs for %s but that is"
-              + " after the expiration of window %s",
-          target,
-          windowExpiry);
+      if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
+        checkArgument(
+            !outputTimestamp.isAfter(target),
+            "Attempted to set an event time timer with an output timestamp of %s that is"
+                + " after the timer firing timestamp %s",
+            outputTimestamp,
+            target);
+        checkArgument(
+            !target.isAfter(windowExpiry),
+            "Attempted to set an event time timer with a firing timestamp of %s that is"
+                + " after the expiration of window %s",
+            target,
+            windowExpiry);
+      } else {
+        checkArgument(
+            !outputTimestamp.isAfter(windowExpiry),
+            "Attempted to set a processing-time timer with an output timestamp of %s that is"
+                + " after the expiration of window %s",
+            outputTimestamp,
+            windowExpiry);
+      }
     }
 
     /**