You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@beam.apache.org by da...@apache.org on 2022/09/21 17:12:35 UTC

[beam] branch master updated: [Java SDK core] emit watermark from PeriodicSequence (#23301) (#23302)

This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0510d88c4d0 [Java SDK core] emit watermark from PeriodicSequence (#23301) (#23302)
0510d88c4d0 is described below

commit 0510d88c4d0cc74dfe865fb9ae61456ac11651c8
Author: Jan Lukavský <je...@seznam.cz>
AuthorDate: Wed Sep 21 19:12:27 2022 +0200

    [Java SDK core] emit watermark from PeriodicSequence (#23301) (#23302)
---
 .../beam/sdk/transforms/PeriodicSequence.java      | 20 +++++
 .../beam/sdk/transforms/PeriodicSequenceTest.java  | 85 ++++++++++++++++++----
 2 files changed, 89 insertions(+), 16 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
index a42c948b3a9..6f0aeac859c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
@@ -25,8 +25,12 @@ import java.util.Objects;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -178,9 +182,22 @@ public class PeriodicSequence
       return new OutputRangeTracker(restriction);
     }
 
+    @GetInitialWatermarkEstimatorState
+    public Instant getInitialWatermarkState() {
+      return BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    @NewWatermarkEstimator
+    public WatermarkEstimator<Instant> newWatermarkEstimator(
+        @WatermarkEstimatorState Instant state) {
+
+      return new WatermarkEstimators.Manual(state);
+    }
+
     @ProcessElement
     public ProcessContinuation processElement(
         @Element SequenceDefinition srcElement,
+        ManualWatermarkEstimator<Instant> estimator,
         OutputReceiver<Instant> out,
         RestrictionTracker<OffsetRange, Long> restrictionTracker) {
 
@@ -190,11 +207,14 @@ public class PeriodicSequence
 
       boolean claimSuccess = true;
 
+      estimator.setWatermark(Instant.ofEpochMilli(restriction.getFrom()));
+
       while (claimSuccess && Instant.ofEpochMilli(nextOutput).isBeforeNow()) {
         claimSuccess = restrictionTracker.tryClaim(nextOutput);
         if (claimSuccess) {
           Instant output = Instant.ofEpochMilli(nextOutput);
           out.outputWithTimestamp(output, output);
+          estimator.setWatermark(output);
           nextOutput = nextOutput + interval;
         }
       }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java
index 3197c70337a..33ef3cd9cf0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java
@@ -17,7 +17,13 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -25,8 +31,15 @@ import org.apache.beam.sdk.testing.UsesImpulse;
 import org.apache.beam.sdk.testing.UsesStatefulParDo;
 import org.apache.beam.sdk.testing.UsesUnboundedPCollections;
 import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Rule;
@@ -40,11 +53,17 @@ import org.junit.runners.JUnit4;
 public class PeriodicSequenceTest {
   @Rule public transient TestPipeline p = TestPipeline.create();
 
-  public static class ExtractTsDoFn<InputT> extends DoFn<InputT, KV<InputT, Instant>> {
+  public static class ExtractTsDoFn<InputT>
+      extends DoFn<InputT, TimestampedValue<KV<InputT, Instant>>> {
+
     @ProcessElement
-    public void processElement(DoFn<InputT, KV<InputT, Instant>>.ProcessContext c)
+    public void processElement(
+        @Element InputT element,
+        @Timestamp Instant ts,
+        OutputReceiver<TimestampedValue<KV<InputT, Instant>>> output)
         throws Exception {
-      c.output(KV.of(c.element(), c.timestamp()));
+
+      output.output(TimestampedValue.of(KV.of(element, ts), Instant.now()));
     }
   }
 
@@ -57,29 +76,63 @@ public class PeriodicSequenceTest {
     UsesUnboundedSplittableParDo.class
   })
   public void testOutputsProperElements() {
-    Instant instant = Instant.now();
-
-    Instant startTime = instant.minus(Duration.standardHours(100));
-    long duration = 500;
+    Instant startTime = Instant.now().plus(Duration.standardSeconds(2));
     Duration interval = Duration.millis(250);
     long intervalMillis = interval.getMillis();
+    long duration = 3 * intervalMillis;
     Instant stopTime = startTime.plus(Duration.millis(duration));
 
-    PCollection<KV<Instant, Instant>> result =
-        p.apply(
-                Create.<PeriodicSequence.SequenceDefinition>of(
-                    new PeriodicSequence.SequenceDefinition(startTime, stopTime, interval)))
+    PCollection<TimestampedValue<KV<Instant, Instant>>> result =
+        p.apply(Create.of(new PeriodicSequence.SequenceDefinition(startTime, stopTime, interval)))
             .apply(PeriodicSequence.create())
-            .apply(ParDo.of(new ExtractTsDoFn<>())); // used to validate timestamp
+            .apply(
+                Window.<Instant>into(FixedWindows.of(interval))
+                    .withTimestampCombiner(TimestampCombiner.EARLIEST))
+            .apply(WithKeys.of("dummy"))
+            .apply(GroupByKey.create())
+            .apply(
+                MapElements.into(TypeDescriptor.of(Instant.class))
+                    .via(e -> Iterables.getOnlyElement(e.getValue())))
+            .apply(ParDo.of(new ExtractTsDoFn<>())); // validate timestamps
 
-    ArrayList<KV<Instant, Instant>> expectedResults =
-        new ArrayList<>((int) (duration / intervalMillis + 1));
+    ArrayList<Instant> expectedResults = new ArrayList<>();
     for (long i = 0; i <= duration; i += intervalMillis) {
       Instant el = startTime.plus(Duration.millis(i));
-      expectedResults.add(KV.of(el, el));
+      expectedResults.add(el);
     }
 
-    PAssert.that(result).containsInAnyOrder(expectedResults);
+    PAssert.that(result)
+        .satisfies(
+            values -> {
+              List<TimestampedValue<KV<Instant, Instant>>> sortedValues =
+                  Streams.stream(values)
+                      .sorted(Comparator.comparing(e -> e.getValue().getValue()))
+                      .collect(Collectors.toList());
+
+              assertEquals(
+                  expectedResults,
+                  sortedValues.stream()
+                      .map(e -> e.getValue().getValue())
+                      .collect(Collectors.toList()));
+
+              Instant minTs =
+                  sortedValues.stream()
+                      .min(Comparator.comparing(TimestampedValue::getTimestamp))
+                      .get()
+                      .getTimestamp();
+              Instant maxTs =
+                  sortedValues.stream()
+                      .max(Comparator.comparing(TimestampedValue::getTimestamp))
+                      .get()
+                      .getTimestamp();
+              final long expectedDiff = intervalMillis / 2;
+              assertTrue(
+                  String.format(
+                      "Expected processing-time diff at least %d, got %d",
+                      expectedDiff, maxTs.getMillis() - minTs.getMillis()),
+                  maxTs.getMillis() - minTs.getMillis() > expectedDiff);
+              return null;
+            });
 
     p.run().waitUntilFinish();
   }