You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2022/06/15 17:24:58 UTC

[beam] branch master updated: convert windmill min timestamp to beam min timestamp

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 955177b8c38 convert windmill min timestamp to beam min timestamp
     new 21049601776 Merge pull request #21740 from Naireen/weird_timestamps
955177b8c38 is described below

commit 955177b8c38868e3dffad9ab4d7ee31dda06cc92
Author: Naireen Hussain <na...@google.com>
AuthorDate: Wed Jun 8 00:01:14 2022 +0000

    convert windmill min timestamp to beam min timestamp
---
 .../apache/beam/runners/dataflow/worker/WindmillTimeUtils.java |  3 +++
 .../beam/runners/dataflow/worker/WindmillTimeUtilsTest.java    | 10 ++++++++++
 2 files changed, 13 insertions(+)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtils.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtils.java
index 8552c27d596..9732826bdd6 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtils.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtils.java
@@ -45,6 +45,9 @@ public class WindmillTimeUtils {
     // Windmill should never send us an unknown timestamp.
     Preconditions.checkArgument(timestampUs != Long.MIN_VALUE);
     Instant result = new Instant(divideAndRoundDown(timestampUs, 1000));
+    if (result.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+      return BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
     if (result.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
       // End of time.
       return BoundedWindow.TIMESTAMP_MAX_VALUE;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtilsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtilsTest.java
index 5f910c3acb5..84e76d2f8bd 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtilsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtilsTest.java
@@ -23,6 +23,7 @@ import static org.apache.beam.runners.dataflow.worker.WindmillTimeUtils.windmill
 import static org.junit.Assert.assertEquals;
 
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -56,6 +57,15 @@ public class WindmillTimeUtilsTest {
     assertEquals(new Instant(-17), windmillToHarnessTimestamp(-16987));
     assertEquals(new Instant(-17), windmillToHarnessTimestamp(-17000));
     assertEquals(new Instant(-18), windmillToHarnessTimestamp(-17001));
+    assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, windmillToHarnessTimestamp(Long.MIN_VALUE + 1));
+    assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, windmillToHarnessTimestamp(Long.MIN_VALUE + 2));
+    // Long.MIN_VALUE = -9223372036854775808, need to add 1808 microseconds to get to next
+    // millisecond returned by Beam.
+    assertEquals(
+        BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)),
+        windmillToHarnessTimestamp(Long.MIN_VALUE + 1808));
+    assertEquals(
+        BoundedWindow.TIMESTAMP_MIN_VALUE, windmillToHarnessTimestamp(Long.MIN_VALUE + 1807));
   }
 
   @Test