You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/07/06 17:20:21 UTC
[15/50] [abbrv] incubator-beam git commit: Fix NPE in
UnboundedReadFromBoundedSource
Fix NPE in UnboundedReadFromBoundedSource
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/562beafb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/562beafb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/562beafb
Branch: refs/heads/runners-spark2
Commit: 562beafb59e14b568369259ac9c6f855ee9c80a3
Parents: 942b6e0
Author: Pei He <pe...@google.com>
Authored: Mon Jun 27 18:21:37 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 6 10:18:50 2016 -0700
----------------------------------------------------------------------
.../core/UnboundedReadFromBoundedSource.java | 19 +++++++++----------
.../core/UnboundedReadFromBoundedSourceTest.java | 9 +++++++++
2 files changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/562beafb/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index 2b3d1c7..f54af3b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -167,10 +167,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)
throws IOException {
if (checkpoint == null) {
- return new Reader(
- Collections.<TimestampedValue<T>>emptyList() /* residualElements */,
- boundedSource,
- options);
+ return new Reader(null /* residualElements */, boundedSource, options);
} else {
return new Reader(checkpoint.residualElements, checkpoint.residualSource, options);
}
@@ -189,11 +186,11 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
@VisibleForTesting
static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
- private final List<TimestampedValue<T>> residualElements;
+ private final @Nullable List<TimestampedValue<T>> residualElements;
private final @Nullable BoundedSource<T> residualSource;
public Checkpoint(
- List<TimestampedValue<T>> residualElements,
+ @Nullable List<TimestampedValue<T>> residualElements,
@Nullable BoundedSource<T> residualSource) {
this.residualElements = residualElements;
this.residualSource = residualSource;
@@ -203,7 +200,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
public void finalizeCheckpoint() {}
@VisibleForTesting
- List<TimestampedValue<T>> getResidualElements() {
+ @Nullable List<TimestampedValue<T>> getResidualElements() {
return residualElements;
}
@@ -286,7 +283,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
private boolean done;
Reader(
- List<TimestampedValue<T>> residualElementsList,
+ @Nullable List<TimestampedValue<T>> residualElementsList,
@Nullable BoundedSource<T> residualSource,
PipelineOptions options) {
init(residualElementsList, residualSource, options);
@@ -295,10 +292,12 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
}
private void init(
- List<TimestampedValue<T>> residualElementsList,
+ @Nullable List<TimestampedValue<T>> residualElementsList,
@Nullable BoundedSource<T> residualSource,
PipelineOptions options) {
- this.residualElements = new ResidualElements(residualElementsList);
+ this.residualElements = residualElementsList == null
+ ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList())
+ : new ResidualElements(residualElementsList);
this.residualSource =
residualSource == null ? null : new ResidualSource(residualSource, options);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/562beafb/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
index afd0927..dfbc675 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint;
@@ -169,6 +170,10 @@ public class UnboundedReadFromBoundedSourceTest {
checkpoint.finalizeCheckpoint();
}
}
+ Checkpoint<T> checkpointDone = reader.getCheckpointMark();
+ assertTrue(checkpointDone.getResidualElements() == null
+ || checkpointDone.getResidualElements().isEmpty());
+
assertEquals(expectedElements.size(), actual.size());
assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
}
@@ -230,6 +235,10 @@ public class UnboundedReadFromBoundedSourceTest {
hasNext = reader.advance();
}
}
+ Checkpoint<T> checkpointDone = reader.getCheckpointMark();
+ assertTrue(checkpointDone.getResidualElements() == null
+ || checkpointDone.getResidualElements().isEmpty());
+
assertEquals(expectedElements.size(), actual.size());
assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
}