Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/13 07:17:11 UTC

git commit: SPARK-1019: pyspark RDD take() throws an NPE

Repository: spark
Updated Branches:
  refs/heads/master 6bd2eaa4a -> 4ea23db0e


SPARK-1019: pyspark RDD take() throws an NPE

Author: Patrick Wendell <pw...@gmail.com>

Closes #112 from pwendell/pyspark-take and squashes the following commits:

daae80e [Patrick Wendell] SPARK-1019: pyspark RDD take() throws an NPE


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4ea23db0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4ea23db0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4ea23db0

Branch: refs/heads/master
Commit: 4ea23db0efff2f39ac5b8f0bd1d9a6ffa3eceb0d
Parents: 6bd2eaa
Author: Patrick Wendell <pw...@gmail.com>
Authored: Wed Mar 12 23:16:59 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Mar 12 23:16:59 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/TaskContext.scala       | 3 ++-
 .../main/scala/org/apache/spark/api/python/PythonRDD.scala   | 8 ++++++++
 2 files changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4ea23db0/core/src/main/scala/org/apache/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index cae983e..be53ca2 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -46,6 +46,7 @@ class TaskContext(
   }
 
   def executeOnCompleteCallbacks() {
-    onCompleteCallbacks.foreach{_()}
+    // Process complete callbacks in the reverse order of registration
+    onCompleteCallbacks.reverse.foreach{_()}
   }
 }
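
The TaskContext change above makes completion callbacks run in reverse (last-in, first-out) order, so the most recently registered cleanup runs first. A minimal standalone sketch of that ordering follows; the names are illustrative only, not the real TaskContext API:

    import scala.collection.mutable.ArrayBuffer

    object CallbackOrderSketch {
      def main(args: Array[String]): Unit = {
        // Callbacks are collected in registration order, as in TaskContext.
        val onCompleteCallbacks = new ArrayBuffer[() => Unit]
        def addOnCompleteCallback(f: () => Unit): Unit = onCompleteCallbacks += f

        addOnCompleteCallback(() => println("close worker socket (registered first)"))
        addOnCompleteCallback(() => println("stop reading stream (registered last)"))

        // Mirrors the patched executeOnCompleteCallbacks(): reversing before
        // running means "stop reading stream" executes before "close worker socket".
        onCompleteCallbacks.reverse.foreach { _() }
      }
    }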

http://git-wip-us.apache.org/repos/asf/spark/blob/4ea23db0/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index e4d0285..b67286a 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -100,6 +100,14 @@ private[spark] class PythonRDD[T: ClassTag](
       }
     }.start()
 
+    /*
+     * Partial fix for SPARK-1019: Attempts to stop reading the input stream since
+     * other completion callbacks might invalidate the input. Because interruption
+     * is not synchronous this still leaves a potential race where the interruption is
+     * processed only after the stream becomes invalid.
+     */
+    context.addOnCompleteCallback(() => context.interrupted = true)
+
     // Return an iterator that reads lines from the process's stdout
     val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
     val stdoutIterator = new Iterator[Array[Byte]] {
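
As a rough, self-contained illustration of the race described in the comment above (the names and structure below are simplified assumptions, not Spark's actual implementation): the reading loop only observes the interrupted flag between reads, so a completion callback that sets the flag cannot stop a read that is already in flight, and another callback may invalidate the stream before the flag is ever checked.

    object InterruptRaceSketch {
      @volatile var interrupted = false   // analogue of TaskContext.interrupted
      @volatile var streamValid = true    // stands in for the worker's stdout stream

      // Pretend to block while reading one record from the worker's stdout.
      def readOneRecord(): Unit = {
        if (!streamValid) throw new IllegalStateException("stream already invalidated")
        Thread.sleep(10)
      }

      def main(args: Array[String]): Unit = {
        val reader = new Thread(new Runnable {
          def run(): Unit =
            try {
              while (!interrupted) readOneRecord()
              println("reader stopped cleanly")
            } catch {
              case e: IllegalStateException => println("race lost: " + e.getMessage)
            }
        })
        reader.start()

        // Completion callbacks run here. Setting the flag first (as the patch does)
        // narrows the window, but a later callback that invalidates the stream can
        // still race with a read that has already started.
        interrupted = true
        streamValid = false
        reader.join()
      }
    }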