You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2020/03/30 00:28:53 UTC
[beam] branch release-2.20.0 updated: [BEAM-9557] Fix timer window boundary checking (#11252)
This is an automated email from the ASF dual-hosted git repository.
amaliujia pushed a commit to branch release-2.20.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.20.0 by this push:
new ef4b21b [BEAM-9557] Fix timer window boundary checking (#11252)
ef4b21b is described below
commit ef4b21bb499379e956c554e4c9a1429135d39715
Author: Rui Wang <am...@users.noreply.github.com>
AuthorDate: Sat Mar 28 17:11:36 2020 -0700
[BEAM-9557] Fix timer window boundary checking (#11252)
---
.../apache/beam/runners/core/SimpleDoFnRunner.java | 27 +++++++++++++++++-----
1 file changed, 21 insertions(+), 6 deletions(-)
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 71efa12..fa5c695 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -1187,12 +1187,27 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
}
Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
- checkArgument(
- !target.isAfter(windowExpiry),
- "Attempted to set event time timer that outputs for %s but that is"
- + " after the expiration of window %s",
- target,
- windowExpiry);
+ if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
+ checkArgument(
+ !outputTimestamp.isAfter(target),
+ "Attempted to set an event time timer with an output timestamp of %s that is"
+ + " after the timer firing timestamp %s",
+ outputTimestamp,
+ target);
+ checkArgument(
+ !target.isAfter(windowExpiry),
+ "Attempted to set an event time timer with a firing timestamp of %s that is"
+ + " after the expiration of window %s",
+ target,
+ windowExpiry);
+ } else {
+ checkArgument(
+ !outputTimestamp.isAfter(windowExpiry),
+ "Attempted to set a processing-time timer with an output timestamp of %s that is"
+ + " after the expiration of window %s",
+ outputTimestamp,
+ windowExpiry);
+ }
}
/**