You are viewing a plain-text version of this content. The canonical link to it was provided as a hyperlink ("here") that did not survive the plain-text conversion.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/05/17 02:03:14 UTC
[beam] branch master updated: [BEAM-2939] Ensure that we update the
watermark even when no elements are processed. (#11735)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c9d9828 [BEAM-2939] Ensure that we update the watermark even when no elements are processed. (#11735)
c9d9828 is described below
commit c9d9828cecc2c092443619c162de7fd89ad1b1d9
Author: Lukasz Cwik <lu...@gmail.com>
AuthorDate: Sat May 16 19:02:47 2020 -0700
[BEAM-2939] Ensure that we update the watermark even when no elements are processed. (#11735)
---
.../src/main/java/org/apache/beam/sdk/io/Read.java | 182 +++++++++++++++------
1 file changed, 130 insertions(+), 52 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 3574eef..e02c938 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -21,13 +21,19 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import com.google.auto.value.AutoValue;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark.NoopCheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
@@ -49,7 +55,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.NameUtils;
import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -443,42 +448,43 @@ public class Read {
private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceAsSDFWrapperFn.class);
private static final int DEFAULT_DESIRED_NUM_SPLITS = 20;
private static final int DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS = 10;
- private final Coder<CheckpointT> restrictionCoder;
+ private final Coder<CheckpointT> checkpointCoder;
- private UnboundedSourceAsSDFWrapperFn(Coder<CheckpointT> restrictionCoder) {
- this.restrictionCoder = restrictionCoder;
+ private UnboundedSourceAsSDFWrapperFn(Coder<CheckpointT> checkpointCoder) {
+ this.checkpointCoder = checkpointCoder;
}
@GetInitialRestriction
- public KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> initialRestriction(
+ public UnboundedSourceRestriction<OutputT, CheckpointT> initialRestriction(
@Element UnboundedSource<OutputT, CheckpointT> element) {
- return KV.of(element, null);
+ return UnboundedSourceRestriction.create(element, null, BoundedWindow.TIMESTAMP_MIN_VALUE);
}
@SplitRestriction
public void splitRestriction(
- @Restriction KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> restriction,
- OutputReceiver<KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>> receiver,
+ @Restriction UnboundedSourceRestriction<OutputT, CheckpointT> restriction,
+ OutputReceiver<UnboundedSourceRestriction<OutputT, CheckpointT>> receiver,
PipelineOptions pipelineOptions)
throws Exception {
// The empty unbounded source is trivially done and hence we don't need to output any splits
// for it.
- if (restriction.getKey() instanceof EmptyUnboundedSource) {
+ if (restriction.getSource() instanceof EmptyUnboundedSource) {
return;
}
// The UnboundedSource API does not support splitting after a meaningful checkpoint mark has
// been created.
- if (restriction.getValue() != null
- && !(restriction.getValue()
+ if (restriction.getCheckpoint() != null
+ && !(restriction.getCheckpoint()
instanceof UnboundedSource.CheckpointMark.NoopCheckpointMark)) {
receiver.output(restriction);
}
try {
for (UnboundedSource<OutputT, CheckpointT> split :
- restriction.getKey().split(DEFAULT_DESIRED_NUM_SPLITS, pipelineOptions)) {
- receiver.output(KV.of(split, null));
+ restriction.getSource().split(DEFAULT_DESIRED_NUM_SPLITS, pipelineOptions)) {
+ receiver.output(
+ UnboundedSourceRestriction.create(split, null, restriction.getWatermark()));
}
} catch (Exception e) {
receiver.output(restriction);
@@ -487,51 +493,54 @@ public class Read {
@NewTracker
public RestrictionTracker<
- KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>, UnboundedSourceValue<OutputT>[]>
+ UnboundedSourceRestriction<OutputT, CheckpointT>, UnboundedSourceValue<OutputT>[]>
restrictionTracker(
- @Restriction KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> restriction,
+ @Restriction UnboundedSourceRestriction<OutputT, CheckpointT> restriction,
PipelineOptions pipelineOptions) {
return new UnboundedSourceAsSDFRestrictionTracker(restriction, pipelineOptions);
}
@ProcessElement
public ProcessContinuation processElement(
- RestrictionTracker<
- KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>, UnboundedSourceValue[]>
+ RestrictionTracker<UnboundedSourceRestriction<OutputT, CheckpointT>, UnboundedSourceValue[]>
tracker,
ManualWatermarkEstimator<Instant> watermarkEstimator,
OutputReceiver<ValueWithRecordId<OutputT>> receiver,
BundleFinalizer bundleFinalizer)
throws IOException {
- KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> initialRestriction =
+ UnboundedSourceRestriction<OutputT, CheckpointT> initialRestriction =
tracker.currentRestriction();
UnboundedSourceValue<OutputT>[] out = new UnboundedSourceValue[1];
while (tracker.tryClaim(out)) {
receiver.outputWithTimestamp(
new ValueWithRecordId<>(out[0].getValue(), out[0].getId()), out[0].getTimestamp());
- watermarkEstimator.setWatermark(ensureTimestampWithinBounds(out[0].getWatermark()));
}
+ UnboundedSourceRestriction<OutputT, CheckpointT> currentRestriction =
+ tracker.currentRestriction();
+
+ // Advance the watermark even if zero elements may have been output.
+ watermarkEstimator.setWatermark(
+ ensureTimestampWithinBounds(currentRestriction.getWatermark()));
+
// Add the checkpoint mark to be finalized if the checkpoint mark isn't trivial and is not
// the initial restriction. The initial restriction would have been finalized as part of
// a prior bundle being executed.
- KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> currentRestriction =
- tracker.currentRestriction();
@SuppressWarnings("ReferenceEquality")
boolean isInitialRestriction = initialRestriction == currentRestriction;
- if (currentRestriction.getValue() != null
+ if (currentRestriction.getCheckpoint() != null
&& !isInitialRestriction
- && !(tracker.currentRestriction().getValue() instanceof NoopCheckpointMark)) {
+ && !(tracker.currentRestriction().getCheckpoint() instanceof NoopCheckpointMark)) {
bundleFinalizer.afterBundleCommit(
Instant.now().plus(Duration.standardMinutes(DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS)),
- currentRestriction.getValue()::finalizeCheckpoint);
+ currentRestriction.getCheckpoint()::finalizeCheckpoint);
}
// If we have been split/checkpoint by a runner, the tracker will have been updated to the
// empty source and we will return stop. Otherwise the unbounded source has only temporarily
// run out of work.
- if (tracker.currentRestriction().getKey() instanceof EmptyUnboundedSource) {
+ if (currentRestriction.getSource() instanceof EmptyUnboundedSource) {
return ProcessContinuation.stop();
}
return ProcessContinuation.resume();
@@ -558,24 +567,23 @@ public class Read {
}
@GetRestrictionCoder
- public Coder<KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>> restrictionCoder() {
- return KvCoder.of(
+ public Coder<UnboundedSourceRestriction<OutputT, CheckpointT>> restrictionCoder() {
+ return new UnboundedSourceRestrictionCoder<>(
SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointT>>() {}),
- NullableCoder.of(restrictionCoder));
+ NullableCoder.of(checkpointCoder));
}
/**
* A POJO representing all the values we need to pass between the {@link UnboundedReader} and
* the {@link org.apache.beam.sdk.transforms.DoFn.ProcessElement @ProcessElement} method of the
- * splittable DoFn.
+ * splittable DoFn for each output element.
*/
@AutoValue
abstract static class UnboundedSourceValue<T> {
- public static <T> UnboundedSourceValue<T> create(
- byte[] id, T value, Instant timestamp, Instant watermark) {
+ public static <T> UnboundedSourceValue<T> create(byte[] id, T value, Instant timestamp) {
return new AutoValue_Read_UnboundedSourceAsSDFWrapperFn_UnboundedSourceValue<T>(
- id, value, timestamp, watermark);
+ id, value, timestamp);
}
@SuppressWarnings("mutable")
@@ -584,10 +592,78 @@ public class Read {
public abstract T getValue();
public abstract Instant getTimestamp();
+ }
+
+ /**
+ * A POJO representing all the state we need to maintain between the {@link UnboundedReader} and
+ * future {@link org.apache.beam.sdk.transforms.DoFn.ProcessElement @ProcessElement} calls.
+ */
+ @AutoValue
+ abstract static class UnboundedSourceRestriction<OutputT, CheckpointT extends CheckpointMark>
+ implements Serializable {
+ public static <OutputT, CheckpointT extends CheckpointMark>
+ UnboundedSourceRestriction<OutputT, CheckpointT> create(
+ UnboundedSource<OutputT, CheckpointT> source,
+ CheckpointT checkpoint,
+ Instant watermark) {
+ return new AutoValue_Read_UnboundedSourceAsSDFWrapperFn_UnboundedSourceRestriction<>(
+ source, checkpoint, watermark);
+ }
+
+ public abstract UnboundedSource<OutputT, CheckpointT> getSource();
+
+ @Nullable
+ public abstract CheckpointT getCheckpoint();
public abstract Instant getWatermark();
}
+ /** A {@link Coder} for {@link UnboundedSourceRestriction}s. */
+ private static class UnboundedSourceRestrictionCoder<
+ OutputT, CheckpointT extends CheckpointMark>
+ extends StructuredCoder<UnboundedSourceRestriction<OutputT, CheckpointT>> {
+
+ private final Coder<UnboundedSource<OutputT, CheckpointT>> sourceCoder;
+ private final Coder<CheckpointT> checkpointCoder;
+
+ private UnboundedSourceRestrictionCoder(
+ Coder<UnboundedSource<OutputT, CheckpointT>> sourceCoder,
+ Coder<CheckpointT> checkpointCoder) {
+ this.sourceCoder = sourceCoder;
+ this.checkpointCoder = checkpointCoder;
+ }
+
+ @Override
+ public void encode(
+ UnboundedSourceRestriction<OutputT, CheckpointT> value, OutputStream outStream)
+ throws CoderException, IOException {
+ sourceCoder.encode(value.getSource(), outStream);
+ checkpointCoder.encode(value.getCheckpoint(), outStream);
+ InstantCoder.of().encode(value.getWatermark(), outStream);
+ }
+
+ @Override
+ public UnboundedSourceRestriction<OutputT, CheckpointT> decode(InputStream inStream)
+ throws CoderException, IOException {
+ return UnboundedSourceRestriction.create(
+ sourceCoder.decode(inStream),
+ checkpointCoder.decode(inStream),
+ InstantCoder.of().decode(inStream));
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Arrays.asList(sourceCoder, checkpointCoder);
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ verifyDeterministic(sourceCoder, "source coder not deterministic");
+ verifyDeterministic(checkpointCoder, "checkpoint coder not deterministic");
+ verifyDeterministic(InstantCoder.of(), "watermark coder not deterministic");
+ }
+ }
+
/**
* A marker implementation that is used to represent the primary "source" when performing a
* split. The methods on this object are not meant to be called and only exist to fulfill the
@@ -685,15 +761,15 @@ public class Read {
private static class UnboundedSourceAsSDFRestrictionTracker<
OutputT, CheckpointT extends CheckpointMark>
extends RestrictionTracker<
- KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>, UnboundedSourceValue<OutputT>[]>
+ UnboundedSourceRestriction<OutputT, CheckpointT>, UnboundedSourceValue<OutputT>[]>
implements HasProgress {
- private final KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> initialRestriction;
+ private final UnboundedSourceRestriction<OutputT, CheckpointT> initialRestriction;
private final PipelineOptions pipelineOptions;
private UnboundedSource.UnboundedReader<OutputT> currentReader;
private boolean readerHasBeenStarted;
UnboundedSourceAsSDFRestrictionTracker(
- KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> initialRestriction,
+ UnboundedSourceRestriction<OutputT, CheckpointT> initialRestriction,
PipelineOptions pipelineOptions) {
this.initialRestriction = initialRestriction;
this.pipelineOptions = pipelineOptions;
@@ -705,8 +781,8 @@ public class Read {
if (currentReader == null) {
currentReader =
initialRestriction
- .getKey()
- .createReader(pipelineOptions, initialRestriction.getValue());
+ .getSource()
+ .createReader(pipelineOptions, initialRestriction.getCheckpoint());
}
if (!readerHasBeenStarted) {
readerHasBeenStarted = true;
@@ -720,8 +796,7 @@ public class Read {
UnboundedSourceValue.create(
currentReader.getCurrentRecordId(),
currentReader.getCurrent(),
- currentReader.getCurrentTimestamp(),
- currentReader.getWatermark());
+ currentReader.getCurrentTimestamp());
return true;
} catch (IOException e) {
if (currentReader != null) {
@@ -748,29 +823,32 @@ public class Read {
/** The value is invalid if {@link #tryClaim} has ever thrown an exception. */
@Override
- public KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> currentRestriction() {
+ public UnboundedSourceRestriction<OutputT, CheckpointT> currentRestriction() {
if (currentReader == null) {
return initialRestriction;
}
- return KV.of(
+ return UnboundedSourceRestriction.create(
(UnboundedSource<OutputT, CheckpointT>) currentReader.getCurrentSource(),
- (CheckpointT) currentReader.getCheckpointMark());
+ (CheckpointT) currentReader.getCheckpointMark(),
+ currentReader.getWatermark());
}
@Override
- public SplitResult<KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>> trySplit(
+ public SplitResult<UnboundedSourceRestriction<OutputT, CheckpointT>> trySplit(
double fractionOfRemainder) {
// Don't split if we have claimed all since the SDF wrapper will be finishing soon.
// Our split result sets the primary to have no checkpoint mark associated
// with it since when we resume we don't have any state but we specifically pass
// the checkpoint mark to the current reader so that when we finish the current bundle
// we may register for finalization.
- KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> currentRestriction =
- currentRestriction();
- SplitResult<KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>> result =
- SplitResult.of(KV.of(EmptyUnboundedSource.INSTANCE, null), currentRestriction);
+ UnboundedSourceRestriction<OutputT, CheckpointT> currentRestriction = currentRestriction();
+ SplitResult<UnboundedSourceRestriction<OutputT, CheckpointT>> result =
+ SplitResult.of(
+ UnboundedSourceRestriction.create(
+ EmptyUnboundedSource.INSTANCE, null, currentRestriction.getWatermark()),
+ currentRestriction);
currentReader =
- EmptyUnboundedSource.INSTANCE.createReader(null, currentRestriction.getValue());
+ EmptyUnboundedSource.INSTANCE.createReader(null, currentRestriction.getCheckpoint());
return result;
}
@@ -785,7 +863,7 @@ public class Read {
@Override
public Progress getProgress() {
// We treat the empty source as implicitly done.
- if (currentRestriction().getKey() instanceof EmptyUnboundedSource) {
+ if (currentRestriction().getSource() instanceof EmptyUnboundedSource) {
return RestrictionTracker.Progress.from(1, 0);
}
@@ -793,8 +871,8 @@ public class Read {
try {
currentReader =
initialRestriction
- .getKey()
- .createReader(pipelineOptions, initialRestriction.getValue());
+ .getSource()
+ .createReader(pipelineOptions, initialRestriction.getCheckpoint());
} catch (IOException e) {
throw new RuntimeException(e);
}