Posted to commits@spark.apache.org by jo...@apache.org on 2015/02/17 05:35:12 UTC

spark git commit: [SPARK-5395] [PySpark] fix python process leak while coalesce()

Repository: spark
Updated Branches:
  refs/heads/branch-1.2 f468688f1 -> a39da171c


[SPARK-5395] [PySpark] fix python process leak while coalesce()

Currently, the Python worker process is released back into the pool only after the task has finished, which causes many processes to be forked when coalesce() is called.

This PR changes it to release the process as soon as all of its data has been read (i.e., when the partition is finished), so that one process can be reused to handle multiple partitions within a single task.
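
To make the control flow concrete, here is a minimal Scala sketch of the release-early pattern, using a hypothetical Worker/WorkerPool pair in place of Spark's actual env.createPythonWorker / releasePythonWorker machinery:

    import java.util.concurrent.ConcurrentLinkedQueue

    final class Worker {
      def close(): Unit = println("worker destroyed")
    }

    final class WorkerPool {
      private val idle = new ConcurrentLinkedQueue[Worker]()
      def acquire(): Worker = Option(idle.poll()).getOrElse(new Worker)
      def release(w: Worker): Unit = idle.add(w)
    }

    object ReleaseEarly {
      def main(args: Array[String]): Unit = {
        val pool = new WorkerPool
        val reuseWorker = true        // stands in for SPARK_REUSE_WORKER
        val worker = pool.acquire()

        // Set by the stream-reading side, read by the completion
        // listener, hence @volatile (commit ec80a43 adds the same
        // annotation in the patch below).
        @volatile var released = false

        // New behaviour: hand the worker back as soon as END_OF_STREAM
        // is read, so it can serve the next partition of the same task.
        def onEndOfStream(): Unit = {
          if (reuseWorker) {
            pool.release(worker)
            released = true
          }
        }

        // Task-completion listener: destroy the worker only if it was
        // never returned to the pool. The pre-fix code kept every
        // forked worker alive until this point.
        def onTaskCompletion(): Unit = {
          if (!reuseWorker || !released) worker.close()
        }

        onEndOfStream()
        onTaskCompletion() // no-op here: the worker is already pooled
      }
    }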

Author: Davies Liu <da...@databricks.com>

Closes #4238 from davies/py_leak and squashes the following commits:

ec80a43 [Davies Liu] add @volatile
6da437a [Davies Liu] address comments
24ed322 [Davies Liu] fix python process leak while coalesce()

(cherry picked from commit 5c746eedda8cff2fc1692cf6dce376f4b0ca6fac)
Signed-off-by: Josh Rosen <jo...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a39da171
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a39da171
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a39da171

Branch: refs/heads/branch-1.2
Commit: a39da171cb7fea2f32367edd60c2644aadb88282
Parents: f468688
Author: Davies Liu <da...@databricks.com>
Authored: Thu Jan 29 17:28:37 2015 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Mon Feb 16 20:35:02 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/api/python/PythonRDD.scala  | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a39da171/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 2b6788d..0d508d6 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -68,17 +68,16 @@ private[spark] class PythonRDD(
       envVars += ("SPARK_REUSE_WORKER" -> "1")
     }
     val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
+    // Whether the worker has been released into the idle pool
+    @volatile var released = false
 
     // Start a thread to feed the process input from our parent's iterator
     val writerThread = new WriterThread(env, worker, split, context)
 
-    var complete_cleanly = false
     context.addTaskCompletionListener { context =>
       writerThread.shutdownOnTaskCompletion()
       writerThread.join()
-      if (reuse_worker && complete_cleanly) {
-        env.releasePythonWorker(pythonExec, envVars.toMap, worker)
-      } else {
+      if (!reuse_worker || !released) {
         try {
           worker.close()
         } catch {
@@ -146,8 +145,12 @@ private[spark] class PythonRDD(
                 stream.readFully(update)
                 accumulator += Collections.singletonList(update)
               }
+              // Check whether the worker is ready to be re-used.
               if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
-                complete_cleanly = true
+                if (reuse_worker) {
+                  env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+                  released = true
+                }
               }
               null
           }
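
For context on why coalesce() amplified the leak: a no-shuffle coalesce pipelines many parent partitions into each task, so the compute() path above runs once per parent partition within a single task, forking one Python worker each time. A hedged illustration of that task shape against the public RDD API (the partition counts are made up, and the leak itself only manifests on the PySpark side of the equivalent job):

    import org.apache.spark.{SparkConf, SparkContext}

    object CoalesceShape {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("coalesce-shape")
          .setMaster("local[2]")
        val sc = new SparkContext(conf)
        // 1000 parent partitions squeezed into 10 tasks: each task
        // iterates ~100 parent partitions back to back. Before this
        // fix, the equivalent PySpark job forked ~100 Python workers
        // per task and held them all until task completion; with the
        // fix, each task reuses a single worker partition after
        // partition.
        val n = sc.parallelize(1 to 1000000, numSlices = 1000)
          .coalesce(10)
          .count()
        println(n)
        sc.stop()
      }
    }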

