You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/13 07:17:11 UTC
git commit: SPARK-1019: pyspark RDD take() throws an NPE
Repository: spark
Updated Branches:
refs/heads/master 6bd2eaa4a -> 4ea23db0e
SPARK-1019: pyspark RDD take() throws an NPE
Author: Patrick Wendell <pw...@gmail.com>
Closes #112 from pwendell/pyspark-take and squashes the following commits:
daae80e [Patrick Wendell] SPARK-1019: pyspark RDD take() throws an NPE
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4ea23db0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4ea23db0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4ea23db0
Branch: refs/heads/master
Commit: 4ea23db0efff2f39ac5b8f0bd1d9a6ffa3eceb0d
Parents: 6bd2eaa
Author: Patrick Wendell <pw...@gmail.com>
Authored: Wed Mar 12 23:16:59 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Mar 12 23:16:59 2014 -0700
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/TaskContext.scala | 3 ++-
.../main/scala/org/apache/spark/api/python/PythonRDD.scala | 8 ++++++++
2 files changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/4ea23db0/core/src/main/scala/org/apache/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index cae983e..be53ca2 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -46,6 +46,7 @@ class TaskContext(
}
def executeOnCompleteCallbacks() {
- onCompleteCallbacks.foreach{_()}
+ // Process complete callbacks in the reverse order of registration
+ onCompleteCallbacks.reverse.foreach{_()}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4ea23db0/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index e4d0285..b67286a 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -100,6 +100,14 @@ private[spark] class PythonRDD[T: ClassTag](
}
}.start()
+ /*
+ * Partial fix for SPARK-1019: Attempts to stop reading the input stream since
+ * other completion callbacks might invalidate the input. Because interruption
+ * is not synchronous this still leaves a potential race where the interruption is
+ * processed only after the stream becomes invalid.
+ */
+ context.addOnCompleteCallback(() => context.interrupted = true)
+
// Return an iterator that read lines from the process's stdout
val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
val stdoutIterator = new Iterator[Array[Byte]] {