You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/06/18 20:09:44 UTC

[beam] branch master updated: Merge pull request #21940 from [21941] Fix no output timestamp case

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new de5c56a5b8a Merge pull request #21940 from [21941] Fix no output timestamp case
de5c56a5b8a is described below

commit de5c56a5b8a8a030e7e67323a696d52495e37f7f
Author: Reuven Lax <re...@google.com>
AuthorDate: Sat Jun 18 13:09:38 2022 -0700

    Merge pull request #21940 from [21941] Fix no output timestamp case
    
    * fix no output timestamp case
    
    * revert unnecessary file
---
 .../dataflow/worker/WindmillTimerInternals.java     | 21 +++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index c1f89c679d5..b5dbcb436d8 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -53,6 +53,11 @@ import org.joda.time.Instant;
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
 class WindmillTimerInternals implements TimerInternals {
+  private static final Instant OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE =
+      GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1));
+
+  private static final Instant OUTPUT_TIMESTAMP_MAX_VALUE =
+      BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1));
 
   private static final String TIMER_HOLD_PREFIX = "/h";
   // Map from timer id to its TimerData. If it is to be deleted, we still need
@@ -286,8 +291,14 @@ class WindmillTimerInternals implements TimerInternals {
     builder.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getTimestamp()));
 
     // Store the output timestamp in the metadata timestamp.
-    builder.setMetadataTimestamp(
-        WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getOutputTimestamp()));
+    Instant outputTimestamp = timerData.getOutputTimestamp();
+    if (outputTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+      // We can't encode any value larger than BoundedWindow.TIMESTAMP_MAX_VALUE, so substitute
+      // one millisecond past the global window's max timestamp
+      // (OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE) here instead.
+      outputTimestamp = OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE;
+    }
+    builder.setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp));
     return builder;
   }
 
@@ -375,7 +386,13 @@ class WindmillTimerInternals implements TimerInternals {
         throw new RuntimeException(e);
       }
     } else if (timer.hasMetadataTimestamp()) {
+      // BoundedWindow.TIMESTAMP_MAX_VALUE+1 indicates "no output timestamp". It is stored as
+      // OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE, so map that sentinel back to
+      // OUTPUT_TIMESTAMP_MAX_VALUE after decoding.
       outputTimestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getMetadataTimestamp());
+      if (outputTimestamp.equals(OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE)) {
+        outputTimestamp = OUTPUT_TIMESTAMP_MAX_VALUE;
+      }
     }
 
     StateNamespace namespace = StateNamespaces.fromString(namespaceString, windowCoder);