You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2022/07/26 23:07:07 UTC
[beam] branch master updated: convert windmill min timestamp to beam min timestamp (#21915)
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3ad20d2acdc convert windmill min timestamp to beam min timestamp (#21915)
3ad20d2acdc is described below
commit 3ad20d2acdc43582bc307157f29bc015292590d7
Author: Naireen Hussain <na...@live.com>
AuthorDate: Tue Jul 26 16:07:00 2022 -0700
convert windmill min timestamp to beam min timestamp (#21915)
* convert windmill min timestamp to beam min timestamp
* convert windmill min timestamp to beam min timestamp
Co-authored-by: Naireen Hussain <na...@google.com>
---
.../runners/dataflow/worker/WindmillTimeUtils.java | 3 +++
.../dataflow/worker/WindmillTimeUtilsTest.java | 10 +++++++
.../worker/WindmillTimerInternalsTest.java | 31 +++++++++++++++++++---
3 files changed, 40 insertions(+), 4 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtils.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtils.java
index 8552c27d596..9732826bdd6 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtils.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtils.java
@@ -45,6 +45,9 @@ public class WindmillTimeUtils {
// Windmill should never send us an unknown timestamp.
Preconditions.checkArgument(timestampUs != Long.MIN_VALUE);
Instant result = new Instant(divideAndRoundDown(timestampUs, 1000));
+ if (result.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+ return BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
if (result.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
// End of time.
return BoundedWindow.TIMESTAMP_MAX_VALUE;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtilsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtilsTest.java
index 5f910c3acb5..84e76d2f8bd 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtilsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtilsTest.java
@@ -23,6 +23,7 @@ import static org.apache.beam.runners.dataflow.worker.WindmillTimeUtils.windmill
import static org.junit.Assert.assertEquals;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -56,6 +57,15 @@ public class WindmillTimeUtilsTest {
assertEquals(new Instant(-17), windmillToHarnessTimestamp(-16987));
assertEquals(new Instant(-17), windmillToHarnessTimestamp(-17000));
assertEquals(new Instant(-18), windmillToHarnessTimestamp(-17001));
+ assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, windmillToHarnessTimestamp(Long.MIN_VALUE + 1));
+ assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, windmillToHarnessTimestamp(Long.MIN_VALUE + 2));
+ // Long.MIN_VALUE = -9223372036854775808 microseconds, so 1808 microseconds is the smallest
+ // offset that converts to the first millisecond after Beam's TIMESTAMP_MIN_VALUE.
+ assertEquals(
+ BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)),
+ windmillToHarnessTimestamp(Long.MIN_VALUE + 1808));
+ assertEquals(
+ BoundedWindow.TIMESTAMP_MIN_VALUE, windmillToHarnessTimestamp(Long.MIN_VALUE + 1807));
}
@Test
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
index 2d222b534c7..8632034a9b2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
@@ -88,12 +88,22 @@ public class WindmillTimerInternalsTest {
TimerData.of(
namespace, timestamp, timestamp.minus(Duration.millis(1)), timeDomain));
for (TimerData timer : anonymousTimers) {
- assertThat(
+ Instant expectedTimestamp =
+ timer.getOutputTimestamp().isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)
+ ? BoundedWindow.TIMESTAMP_MIN_VALUE
+ : timer.getOutputTimestamp();
+ TimerData computed =
WindmillTimerInternals.windmillTimerToTimerData(
prefix,
WindmillTimerInternals.timerDataToWindmillTimer(stateFamily, prefix, timer),
- coder),
- equalTo(timer));
+ coder);
+ // The function itself bounds its output, so we don't expect the original input as the
+ // output; we expect it to be bounded below by BoundedWindow.TIMESTAMP_MIN_VALUE.
+ TimerData expected =
+ TimerData.of(
+ timer.getNamespace(), timestamp, expectedTimestamp, timer.getDomain());
+
+ assertThat(computed, equalTo(expected));
}
for (String timerId : TEST_TIMER_IDS) {
@@ -117,13 +127,26 @@ public class WindmillTimerInternalsTest {
timeDomain));
for (TimerData timer : timers) {
+ Instant expectedTimestamp =
+ timer.getOutputTimestamp().isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)
+ ? BoundedWindow.TIMESTAMP_MIN_VALUE
+ : timer.getOutputTimestamp();
+
+ TimerData expected =
+ TimerData.of(
+ timer.getTimerId(),
+ timer.getTimerFamilyId(),
+ timer.getNamespace(),
+ timer.getTimestamp(),
+ expectedTimestamp,
+ timer.getDomain());
assertThat(
WindmillTimerInternals.windmillTimerToTimerData(
prefix,
WindmillTimerInternals.timerDataToWindmillTimer(
stateFamily, prefix, timer),
coder),
- equalTo(timer));
+ equalTo(expected));
}
}
}