You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/07/06 17:20:21 UTC

[15/50] [abbrv] incubator-beam git commit: Fix NPE in UnboundedReadFromBoundedSource

Fix NPE in UnboundedReadFromBoundedSource


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/562beafb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/562beafb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/562beafb

Branch: refs/heads/runners-spark2
Commit: 562beafb59e14b568369259ac9c6f855ee9c80a3
Parents: 942b6e0
Author: Pei He <pe...@google.com>
Authored: Mon Jun 27 18:21:37 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 6 10:18:50 2016 -0700

----------------------------------------------------------------------
 .../core/UnboundedReadFromBoundedSource.java     | 19 +++++++++----------
 .../core/UnboundedReadFromBoundedSourceTest.java |  9 +++++++++
 2 files changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/562beafb/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index 2b3d1c7..f54af3b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -167,10 +167,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
     public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)
         throws IOException {
       if (checkpoint == null) {
-        return new Reader(
-            Collections.<TimestampedValue<T>>emptyList() /* residualElements */,
-            boundedSource,
-            options);
+        return new Reader(null /* residualElements */, boundedSource, options);
       } else {
         return new Reader(checkpoint.residualElements, checkpoint.residualSource, options);
       }
@@ -189,11 +186,11 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
 
     @VisibleForTesting
     static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
-      private final List<TimestampedValue<T>> residualElements;
+      private final @Nullable List<TimestampedValue<T>> residualElements;
       private final @Nullable BoundedSource<T> residualSource;
 
       public Checkpoint(
-          List<TimestampedValue<T>> residualElements,
+          @Nullable List<TimestampedValue<T>> residualElements,
           @Nullable BoundedSource<T> residualSource) {
         this.residualElements = residualElements;
         this.residualSource = residualSource;
@@ -203,7 +200,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
       public void finalizeCheckpoint() {}
 
       @VisibleForTesting
-      List<TimestampedValue<T>> getResidualElements() {
+      @Nullable List<TimestampedValue<T>> getResidualElements() {
         return residualElements;
       }
 
@@ -286,7 +283,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
       private boolean done;
 
       Reader(
-          List<TimestampedValue<T>> residualElementsList,
+          @Nullable List<TimestampedValue<T>> residualElementsList,
           @Nullable BoundedSource<T> residualSource,
           PipelineOptions options) {
         init(residualElementsList, residualSource, options);
@@ -295,10 +292,12 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
       }
 
       private void init(
-          List<TimestampedValue<T>> residualElementsList,
+          @Nullable List<TimestampedValue<T>> residualElementsList,
           @Nullable BoundedSource<T> residualSource,
           PipelineOptions options) {
-        this.residualElements = new ResidualElements(residualElementsList);
+        this.residualElements = residualElementsList == null
+            ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList())
+                : new ResidualElements(residualElementsList);
         this.residualSource =
             residualSource == null ? null : new ResidualSource(residualSource, options);
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/562beafb/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
index afd0927..dfbc675 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
 import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint;
@@ -169,6 +170,10 @@ public class UnboundedReadFromBoundedSourceTest {
         checkpoint.finalizeCheckpoint();
       }
     }
+    Checkpoint<T> checkpointDone = reader.getCheckpointMark();
+    assertTrue(checkpointDone.getResidualElements() == null
+        || checkpointDone.getResidualElements().isEmpty());
+
     assertEquals(expectedElements.size(), actual.size());
     assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
   }
@@ -230,6 +235,10 @@ public class UnboundedReadFromBoundedSourceTest {
         hasNext = reader.advance();
       }
     }
+    Checkpoint<T> checkpointDone = reader.getCheckpointMark();
+    assertTrue(checkpointDone.getResidualElements() == null
+        || checkpointDone.getResidualElements().isEmpty());
+
     assertEquals(expectedElements.size(), actual.size());
     assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
   }