You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2020/03/29 00:11:53 UTC
[beam] branch master updated: [BEAM-9557] Fix timer window boundary
checking (#11252)
This is an automated email from the ASF dual-hosted git repository.
amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b58ba45 [BEAM-9557] Fix timer window boundary checking (#11252)
b58ba45 is described below
commit b58ba45a6b79341ad1a1a663efa7a18a413f05a7
Author: Rui Wang <am...@users.noreply.github.com>
AuthorDate: Sat Mar 28 17:11:36 2020 -0700
[BEAM-9557] Fix timer window boundary checking (#11252)
---
.../apache/beam/runners/core/SimpleDoFnRunner.java | 27 +++++++++++++++++-----
1 file changed, 21 insertions(+), 6 deletions(-)
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 75f9e14..0b98a51 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -987,12 +987,27 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
}
Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
- checkArgument(
- !target.isAfter(windowExpiry),
- "Attempted to set event time timer that outputs for %s but that is"
- + " after the expiration of window %s",
- target,
- windowExpiry);
+ if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
+ checkArgument(
+ !outputTimestamp.isAfter(target),
+ "Attempted to set an event time timer with an output timestamp of %s that is"
+ + " after the timer firing timestamp %s",
+ outputTimestamp,
+ target);
+ checkArgument(
+ !target.isAfter(windowExpiry),
+ "Attempted to set an event time timer with a firing timestamp of %s that is"
+ + " after the expiration of window %s",
+ target,
+ windowExpiry);
+ } else {
+ checkArgument(
+ !outputTimestamp.isAfter(windowExpiry),
+ "Attempted to set a processing-time timer with an output timestamp of %s that is"
+ + " after the expiration of window %s",
+ outputTimestamp,
+ windowExpiry);
+ }
}
/**