You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by da...@apache.org on 2022/09/21 17:12:35 UTC
[beam] branch master updated: [Java SDK core] emit watermark from PeriodicSequence (#23301) (#23302)
This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0510d88c4d0 [Java SDK core] emit watermark from PeriodicSequence (#23301) (#23302)
0510d88c4d0 is described below
commit 0510d88c4d0cc74dfe865fb9ae61456ac11651c8
Author: Jan Lukavský <je...@seznam.cz>
AuthorDate: Wed Sep 21 19:12:27 2022 +0200
[Java SDK core] emit watermark from PeriodicSequence (#23301) (#23302)
---
.../beam/sdk/transforms/PeriodicSequence.java | 20 +++++
.../beam/sdk/transforms/PeriodicSequenceTest.java | 85 ++++++++++++++++++----
2 files changed, 89 insertions(+), 16 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
index a42c948b3a9..6f0aeac859c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
@@ -25,8 +25,12 @@ import java.util.Objects;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -178,9 +182,22 @@ public class PeriodicSequence
return new OutputRangeTracker(restriction);
}
+ @GetInitialWatermarkEstimatorState
+ public Instant getInitialWatermarkState() {
+ return BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+
+ @NewWatermarkEstimator
+ public WatermarkEstimator<Instant> newWatermarkEstimator(
+ @WatermarkEstimatorState Instant state) {
+
+ return new WatermarkEstimators.Manual(state);
+ }
+
@ProcessElement
public ProcessContinuation processElement(
@Element SequenceDefinition srcElement,
+ ManualWatermarkEstimator<Instant> estimator,
OutputReceiver<Instant> out,
RestrictionTracker<OffsetRange, Long> restrictionTracker) {
@@ -190,11 +207,14 @@ public class PeriodicSequence
boolean claimSuccess = true;
+ estimator.setWatermark(Instant.ofEpochMilli(restriction.getFrom()));
+
while (claimSuccess && Instant.ofEpochMilli(nextOutput).isBeforeNow()) {
claimSuccess = restrictionTracker.tryClaim(nextOutput);
if (claimSuccess) {
Instant output = Instant.ofEpochMilli(nextOutput);
out.outputWithTimestamp(output, output);
+ estimator.setWatermark(output);
nextOutput = nextOutput + interval;
}
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java
index 3197c70337a..33ef3cd9cf0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java
@@ -17,7 +17,13 @@
*/
package org.apache.beam.sdk.transforms;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -25,8 +31,15 @@ import org.apache.beam.sdk.testing.UsesImpulse;
import org.apache.beam.sdk.testing.UsesStatefulParDo;
import org.apache.beam.sdk.testing.UsesUnboundedPCollections;
import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
@@ -40,11 +53,17 @@ import org.junit.runners.JUnit4;
public class PeriodicSequenceTest {
@Rule public transient TestPipeline p = TestPipeline.create();
- public static class ExtractTsDoFn<InputT> extends DoFn<InputT, KV<InputT, Instant>> {
+ public static class ExtractTsDoFn<InputT>
+ extends DoFn<InputT, TimestampedValue<KV<InputT, Instant>>> {
+
@ProcessElement
- public void processElement(DoFn<InputT, KV<InputT, Instant>>.ProcessContext c)
+ public void processElement(
+ @Element InputT element,
+ @Timestamp Instant ts,
+ OutputReceiver<TimestampedValue<KV<InputT, Instant>>> output)
throws Exception {
- c.output(KV.of(c.element(), c.timestamp()));
+
+ output.output(TimestampedValue.of(KV.of(element, ts), Instant.now()));
}
}
@@ -57,29 +76,63 @@ public class PeriodicSequenceTest {
UsesUnboundedSplittableParDo.class
})
public void testOutputsProperElements() {
- Instant instant = Instant.now();
-
- Instant startTime = instant.minus(Duration.standardHours(100));
- long duration = 500;
+ Instant startTime = Instant.now().plus(Duration.standardSeconds(2));
Duration interval = Duration.millis(250);
long intervalMillis = interval.getMillis();
+ long duration = 3 * intervalMillis;
Instant stopTime = startTime.plus(Duration.millis(duration));
- PCollection<KV<Instant, Instant>> result =
- p.apply(
- Create.<PeriodicSequence.SequenceDefinition>of(
- new PeriodicSequence.SequenceDefinition(startTime, stopTime, interval)))
+ PCollection<TimestampedValue<KV<Instant, Instant>>> result =
+ p.apply(Create.of(new PeriodicSequence.SequenceDefinition(startTime, stopTime, interval)))
.apply(PeriodicSequence.create())
- .apply(ParDo.of(new ExtractTsDoFn<>())); // used to validate timestamp
+ .apply(
+ Window.<Instant>into(FixedWindows.of(interval))
+ .withTimestampCombiner(TimestampCombiner.EARLIEST))
+ .apply(WithKeys.of("dummy"))
+ .apply(GroupByKey.create())
+ .apply(
+ MapElements.into(TypeDescriptor.of(Instant.class))
+ .via(e -> Iterables.getOnlyElement(e.getValue())))
+ .apply(ParDo.of(new ExtractTsDoFn<>())); // validate timestamps
- ArrayList<KV<Instant, Instant>> expectedResults =
- new ArrayList<>((int) (duration / intervalMillis + 1));
+ ArrayList<Instant> expectedResults = new ArrayList<>();
for (long i = 0; i <= duration; i += intervalMillis) {
Instant el = startTime.plus(Duration.millis(i));
- expectedResults.add(KV.of(el, el));
+ expectedResults.add(el);
}
- PAssert.that(result).containsInAnyOrder(expectedResults);
+ PAssert.that(result)
+ .satisfies(
+ values -> {
+ List<TimestampedValue<KV<Instant, Instant>>> sortedValues =
+ Streams.stream(values)
+ .sorted(Comparator.comparing(e -> e.getValue().getValue()))
+ .collect(Collectors.toList());
+
+ assertEquals(
+ expectedResults,
+ sortedValues.stream()
+ .map(e -> e.getValue().getValue())
+ .collect(Collectors.toList()));
+
+ Instant minTs =
+ sortedValues.stream()
+ .min(Comparator.comparing(TimestampedValue::getTimestamp))
+ .get()
+ .getTimestamp();
+ Instant maxTs =
+ sortedValues.stream()
+ .max(Comparator.comparing(TimestampedValue::getTimestamp))
+ .get()
+ .getTimestamp();
+ final long expectedDiff = intervalMillis / 2;
+ assertTrue(
+ String.format(
+ "Expected processing-time diff at least %d, got %d",
+ expectedDiff, maxTs.getMillis() - minTs.getMillis()),
+ maxTs.getMillis() - minTs.getMillis() > expectedDiff);
+ return null;
+ });
p.run().waitUntilFinish();
}