Posted to commits@beam.apache.org by bh...@apache.org on 2020/05/26 22:17:26 UTC
[beam] branch release-2.22.0 updated: [BEAM-2939, BEAM-10057] Ensure that we can process an EmptyUnboundedSource and also prevent splitting on it. (#11781) (#11818)
This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch release-2.22.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.22.0 by this push:
new ca24e4b [BEAM-2939, BEAM-10057] Ensure that we can process an EmptyUnboundedSource and also prevent splitting on it. (#11781) (#11818)
ca24e4b is described below
commit ca24e4b2d1607da83daaf5576761d62d81dac906
Author: Brian Hulette <bh...@google.com>
AuthorDate: Tue May 26 15:17:05 2020 -0700
[BEAM-2939, BEAM-10057] Ensure that we can process an EmptyUnboundedSource and also prevent splitting on it. (#11781) (#11818)
Co-authored-by: Lukasz Cwik <lu...@gmail.com>
---
.../java/core/src/main/java/org/apache/beam/sdk/io/Read.java | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index e02c938..19baa8d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -725,7 +725,7 @@ public class Read {
@Override
public Instant getWatermark() {
- throw new UnsupportedOperationException("getWatermark is never meant to be invoked.");
+ return BoundedWindow.TIMESTAMP_MAX_VALUE;
}
@Override
@@ -836,16 +836,20 @@ public class Read {
@Override
public SplitResult<UnboundedSourceRestriction<OutputT, CheckpointT>> trySplit(
double fractionOfRemainder) {
- // Don't split if we have claimed all since the SDF wrapper will be finishing soon.
+ // Don't split if we have the empty sources since the SDF wrapper will be finishing soon.
+ UnboundedSourceRestriction<OutputT, CheckpointT> currentRestriction = currentRestriction();
+ if (currentRestriction.getSource() instanceof EmptyUnboundedSource) {
+ return null;
+ }
+
// Our split result sets the primary to have no checkpoint mark associated
// with it since when we resume we don't have any state but we specifically pass
// the checkpoint mark to the current reader so that when we finish the current bundle
// we may register for finalization.
- UnboundedSourceRestriction<OutputT, CheckpointT> currentRestriction = currentRestriction();
SplitResult<UnboundedSourceRestriction<OutputT, CheckpointT>> result =
SplitResult.of(
UnboundedSourceRestriction.create(
- EmptyUnboundedSource.INSTANCE, null, currentRestriction.getWatermark()),
+ EmptyUnboundedSource.INSTANCE, null, BoundedWindow.TIMESTAMP_MAX_VALUE),
currentRestriction);
currentReader =
EmptyUnboundedSource.INSTANCE.createReader(null, currentRestriction.getCheckpoint());
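
The guard added to trySplit above can be illustrated outside of Beam with a minimal, self-contained sketch. Every type below (Source, EmptySource, RealSource, Restriction, Split, Tracker) is a hypothetical stand-in and not part of the Beam API, and MAX_WATERMARK only plays the role of BoundedWindow.TIMESTAMP_MAX_VALUE. The sketch also assumes that the tracker adopts the primary restriction after a successful split, which this diff does not show.

```java
public class SplitGuardSketch {
    // Hypothetical stand-in for BoundedWindow.TIMESTAMP_MAX_VALUE; the value is illustrative only.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    // Hypothetical stand-ins for an unbounded source and the empty placeholder source.
    interface Source {}

    static final class EmptySource implements Source {
        static final EmptySource INSTANCE = new EmptySource();

        // Mirrors the first hunk: report the maximum watermark instead of throwing.
        long getWatermark() {
            return MAX_WATERMARK;
        }
    }

    static final class RealSource implements Source {}

    // Simplified restriction: a source plus the watermark recorded for it.
    static final class Restriction {
        final Source source;
        final long watermark;

        Restriction(Source source, long watermark) {
            this.source = source;
            this.watermark = watermark;
        }
    }

    // Simplified split result: primary keeps running, residual is handed back to the runner.
    static final class Split {
        final Restriction primary;
        final Restriction residual;

        Split(Restriction primary, Restriction residual) {
            this.primary = primary;
            this.residual = residual;
        }
    }

    static final class Tracker {
        private Restriction current;

        Tracker(Restriction initial) {
            this.current = initial;
        }

        // Returns null when no split is possible, as in the second hunk.
        Split trySplit() {
            // Guard: an empty source has nothing left to hand off, so refuse to split it.
            if (current.source instanceof EmptySource) {
                return null;
            }
            // The primary becomes the empty source at the maximum watermark;
            // the residual carries the original restriction unchanged.
            Split result =
                new Split(new Restriction(EmptySource.INSTANCE, MAX_WATERMARK), current);
            // Assumption: the tracker continues with the primary after splitting.
            current = result.primary;
            return result;
        }
    }

    public static void main(String[] args) {
        Tracker tracker = new Tracker(new Restriction(new RealSource(), 42L));
        Split first = tracker.trySplit();
        System.out.println("first split succeeded: " + (first != null));
        System.out.println("second split refused: " + (tracker.trySplit() == null));
    }
}
```

In this sketch the first call to trySplit succeeds and the second returns null, because the tracker then holds the empty source. Giving the primary the maximum watermark, rather than the watermark of the restriction being split, means the finished primary does not hold back downstream watermark progress.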