You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2022/07/26 23:07:07 UTC

[beam] branch master updated: convert windmill min timestamp to beam min timestamp (#21915)

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ad20d2acdc convert windmill min timestamp to beam min timestamp (#21915)
3ad20d2acdc is described below

commit 3ad20d2acdc43582bc307157f29bc015292590d7
Author: Naireen Hussain <na...@live.com>
AuthorDate: Tue Jul 26 16:07:00 2022 -0700

    convert windmill min timestamp to beam min timestamp (#21915)
    
    * convert windmill min timestamp to beam min timestamp
    
    * convert windmill min timestamp to beam min timestamp
    
    Co-authored-by: Naireen Hussain <na...@google.com>
---
 .../runners/dataflow/worker/WindmillTimeUtils.java |  3 +++
 .../dataflow/worker/WindmillTimeUtilsTest.java     | 10 +++++++
 .../worker/WindmillTimerInternalsTest.java         | 31 +++++++++++++++++++---
 3 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtils.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtils.java
index 8552c27d596..9732826bdd6 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtils.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtils.java
@@ -45,6 +45,9 @@ public class WindmillTimeUtils {
     // Windmill should never send us an unknown timestamp.
     Preconditions.checkArgument(timestampUs != Long.MIN_VALUE);
     Instant result = new Instant(divideAndRoundDown(timestampUs, 1000));
+    if (result.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+      return BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
     if (result.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
       // End of time.
       return BoundedWindow.TIMESTAMP_MAX_VALUE;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtilsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtilsTest.java
index 5f910c3acb5..84e76d2f8bd 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtilsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtilsTest.java
@@ -23,6 +23,7 @@ import static org.apache.beam.runners.dataflow.worker.WindmillTimeUtils.windmill
 import static org.junit.Assert.assertEquals;
 
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -56,6 +57,15 @@ public class WindmillTimeUtilsTest {
     assertEquals(new Instant(-17), windmillToHarnessTimestamp(-16987));
     assertEquals(new Instant(-17), windmillToHarnessTimestamp(-17000));
     assertEquals(new Instant(-18), windmillToHarnessTimestamp(-17001));
+    assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, windmillToHarnessTimestamp(Long.MIN_VALUE + 1));
+    assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, windmillToHarnessTimestamp(Long.MIN_VALUE + 2));
+    // Long.MIN_VALUE = -9223372036854775808 us; 1808 us is the smallest offset that
+    // rounds down to the millisecond after BoundedWindow.TIMESTAMP_MIN_VALUE.
+    assertEquals(
+        BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)),
+        windmillToHarnessTimestamp(Long.MIN_VALUE + 1808));
+    assertEquals(
+        BoundedWindow.TIMESTAMP_MIN_VALUE, windmillToHarnessTimestamp(Long.MIN_VALUE + 1807));
   }
 
   @Test
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
index 2d222b534c7..8632034a9b2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
@@ -88,12 +88,22 @@ public class WindmillTimerInternalsTest {
                       TimerData.of(
                           namespace, timestamp, timestamp.minus(Duration.millis(1)), timeDomain));
               for (TimerData timer : anonymousTimers) {
-                assertThat(
+                Instant expectedTimestamp =
+                    timer.getOutputTimestamp().isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)
+                        ? BoundedWindow.TIMESTAMP_MIN_VALUE
+                        : timer.getOutputTimestamp();
+                TimerData computed =
                     WindmillTimerInternals.windmillTimerToTimerData(
                         prefix,
                         WindmillTimerInternals.timerDataToWindmillTimer(stateFamily, prefix, timer),
-                        coder),
-                    equalTo(timer));
+                        coder);
+                // The conversion bounds the output timestamp, so we don't expect the original
+                // input back; we expect it bounded below by BoundedWindow.TIMESTAMP_MIN_VALUE.
+                TimerData expected =
+                    TimerData.of(
+                        timer.getNamespace(), timestamp, expectedTimestamp, timer.getDomain());
+
+                assertThat(computed, equalTo(expected));
               }
 
               for (String timerId : TEST_TIMER_IDS) {
@@ -117,13 +127,26 @@ public class WindmillTimerInternalsTest {
                             timeDomain));
 
                 for (TimerData timer : timers) {
+                  Instant expectedTimestamp =
+                      timer.getOutputTimestamp().isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)
+                          ? BoundedWindow.TIMESTAMP_MIN_VALUE
+                          : timer.getOutputTimestamp();
+
+                  TimerData expected =
+                      TimerData.of(
+                          timer.getTimerId(),
+                          timer.getTimerFamilyId(),
+                          timer.getNamespace(),
+                          timer.getTimestamp(),
+                          expectedTimestamp,
+                          timer.getDomain());
                   assertThat(
                       WindmillTimerInternals.windmillTimerToTimerData(
                           prefix,
                           WindmillTimerInternals.timerDataToWindmillTimer(
                               stateFamily, prefix, timer),
                           coder),
-                      equalTo(timer));
+                      equalTo(expected));
                 }
               }
             }