You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2022/06/15 17:24:58 UTC
[beam] branch master updated: convert windmill min timestamp to beam min timestamp
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 955177b8c38 convert windmill min timestamp to beam min timestamp
new 21049601776 Merge pull request #21740 from Naireen/weird_timestamps
955177b8c38 is described below
commit 955177b8c38868e3dffad9ab4d7ee31dda06cc92
Author: Naireen Hussain <na...@google.com>
AuthorDate: Wed Jun 8 00:01:14 2022 +0000
convert windmill min timestamp to beam min timestamp
---
.../apache/beam/runners/dataflow/worker/WindmillTimeUtils.java | 3 +++
.../beam/runners/dataflow/worker/WindmillTimeUtilsTest.java | 10 ++++++++++
2 files changed, 13 insertions(+)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtils.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtils.java
index 8552c27d596..9732826bdd6 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtils.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtils.java
@@ -45,6 +45,9 @@ public class WindmillTimeUtils {
// Windmill should never send us an unknown timestamp.
Preconditions.checkArgument(timestampUs != Long.MIN_VALUE);
Instant result = new Instant(divideAndRoundDown(timestampUs, 1000));
+ if (result.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+ return BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
if (result.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
// End of time.
return BoundedWindow.TIMESTAMP_MAX_VALUE;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtilsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtilsTest.java
index 5f910c3acb5..84e76d2f8bd 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtilsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtilsTest.java
@@ -23,6 +23,7 @@ import static org.apache.beam.runners.dataflow.worker.WindmillTimeUtils.windmill
import static org.junit.Assert.assertEquals;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -56,6 +57,15 @@ public class WindmillTimeUtilsTest {
assertEquals(new Instant(-17), windmillToHarnessTimestamp(-16987));
assertEquals(new Instant(-17), windmillToHarnessTimestamp(-17000));
assertEquals(new Instant(-18), windmillToHarnessTimestamp(-17001));
+ assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, windmillToHarnessTimestamp(Long.MIN_VALUE + 1));
+ assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, windmillToHarnessTimestamp(Long.MIN_VALUE + 2));
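+ // Long.MIN_VALUE = -9223372036854775808 (microseconds). An offset of 1808 microseconds is the
+ // smallest one that converts to the first millisecond after Beam's minimum timestamp;
+ // 1807 microseconds still maps to the minimum itself.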
+ assertEquals(
+ BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)),
+ windmillToHarnessTimestamp(Long.MIN_VALUE + 1808));
+ assertEquals(
+ BoundedWindow.TIMESTAMP_MIN_VALUE, windmillToHarnessTimestamp(Long.MIN_VALUE + 1807));
}
@Test