You are viewing a plain-text version of this content. (The original page linked to the canonical version here; that hyperlink is not preserved in plain text.)
Posted to commits@beam.apache.org by lc...@apache.org on 2020/05/17 02:03:14 UTC

[beam] branch master updated: [BEAM-2939] Ensure that we update the watermark even when no elements are processed. (#11735)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c9d9828  [BEAM-2939] Ensure that we update the watermark even when no elements are processed. (#11735)
c9d9828 is described below

commit c9d9828cecc2c092443619c162de7fd89ad1b1d9
Author: Lukasz Cwik <lu...@gmail.com>
AuthorDate: Sat May 16 19:02:47 2020 -0700

    [BEAM-2939] Ensure that we update the watermark even when no elements are processed. (#11735)
---
 .../src/main/java/org/apache/beam/sdk/io/Read.java | 182 +++++++++++++++------
 1 file changed, 130 insertions(+), 52 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 3574eef..e02c938 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -21,13 +21,19 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 
 import com.google.auto.value.AutoValue;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
 import java.util.List;
 import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark.NoopCheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
@@ -49,7 +55,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -443,42 +448,43 @@ public class Read {
     private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceAsSDFWrapperFn.class);
     private static final int DEFAULT_DESIRED_NUM_SPLITS = 20;
     private static final int DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS = 10;
-    private final Coder<CheckpointT> restrictionCoder;
+    private final Coder<CheckpointT> checkpointCoder;
 
-    private UnboundedSourceAsSDFWrapperFn(Coder<CheckpointT> restrictionCoder) {
-      this.restrictionCoder = restrictionCoder;
+    private UnboundedSourceAsSDFWrapperFn(Coder<CheckpointT> checkpointCoder) {
+      this.checkpointCoder = checkpointCoder;
     }
 
     @GetInitialRestriction
-    public KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> initialRestriction(
+    public UnboundedSourceRestriction<OutputT, CheckpointT> initialRestriction(
         @Element UnboundedSource<OutputT, CheckpointT> element) {
-      return KV.of(element, null);
+      return UnboundedSourceRestriction.create(element, null, BoundedWindow.TIMESTAMP_MIN_VALUE);
     }
 
     @SplitRestriction
     public void splitRestriction(
-        @Restriction KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> restriction,
-        OutputReceiver<KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>> receiver,
+        @Restriction UnboundedSourceRestriction<OutputT, CheckpointT> restriction,
+        OutputReceiver<UnboundedSourceRestriction<OutputT, CheckpointT>> receiver,
         PipelineOptions pipelineOptions)
         throws Exception {
       // The empty unbounded source is trivially done and hence we don't need to output any splits
       // for it.
-      if (restriction.getKey() instanceof EmptyUnboundedSource) {
+      if (restriction.getSource() instanceof EmptyUnboundedSource) {
         return;
       }
 
       // The UnboundedSource API does not support splitting after a meaningful checkpoint mark has
       // been created.
-      if (restriction.getValue() != null
-          && !(restriction.getValue()
+      if (restriction.getCheckpoint() != null
+          && !(restriction.getCheckpoint()
               instanceof UnboundedSource.CheckpointMark.NoopCheckpointMark)) {
         receiver.output(restriction);
       }
 
       try {
         for (UnboundedSource<OutputT, CheckpointT> split :
-            restriction.getKey().split(DEFAULT_DESIRED_NUM_SPLITS, pipelineOptions)) {
-          receiver.output(KV.of(split, null));
+            restriction.getSource().split(DEFAULT_DESIRED_NUM_SPLITS, pipelineOptions)) {
+          receiver.output(
+              UnboundedSourceRestriction.create(split, null, restriction.getWatermark()));
         }
       } catch (Exception e) {
         receiver.output(restriction);
@@ -487,51 +493,54 @@ public class Read {
 
     @NewTracker
     public RestrictionTracker<
-            KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>, UnboundedSourceValue<OutputT>[]>
+            UnboundedSourceRestriction<OutputT, CheckpointT>, UnboundedSourceValue<OutputT>[]>
         restrictionTracker(
-            @Restriction KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> restriction,
+            @Restriction UnboundedSourceRestriction<OutputT, CheckpointT> restriction,
             PipelineOptions pipelineOptions) {
       return new UnboundedSourceAsSDFRestrictionTracker(restriction, pipelineOptions);
     }
 
     @ProcessElement
     public ProcessContinuation processElement(
-        RestrictionTracker<
-                KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>, UnboundedSourceValue[]>
+        RestrictionTracker<UnboundedSourceRestriction<OutputT, CheckpointT>, UnboundedSourceValue[]>
             tracker,
         ManualWatermarkEstimator<Instant> watermarkEstimator,
         OutputReceiver<ValueWithRecordId<OutputT>> receiver,
         BundleFinalizer bundleFinalizer)
         throws IOException {
-      KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> initialRestriction =
+      UnboundedSourceRestriction<OutputT, CheckpointT> initialRestriction =
           tracker.currentRestriction();
 
       UnboundedSourceValue<OutputT>[] out = new UnboundedSourceValue[1];
       while (tracker.tryClaim(out)) {
         receiver.outputWithTimestamp(
             new ValueWithRecordId<>(out[0].getValue(), out[0].getId()), out[0].getTimestamp());
-        watermarkEstimator.setWatermark(ensureTimestampWithinBounds(out[0].getWatermark()));
       }
 
+      UnboundedSourceRestriction<OutputT, CheckpointT> currentRestriction =
+          tracker.currentRestriction();
+
+      // Advance the watermark even if zero elements may have been output.
+      watermarkEstimator.setWatermark(
+          ensureTimestampWithinBounds(currentRestriction.getWatermark()));
+
       // Add the checkpoint mark to be finalized if the checkpoint mark isn't trivial and is not
       // the initial restriction. The initial restriction would have been finalized as part of
       // a prior bundle being executed.
-      KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> currentRestriction =
-          tracker.currentRestriction();
       @SuppressWarnings("ReferenceEquality")
       boolean isInitialRestriction = initialRestriction == currentRestriction;
-      if (currentRestriction.getValue() != null
+      if (currentRestriction.getCheckpoint() != null
           && !isInitialRestriction
-          && !(tracker.currentRestriction().getValue() instanceof NoopCheckpointMark)) {
+          && !(tracker.currentRestriction().getCheckpoint() instanceof NoopCheckpointMark)) {
         bundleFinalizer.afterBundleCommit(
             Instant.now().plus(Duration.standardMinutes(DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS)),
-            currentRestriction.getValue()::finalizeCheckpoint);
+            currentRestriction.getCheckpoint()::finalizeCheckpoint);
       }
 
       // If we have been split/checkpoint by a runner, the tracker will have been updated to the
       // empty source and we will return stop. Otherwise the unbounded source has only temporarily
       // run out of work.
-      if (tracker.currentRestriction().getKey() instanceof EmptyUnboundedSource) {
+      if (currentRestriction.getSource() instanceof EmptyUnboundedSource) {
         return ProcessContinuation.stop();
       }
       return ProcessContinuation.resume();
@@ -558,24 +567,23 @@ public class Read {
     }
 
     @GetRestrictionCoder
-    public Coder<KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>> restrictionCoder() {
-      return KvCoder.of(
+    public Coder<UnboundedSourceRestriction<OutputT, CheckpointT>> restrictionCoder() {
+      return new UnboundedSourceRestrictionCoder<>(
           SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointT>>() {}),
-          NullableCoder.of(restrictionCoder));
+          NullableCoder.of(checkpointCoder));
     }
 
     /**
      * A POJO representing all the values we need to pass between the {@link UnboundedReader} and
      * the {@link org.apache.beam.sdk.transforms.DoFn.ProcessElement @ProcessElement} method of the
-     * splittable DoFn.
+     * splittable DoFn for each output element.
      */
     @AutoValue
     abstract static class UnboundedSourceValue<T> {
 
-      public static <T> UnboundedSourceValue<T> create(
-          byte[] id, T value, Instant timestamp, Instant watermark) {
+      public static <T> UnboundedSourceValue<T> create(byte[] id, T value, Instant timestamp) {
         return new AutoValue_Read_UnboundedSourceAsSDFWrapperFn_UnboundedSourceValue<T>(
-            id, value, timestamp, watermark);
+            id, value, timestamp);
       }
 
       @SuppressWarnings("mutable")
@@ -584,10 +592,78 @@ public class Read {
       public abstract T getValue();
 
       public abstract Instant getTimestamp();
+    }
+
+    /**
+     * A POJO representing all the state we need to maintain between the {@link UnboundedReader} and
+     * future {@link org.apache.beam.sdk.transforms.DoFn.ProcessElement @ProcessElement} calls.
+     */
+    @AutoValue
+    abstract static class UnboundedSourceRestriction<OutputT, CheckpointT extends CheckpointMark>
+        implements Serializable {
+      public static <OutputT, CheckpointT extends CheckpointMark>
+          UnboundedSourceRestriction<OutputT, CheckpointT> create(
+              UnboundedSource<OutputT, CheckpointT> source,
+              CheckpointT checkpoint,
+              Instant watermark) {
+        return new AutoValue_Read_UnboundedSourceAsSDFWrapperFn_UnboundedSourceRestriction<>(
+            source, checkpoint, watermark);
+      }
+
+      public abstract UnboundedSource<OutputT, CheckpointT> getSource();
+
+      @Nullable
+      public abstract CheckpointT getCheckpoint();
 
       public abstract Instant getWatermark();
     }
 
+    /** A {@link Coder} for {@link UnboundedSourceRestriction}s. */
+    private static class UnboundedSourceRestrictionCoder<
+            OutputT, CheckpointT extends CheckpointMark>
+        extends StructuredCoder<UnboundedSourceRestriction<OutputT, CheckpointT>> {
+
+      private final Coder<UnboundedSource<OutputT, CheckpointT>> sourceCoder;
+      private final Coder<CheckpointT> checkpointCoder;
+
+      private UnboundedSourceRestrictionCoder(
+          Coder<UnboundedSource<OutputT, CheckpointT>> sourceCoder,
+          Coder<CheckpointT> checkpointCoder) {
+        this.sourceCoder = sourceCoder;
+        this.checkpointCoder = checkpointCoder;
+      }
+
+      @Override
+      public void encode(
+          UnboundedSourceRestriction<OutputT, CheckpointT> value, OutputStream outStream)
+          throws CoderException, IOException {
+        sourceCoder.encode(value.getSource(), outStream);
+        checkpointCoder.encode(value.getCheckpoint(), outStream);
+        InstantCoder.of().encode(value.getWatermark(), outStream);
+      }
+
+      @Override
+      public UnboundedSourceRestriction<OutputT, CheckpointT> decode(InputStream inStream)
+          throws CoderException, IOException {
+        return UnboundedSourceRestriction.create(
+            sourceCoder.decode(inStream),
+            checkpointCoder.decode(inStream),
+            InstantCoder.of().decode(inStream));
+      }
+
+      @Override
+      public List<? extends Coder<?>> getCoderArguments() {
+        return Arrays.asList(sourceCoder, checkpointCoder);
+      }
+
+      @Override
+      public void verifyDeterministic() throws NonDeterministicException {
+        verifyDeterministic(sourceCoder, "source coder not deterministic");
+        verifyDeterministic(checkpointCoder, "checkpoint coder not deterministic");
+        verifyDeterministic(InstantCoder.of(), "watermark coder not deterministic");
+      }
+    }
+
     /**
      * A marker implementation that is used to represent the primary "source" when performing a
      * split. The methods on this object are not meant to be called and only exist to fulfill the
@@ -685,15 +761,15 @@ public class Read {
     private static class UnboundedSourceAsSDFRestrictionTracker<
             OutputT, CheckpointT extends CheckpointMark>
         extends RestrictionTracker<
-            KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>, UnboundedSourceValue<OutputT>[]>
+            UnboundedSourceRestriction<OutputT, CheckpointT>, UnboundedSourceValue<OutputT>[]>
         implements HasProgress {
-      private final KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> initialRestriction;
+      private final UnboundedSourceRestriction<OutputT, CheckpointT> initialRestriction;
       private final PipelineOptions pipelineOptions;
       private UnboundedSource.UnboundedReader<OutputT> currentReader;
       private boolean readerHasBeenStarted;
 
       UnboundedSourceAsSDFRestrictionTracker(
-          KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> initialRestriction,
+          UnboundedSourceRestriction<OutputT, CheckpointT> initialRestriction,
           PipelineOptions pipelineOptions) {
         this.initialRestriction = initialRestriction;
         this.pipelineOptions = pipelineOptions;
@@ -705,8 +781,8 @@ public class Read {
           if (currentReader == null) {
             currentReader =
                 initialRestriction
-                    .getKey()
-                    .createReader(pipelineOptions, initialRestriction.getValue());
+                    .getSource()
+                    .createReader(pipelineOptions, initialRestriction.getCheckpoint());
           }
           if (!readerHasBeenStarted) {
             readerHasBeenStarted = true;
@@ -720,8 +796,7 @@ public class Read {
               UnboundedSourceValue.create(
                   currentReader.getCurrentRecordId(),
                   currentReader.getCurrent(),
-                  currentReader.getCurrentTimestamp(),
-                  currentReader.getWatermark());
+                  currentReader.getCurrentTimestamp());
           return true;
         } catch (IOException e) {
           if (currentReader != null) {
@@ -748,29 +823,32 @@ public class Read {
 
       /** The value is invalid if {@link #tryClaim} has ever thrown an exception. */
       @Override
-      public KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> currentRestriction() {
+      public UnboundedSourceRestriction<OutputT, CheckpointT> currentRestriction() {
         if (currentReader == null) {
           return initialRestriction;
         }
-        return KV.of(
+        return UnboundedSourceRestriction.create(
             (UnboundedSource<OutputT, CheckpointT>) currentReader.getCurrentSource(),
-            (CheckpointT) currentReader.getCheckpointMark());
+            (CheckpointT) currentReader.getCheckpointMark(),
+            currentReader.getWatermark());
       }
 
       @Override
-      public SplitResult<KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>> trySplit(
+      public SplitResult<UnboundedSourceRestriction<OutputT, CheckpointT>> trySplit(
           double fractionOfRemainder) {
         // Don't split if we have claimed all since the SDF wrapper will be finishing soon.
         // Our split result sets the primary to have no checkpoint mark associated
         // with it since when we resume we don't have any state but we specifically pass
         // the checkpoint mark to the current reader so that when we finish the current bundle
         // we may register for finalization.
-        KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> currentRestriction =
-            currentRestriction();
-        SplitResult<KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>> result =
-            SplitResult.of(KV.of(EmptyUnboundedSource.INSTANCE, null), currentRestriction);
+        UnboundedSourceRestriction<OutputT, CheckpointT> currentRestriction = currentRestriction();
+        SplitResult<UnboundedSourceRestriction<OutputT, CheckpointT>> result =
+            SplitResult.of(
+                UnboundedSourceRestriction.create(
+                    EmptyUnboundedSource.INSTANCE, null, currentRestriction.getWatermark()),
+                currentRestriction);
         currentReader =
-            EmptyUnboundedSource.INSTANCE.createReader(null, currentRestriction.getValue());
+            EmptyUnboundedSource.INSTANCE.createReader(null, currentRestriction.getCheckpoint());
         return result;
       }
 
@@ -785,7 +863,7 @@ public class Read {
       @Override
       public Progress getProgress() {
         // We treat the empty source as implicitly done.
-        if (currentRestriction().getKey() instanceof EmptyUnboundedSource) {
+        if (currentRestriction().getSource() instanceof EmptyUnboundedSource) {
           return RestrictionTracker.Progress.from(1, 0);
         }
 
@@ -793,8 +871,8 @@ public class Read {
           try {
             currentReader =
                 initialRestriction
-                    .getKey()
-                    .createReader(pipelineOptions, initialRestriction.getValue());
+                    .getSource()
+                    .createReader(pipelineOptions, initialRestriction.getCheckpoint());
           } catch (IOException e) {
             throw new RuntimeException(e);
           }