You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/06/18 20:09:44 UTC
[beam] branch master updated: Merge pull request #21940 from [21941] Fix no output timestamp case
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new de5c56a5b8a Merge pull request #21940 from [21941] Fix no output timestamp case
de5c56a5b8a is described below
commit de5c56a5b8a8a030e7e67323a696d52495e37f7f
Author: Reuven Lax <re...@google.com>
AuthorDate: Sat Jun 18 13:09:38 2022 -0700
Merge pull request #21940 from [21941] Fix no output timestamp case
* fix no output timestamp case
* revert unnecessary file
---
.../dataflow/worker/WindmillTimerInternals.java | 21 +++++++++++++++++++--
1 file changed, 19 insertions(+), 2 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index c1f89c679d5..b5dbcb436d8 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -53,6 +53,11 @@ import org.joda.time.Instant;
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
class WindmillTimerInternals implements TimerInternals {
+ private static final Instant OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE =
+ GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1));
+
+ private static final Instant OUTPUT_TIMESTAMP_MAX_VALUE =
+ BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1));
private static final String TIMER_HOLD_PREFIX = "/h";
// Map from timer id to its TimerData. If it is to be deleted, we still need
@@ -286,8 +291,14 @@ class WindmillTimerInternals implements TimerInternals {
builder.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getTimestamp()));
// Store the output timestamp in the metadata timestamp.
- builder.setMetadataTimestamp(
- WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getOutputTimestamp()));
+ Instant outputTimestamp = timerData.getOutputTimestamp();
+ if (outputTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+ // We can't encode any value larger than BoundedWindow.TIMESTAMP_MAX_VALUE, so store
+ // OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE (one millisecond past the end of the global window)
+ // here instead.
+ outputTimestamp = OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE;
+ }
+ builder.setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp));
return builder;
}
@@ -375,7 +386,13 @@ class WindmillTimerInternals implements TimerInternals {
throw new RuntimeException(e);
}
} else if (timer.hasMetadataTimestamp()) {
+ // BoundedWindow.TIMESTAMP_MAX_VALUE+1 indicates "no output timestamp". It is stored as
+ // OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE, so map that sentinel back to
+ // OUTPUT_TIMESTAMP_MAX_VALUE after decoding.
outputTimestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getMetadataTimestamp());
+ if (outputTimestamp.equals(OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE)) {
+ outputTimestamp = OUTPUT_TIMESTAMP_MAX_VALUE;
+ }
}
StateNamespace namespace = StateNamespaces.fromString(namespaceString, windowCoder);