You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/20 16:26:19 UTC

[2/3] beam git commit: Do not call advance when all elements are consumed

Do not call advance when all elements are consumed

This prevents UnboundedReadFromBoundedSource from attempting to read
elements from a reader where elements are known to not exist. This
defends against bounded readers which expect to be discarded the first
time they return false from advance().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/00f55f8e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/00f55f8e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/00f55f8e

Branch: refs/heads/master
Commit: 00f55f8e3bebf70de1d5497c40e0950b8bd6cbdd
Parents: 636185e
Author: Pei He <pe...@gmail.com>
Authored: Mon Mar 20 08:51:04 2017 +0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Mar 20 09:24:46 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/UnboundedReadFromBoundedSource.java | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/00f55f8e/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index 3073076..0c173a0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -458,22 +458,28 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
       private PipelineOptions options;
       private @Nullable BoundedReader<T> reader;
       private boolean closed;
+      private boolean readerDone;
 
       public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) {
         this.residualSource = checkNotNull(residualSource, "residualSource");
         this.options = checkNotNull(options, "options");
         this.reader = null;
         this.closed = false;
+        this.readerDone = false;
       }
 
       private boolean advance() throws IOException {
         checkArgument(!closed, "advance() call on closed %s", getClass().getName());
+        if (readerDone) {
+          return false;
+        }
         if (reader == null) {
           reader = residualSource.createReader(options);
-          return reader.start();
+          readerDone = !reader.start();
         } else {
-          return reader.advance();
+          readerDone = !reader.advance();
         }
+        return !readerDone;
       }
 
       T getCurrent() throws NoSuchElementException {