Posted to user@spark.apache.org by Krishna <re...@gmail.com> on 2016/06/23 21:59:48 UTC

destroyPythonWorker job in PySpark

Hi,

I am running a PySpark app on thousands of cores (the number of partitions is
a small multiple of the number of cores), and the overall application
performance is fine. However, I noticed that at the end of the job PySpark
starts its clean-up procedure, and as part of it runs a job, shown in the Web
UI as "runJob at PythonRDD.scala:361", once for each executor/core. The pain
point is that these jobs run sequentially, and this step has become the
bottleneck in our application. Even though each job takes only about 0.5 sec
on average, the time adds up when running with thousands of executors.
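
As a rough illustration of how this adds up, here is a back-of-the-envelope
sketch. The 0.5 sec average is the figure from the Web UI; the executor
counts are hypothetical round numbers picked for illustration, not
measurements from our cluster, and the object and method names are made up
for this example.

```scala
object CleanupEstimate {
  // Average duration of one clean-up job, as seen in the Web UI (seconds).
  val avgJobSeconds = 0.5

  // Total wall-clock time if the clean-up jobs run strictly one after another.
  def sequentialSeconds(executors: Int): Double = executors * avgJobSeconds

  def main(args: Array[String]): Unit = {
    // Hypothetical executor counts, for illustration only.
    for (n <- Seq(1000, 2000, 5000)) {
      println(f"$n%5d executors -> ${sequentialSeconds(n)}%.0f s of sequential clean-up")
    }
  }
}
```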

Looking at the code for "destroyPythonWorker" in "SparkEnv.scala": is this
behavior the result of "stopWorker" being executed sequentially within the
foreach? Let me know if I'm missing something, and what can be done to fix
the issue.

  private[spark]
  def destroyPythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
    synchronized {
      val key = (pythonExec, envVars)
      pythonWorkers.get(key).foreach(_.stopWorker(worker))
    }
  }
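
To make the question concrete, here is a minimal stand-alone sketch of that
lookup pattern. It is not Spark code: FakeWorkerFactory and the String used
in place of the Socket are hypothetical stand-ins, and I am assuming that
pythonWorkers is a map holding one factory per (pythonExec, envVars) key, as
the lookup suggests. Under that assumption the foreach iterates over the
Option returned by get, so it calls stopWorker at most once per invocation,
which may mean the sequential behavior comes from whatever invokes this
method once per worker rather than from the foreach itself.

```scala
import scala.collection.mutable

object OptionForeachSketch {
  // Hypothetical stand-in for the per-environment worker factory; not Spark's class.
  class FakeWorkerFactory {
    var stopCalls = 0
    def stopWorker(worker: String): Unit = stopCalls += 1
  }

  // Assumed shape: one factory per (pythonExec, envVars) key.
  val pythonWorkers = mutable.HashMap[(String, Map[String, String]), FakeWorkerFactory]()

  def destroyPythonWorker(pythonExec: String, envVars: Map[String, String], worker: String): Unit =
    synchronized {
      val key = (pythonExec, envVars)
      // get(key) returns an Option, so foreach runs its body zero or one time.
      pythonWorkers.get(key).foreach(_.stopWorker(worker))
    }

  def main(args: Array[String]): Unit = {
    val factory = new FakeWorkerFactory
    pythonWorkers(("python", Map.empty[String, String])) = factory

    destroyPythonWorker("python", Map.empty, "worker-1")  // key present: one call
    destroyPythonWorker("python3", Map.empty, "worker-2") // key absent: no call

    println(s"stopWorker calls: ${factory.stopCalls}")
  }
}
```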



Spark version: 1.5.0-cdh5.5.1

Thanks