Posted to issues@spark.apache.org by "Josh Rosen (JIRA)" <ji...@apache.org> on 2015/01/27 23:49:36 UTC

[jira] [Commented] (SPARK-5191) Pyspark: scheduler hangs when importing a standalone pyspark app

    [ https://issues.apache.org/jira/browse/SPARK-5191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14294351#comment-14294351 ] 

Josh Rosen commented on SPARK-5191:
-----------------------------------

I'm investigating this now; it seems to affect even old PySpark versions, such as 1.0.2.  The DAGScheduler appears to block while handling task completion, stuck communicating with the Python driver to process accumulator updates.  From {{jstack}} on 1.0.2:

{code}
   java.lang.Thread.State: RUNNABLE
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.read(SocketInputStream.java:152)
	at java.net.SocketInputStream.read(SocketInputStream.java:122)
	at java.net.SocketInputStream.read(SocketInputStream.java:210)
	at org.apache.spark.api.python.PythonAccumulatorParam.addInPlace(PythonRDD.scala:416)
	at org.apache.spark.api.python.PythonAccumulatorParam.addInPlace(PythonRDD.scala:387)
	at org.apache.spark.Accumulable.$plus$plus$eq(Accumulators.scala:72)
	at org.apache.spark.Accumulators$$anonfun$add$2.apply(Accumulators.scala:280)
	at org.apache.spark.Accumulators$$anonfun$add$2.apply(Accumulators.scala:278)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
	at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
	at org.apache.spark.Accumulators$.add(Accumulators.scala:278)
	- locked <0x00000007f71393f8> (a org.apache.spark.Accumulators$)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:825)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1231)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
	at akka.actor.ActorCell.invoke(ActorCell.scala:456)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}
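
For context on why that thread is stuck in {{SocketInputStream.read}}: as far as I can tell from this stack and from the handler shown below, {{PythonAccumulatorParam.addInPlace}} sends the accumulator updates over a socket to the Python driver's accumulator server and then blocks until the driver writes back a one-byte acknowledgement.  Here's a rough sketch of that exchange, pieced together from the Python handler code rather than from the Scala source (the function name and arguments are made up for illustration):

{code}
import socket
import struct

def send_accumulator_updates(host, port, pickled_updates):
    """Sketch: push pre-pickled (aid, update) payloads and wait for the ack."""
    sock = socket.create_connection((host, port))
    try:
        out = sock.makefile("wb")
        # assuming read_int on the Python side reads a 4-byte big-endian int
        out.write(struct.pack("!i", len(pickled_updates)))  # number of updates
        for payload in pickled_updates:
            out.write(struct.pack("!i", len(payload)))       # length prefix
            out.write(payload)                               # pickled (aid, update) pair
        out.flush()
        # The DAGScheduler's event-processing thread sits in a read like this
        # until the Python side acknowledges; if the handler never finishes,
        # all scheduling stalls.
        return sock.recv(1)
    finally:
        sock.close()
{code}

Since this runs on the DAGScheduler's event-processing thread, a hang in the Python-side handler is enough to freeze the whole scheduler.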

I added some print statements to the Python {{accumulators.py}} file, and it looks like the Python driver receives the accumulator updates but hangs somewhere while processing them.  Here's the modified accumulator update handler:

{code}
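# Excerpt from pyspark/accumulators.py with debug prints added; read_int,
# pickleSer, struct, and SocketServer are defined at module level in that
# file and aren't repeated here.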
class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
    def handle(self):
        print "Handling an accumulator update request!"
        from pyspark.accumulators import _accumulatorRegistry
        print "Done importing the accumulator registry!"
        num_updates = read_int(self.rfile)
        print "Num Updates: %i" % num_updates
        for _ in range(num_updates):
            (aid, update) = pickleSer._read_with_length(self.rfile)
            _accumulatorRegistry[aid] += update
        # Write a byte in acknowledgement
        self.wfile.write(struct.pack("!b", 1))
        print "Done handling the accumulator update!"
{code}

Based on which print statements appear when the job hangs, the hang occurs while importing {{_accumulatorRegistry}} (the "Handling an accumulator update request!" line is printed, but the "Done importing the accumulator registry!" line is not).

I'm curious why this import causes problems, since I think the {{pyspark.accumulators}} module should already have been loaded by the time this import runs.  I'm also not quite sure why the extra layer of indirection introduced by running {{b.py}} triggers it.
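
One hypothesis for both questions (not verified yet): the update handler runs on a background thread of the Python driver process, and in Python 2 an {{import}} statement has to acquire the global import lock even when the module is already in {{sys.modules}}.  When {{b.py}} does {{from a import *}}, the main thread holds that lock for the entire top-level execution of {{a}}, including the job that's waiting on the accumulator ack, so the handler's import would block and the two sides would deadlock; running {{a.py}} directly doesn't hold the lock, which would explain the asymmetry.  Here's a minimal standalone sketch of that pattern, with no Spark involved (the file names {{mod.py}} and {{main.py}} are made up):

{code}
# mod.py -- hypothetical file, playing the role of a.py
import threading

def worker():
    # Plays the role of the accumulator update handler running in a
    # background thread.  In Python 2 this import has to take the global
    # import lock, even though "threading" is already in sys.modules.
    import threading as _t
    print "worker: import finished"

t = threading.Thread(target=worker)
t.start()
t.join()   # top-level code waits on the background thread, like a job
           # waiting for the accumulator ack

# main.py -- hypothetical file, playing the role of b.py:
#     import mod
#
# "python mod.py" finishes, because __main__'s top-level code doesn't run
# under the import lock.  "python main.py" hangs: the main thread holds the
# import lock while mod's top-level code executes, worker() blocks on that
# lock, and t.join() never returns.
{code}

If that's what's happening here, dumping the Python driver's thread stacks during the hang (e.g. via {{sys._current_frames()}}) should show the handler thread blocked inside the import machinery.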

> Pyspark: scheduler hangs when importing a standalone pyspark app
> ----------------------------------------------------------------
>
>                 Key: SPARK-5191
>                 URL: https://issues.apache.org/jira/browse/SPARK-5191
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.0.2, 1.1.1, 1.2.1, 1.3.0
>            Reporter: Daniel Liu
>
> In a.py:
> {code}
> from pyspark import SparkContext
> sc = SparkContext("local", "test spark")
> rdd = sc.parallelize(range(1, 10))
> print rdd.count()
> {code}
> In b.py:
> {code}
> from a import *
> {code}
> {{python a.py}} runs fine.
> {{python b.py}} hangs after logging {{TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool}}.
> {{./bin/spark-submit --py-files a.py b.py}} has the same problem.



