You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/09/28 09:21:40 UTC

[2/3] incubator-beam git commit: fix potential NPE in checkpointing of UnboundedSourceWrapper

fix potential NPE in checkpointing of UnboundedSourceWrapper

This moves all the initialization code to the open() method, which ensures
that no snapshot can occur before the state has been initialized correctly.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f3f2a977
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f3f2a977
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f3f2a977

Branch: refs/heads/master
Commit: f3f2a9779a5c355a5902a783f3e72609ff71717f
Parents: cf14e80
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Sep 16 18:42:43 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Sep 28 11:14:21 2016 +0200

----------------------------------------------------------------------
 .../streaming/io/UnboundedSourceWrapper.java    | 39 ++++++++++++--------
 .../streaming/UnboundedSourceWrapperTest.java   |  3 ++
 2 files changed, 27 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3f2a977/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 64cf703..68a83e8 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -91,6 +91,7 @@ public class UnboundedSourceWrapper<
   private transient List<UnboundedSource.UnboundedReader<OutputT>> localReaders;
 
   /**
+   * Flag to indicate whether the source is running.
    * Initialize here and not in run() to prevent races where we cancel a job before run() is
    * ever called or run() is called after cancel().
    */
@@ -154,19 +155,17 @@ public class UnboundedSourceWrapper<
     splitSources = source.generateInitialSplits(parallelism, pipelineOptions);
   }
 
-  @Override
-  public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
-    if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
-      throw new RuntimeException(
-          "Cannot emit watermarks, this hints at a misconfiguration/bug.");
-    }
 
-    context = (StreamSource.ManualWatermarkContext<WindowedValue<OutputT>>) ctx;
+  /**
+   * Initialize and restore state before starting execution of the source.
+   */
+  @Override
+  public void open(Configuration parameters) throws Exception {
     runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
 
     // figure out which split sources we're responsible for
-    int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-    int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+    int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
+    int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();
 
     localSplitSources = new ArrayList<>();
     localReaders = new ArrayList<>();
@@ -183,12 +182,12 @@ public class UnboundedSourceWrapper<
           new Function<
               KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>,
               UnboundedSource<OutputT, CheckpointMarkT>>() {
-        @Override
-        public UnboundedSource<OutputT, CheckpointMarkT> apply(
-            KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> input) {
-          return input.getKey();
-        }
-      });
+            @Override
+            public UnboundedSource<OutputT, CheckpointMarkT> apply(
+                KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> input) {
+              return input.getKey();
+            }
+          });
 
       for (KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored:
           restoredState) {
@@ -215,6 +214,16 @@ public class UnboundedSourceWrapper<
         subtaskIndex,
         numSubtasks,
         localSplitSources);
+  }
+
+  @Override
+  public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
+    if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
+      throw new RuntimeException(
+          "Cannot emit watermarks, this hints at a misconfiguration/bug.");
+    }
+
+    context = (StreamSource.ManualWatermarkContext<WindowedValue<OutputT>>) ctx;
 
     if (localReaders.size() == 0) {
       // do nothing, but still look busy ...

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3f2a977/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index e728653..9e8261a 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -106,6 +106,7 @@ public class UnboundedSourceWrapperTest {
     setupSourceOperator(sourceOperator, numTasks);
 
     try {
+      sourceOperator.open();
       sourceOperator.run(checkpointLock,
           new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
             private int count = 0;
@@ -173,6 +174,7 @@ public class UnboundedSourceWrapperTest {
     boolean readFirstBatchOfElements = false;
 
     try {
+      sourceOperator.open();
       sourceOperator.run(checkpointLock,
           new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
             private int count = 0;
@@ -237,6 +239,7 @@ public class UnboundedSourceWrapperTest {
 
     // run again and verify that we see the other elements
     try {
+      restoredSourceOperator.open();
       restoredSourceOperator.run(checkpointLock,
           new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
             private int count = 0;