Posted to issues@spark.apache.org by "Yin Huai (JIRA)" <ji...@apache.org> on 2015/05/27 07:15:17 UTC
[jira] [Resolved] (SPARK-6012) Deadlock when asking for partitions
from CoalescedRDD on top of a TakeOrdered operator
[ https://issues.apache.org/jira/browse/SPARK-6012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yin Huai resolved SPARK-6012.
-----------------------------
Resolution: Not A Problem
I do not think this issue still occurs after 1.3, so I am resolving it as "Not A Problem".
> Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator
> --------------------------------------------------------------------------------------
>
> Key: SPARK-6012
> URL: https://issues.apache.org/jira/browse/SPARK-6012
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.2.1
> Reporter: Max Seiden
> Priority: Critical
>
> h3. Summary
> I've found that a deadlock occurs when asking for the partitions from a SchemaRDD that has a TakeOrdered as its terminal operator. The problem occurs when a child RDD asks the DAGScheduler for preferred partition locations (which locks the scheduler) and eventually hits the #execute() of the TakeOrdered operator, which submits tasks but is blocked when it also tries to get preferred locations (in a separate thread). It seems like the TakeOrdered op's #execute() method should not actually submit a task (it is calling #executeCollect() and creating a new RDD) and should instead stay more true to the comment and logically apply a Limit on top of a Sort.
> In my particular case, I am forcing a repartition of a SchemaRDD with a terminal Limit(..., Sort(...)), which is where the CoalescedRDD comes into play.
> h3. Stack Traces
> h4. Task Submission
> {noformat}
> "main" prio=5 tid=0x00007f8e72800000 nid=0x1303 in Object.wait() [0x000000010ed5e000]
> java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> - waiting on <0x00000007c4c239b8> (a org.apache.spark.scheduler.JobWaiter)
> at java.lang.Object.wait(Object.java:503)
> at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
> - locked <0x00000007c4c239b8> (a org.apache.spark.scheduler.JobWaiter)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:514)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1321)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1390)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:884)
> at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1161)
> at org.apache.spark.sql.execution.TakeOrdered.executeCollect(basicOperators.scala:183)
> at org.apache.spark.sql.execution.TakeOrdered.execute(basicOperators.scala:188)
> at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
> - locked <0x00000007c36ce038> (a org.apache.spark.sql.hive.HiveContext$$anon$7)
> at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
> at org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:127)
> at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:209)
> at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:207)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.dependencies(RDD.scala:207)
> at org.apache.spark.rdd.RDD.firstParent(RDD.scala:1278)
> at org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:122)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
> at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:79)
> at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80)
> at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:209)
> at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:207)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.dependencies(RDD.scala:207)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1333)
> at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1304)
> - locked <0x00000007f55c2238> (a org.apache.spark.scheduler.DAGScheduler)
> at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1148)
> at org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:175)
> at org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$5$$anonfun$apply$2.apply(CoalescedRDD.scala:192)
> at org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$5$$anonfun$apply$2.apply(CoalescedRDD.scala:191)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
> at org.apache.spark.rdd.PartitionCoalescer$LocationIterator.<init>(CoalescedRDD.scala:186)
> at org.apache.spark.rdd.PartitionCoalescer.setupGroups(CoalescedRDD.scala:237)
> at org.apache.spark.rdd.PartitionCoalescer.run(CoalescedRDD.scala:338)
> at org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:84)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
> at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
> at org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:122)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
> at org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:122)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
> {noformat}
> h4. Submitted Task
> {noformat}
> "sparkDriver-akka.actor.default-dispatcher-2" daemon prio=5 tid=0x00007f8e72248800 nid=0x5903 waiting for monitor entry [0x000000012214c000]
> java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1304)
> - waiting to lock <0x00000007f55c2238> (a org.apache.spark.scheduler.DAGScheduler)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:853)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:852)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:852)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:781)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:780)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:780)
> at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1389)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}
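The interaction shown in the two stack traces can be reproduced in miniature. The following is only a sketch under stated assumptions, not Spark code: the class SchedulerLockSketch, its fields, and its methods are hypothetical stand-ins. schedulerLock plays the role of the DAGScheduler monitor, and the single-threaded eventLoop plays the role of the scheduler's event-processing thread. A timeout replaces the indefinite wait in JobWaiter so that the example terminates.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of the reported hang; none of these names exist in Spark.
public class SchedulerLockSketch {
    // Stand-in for the DAGScheduler monitor held during getPreferredLocs.
    private final Object schedulerLock = new Object();
    // Stand-in for the single scheduler event-processing thread.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();

    // Job body run on the event loop; like task submission, it needs the lock.
    private int runJobOnEventLoop() {
        synchronized (schedulerLock) {
            return 42;
        }
    }

    // Submits a job and waits up to timeoutMs; returns true if it completed.
    private boolean submitAndWait(long timeoutMs) {
        Callable<Integer> job = this::runJobOnEventLoop;
        Future<Integer> result = eventLoop.submit(job);
        try {
            result.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // the job could not finish in time
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Eager pattern from the report: a job is submitted and awaited while
    // the caller still holds the scheduler lock.
    public boolean submitWhileLocked(long timeoutMs) {
        synchronized (schedulerLock) {
            return submitAndWait(timeoutMs);
        }
    }

    // Illustrative alternative: do only lock-protected bookkeeping under the
    // lock, and submit the job after the lock has been released.
    public boolean deferJobUntilUnlocked(long timeoutMs) {
        synchronized (schedulerLock) {
            // Describe the work here, but do not run it.
        }
        return submitAndWait(timeoutMs);
    }

    public void shutdown() {
        eventLoop.shutdownNow();
    }

    public static void main(String[] args) {
        SchedulerLockSketch eager = new SchedulerLockSketch();
        System.out.println("submit while locked completes: "
                + eager.submitWhileLocked(200));
        eager.shutdown();

        SchedulerLockSketch deferred = new SchedulerLockSketch();
        System.out.println("deferred submit completes: "
                + deferred.deferJobUntilUnlocked(2000));
        deferred.shutdown();
    }
}
```

In the first call the waiting thread holds the lock for the whole wait, so the job on the event loop cannot enter its synchronized block and the wait times out. Without the timeout, the caller would wait forever while the event loop stays blocked on the monitor, which matches the WAITING and BLOCKED states in the two dumps. The deferred variant is just one way to avoid holding the lock across a job submission. It is not a description of how Spark changed after 1.2.1.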