Posted to commits@beam.apache.org by bh...@apache.org on 2020/05/26 22:17:26 UTC

[beam] branch release-2.22.0 updated: [BEAM-2939, BEAM-10057] Ensure that we can process an EmptyUnboundedSource and also prevent splitting on it. (#11781) (#11818)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch release-2.22.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.22.0 by this push:
     new ca24e4b  [BEAM-2939, BEAM-10057] Ensure that we can process an EmptyUnboundedSource and also prevent splitting on it. (#11781) (#11818)
ca24e4b is described below

commit ca24e4b2d1607da83daaf5576761d62d81dac906
Author: Brian Hulette <bh...@google.com>
AuthorDate: Tue May 26 15:17:05 2020 -0700

    [BEAM-2939, BEAM-10057] Ensure that we can process an EmptyUnboundedSource and also prevent splitting on it. (#11781) (#11818)
    
    Co-authored-by: Lukasz Cwik <lu...@gmail.com>
---
 .../java/core/src/main/java/org/apache/beam/sdk/io/Read.java | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index e02c938..19baa8d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -725,7 +725,7 @@ public class Read {
 
         @Override
         public Instant getWatermark() {
-          throw new UnsupportedOperationException("getWatermark is never meant to be invoked.");
+          return BoundedWindow.TIMESTAMP_MAX_VALUE;
         }
 
         @Override
@@ -836,16 +836,20 @@ public class Read {
       @Override
       public SplitResult<UnboundedSourceRestriction<OutputT, CheckpointT>> trySplit(
           double fractionOfRemainder) {
-        // Don't split if we have claimed all since the SDF wrapper will be finishing soon.
+        // Don't split if we have the empty sources since the SDF wrapper will be finishing soon.
+        UnboundedSourceRestriction<OutputT, CheckpointT> currentRestriction = currentRestriction();
+        if (currentRestriction.getSource() instanceof EmptyUnboundedSource) {
+          return null;
+        }
+
         // Our split result sets the primary to have no checkpoint mark associated
         // with it since when we resume we don't have any state but we specifically pass
         // the checkpoint mark to the current reader so that when we finish the current bundle
         // we may register for finalization.
-        UnboundedSourceRestriction<OutputT, CheckpointT> currentRestriction = currentRestriction();
         SplitResult<UnboundedSourceRestriction<OutputT, CheckpointT>> result =
             SplitResult.of(
                 UnboundedSourceRestriction.create(
-                    EmptyUnboundedSource.INSTANCE, null, currentRestriction.getWatermark()),
+                    EmptyUnboundedSource.INSTANCE, null, BoundedWindow.TIMESTAMP_MAX_VALUE),
                 currentRestriction);
         currentReader =
             EmptyUnboundedSource.INSTANCE.createReader(null, currentRestriction.getCheckpoint());