You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/20 16:26:19 UTC
[2/3] beam git commit: Do not call advance when all elements are
consumed
Do not call advance when all elements are consumed
This prevents UnboundedReadFromBoundedSource from attempting to read
elements from a reader where elements are known to not exist. This
defends against bounded readers which expect to be discarded the first
time they return false from advance().
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/00f55f8e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/00f55f8e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/00f55f8e
Branch: refs/heads/master
Commit: 00f55f8e3bebf70de1d5497c40e0950b8bd6cbdd
Parents: 636185e
Author: Pei He <pe...@gmail.com>
Authored: Mon Mar 20 08:51:04 2017 +0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Mar 20 09:24:46 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/UnboundedReadFromBoundedSource.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/00f55f8e/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index 3073076..0c173a0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -458,22 +458,28 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
private PipelineOptions options;
private @Nullable BoundedReader<T> reader;
private boolean closed;
+ private boolean readerDone;
public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) {
this.residualSource = checkNotNull(residualSource, "residualSource");
this.options = checkNotNull(options, "options");
this.reader = null;
this.closed = false;
+ this.readerDone = false;
}
private boolean advance() throws IOException {
checkArgument(!closed, "advance() call on closed %s", getClass().getName());
+ if (readerDone) {
+ return false;
+ }
if (reader == null) {
reader = residualSource.createReader(options);
- return reader.start();
+ readerDone = !reader.start();
} else {
- return reader.advance();
+ readerDone = !reader.advance();
}
+ return !readerDone;
}
T getCurrent() throws NoSuchElementException {