You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2015/04/27 19:45:40 UTC

[jira] [Assigned] (SPARK-7174) Move calling `TaskScheduler.executorHeartbeatReceived` to another thread to avoid blocking the Akka thread pool

     [ https://issues.apache.org/jira/browse/SPARK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-7174:
-----------------------------------

    Assignee: Apache Spark

> Move calling `TaskScheduler.executorHeartbeatReceived` to another thread to avoid blocking the Akka thread pool
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-7174
>                 URL: https://issues.apache.org/jira/browse/SPARK-7174
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.2.2, 1.3.1
>            Reporter: Shixiong Zhu
>            Assignee: Apache Spark
>
> HeartbeatReceiver will TaskScheduler.executorHeartbeatReceived, which is a blocking operation because "TaskScheduler.executorHeartbeatReceived" will call 
> {code}
>     blockManagerMaster.driverEndpoint.askWithReply[Boolean](
>       BlockManagerHeartbeat(blockManagerId), 600 seconds)
> {code}
> finally. Even if it asks from a local Actor, it may block the current Akka thread. E.g., the reply may be dispatched to the same thread of the ask operation. So the reply cannot be processed. An extreme case is setting the thread number of Akka dispatch thread pool to 1.
> jstack log:
> {code}
> "sparkDriver-akka.actor.default-dispatcher-14" daemon prio=10 tid=0x00007f2a8c02d000 nid=0x725 waiting on condition [0x00007f2b1d6d0000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
> 	at sun.misc.Unsafe.park(Native Method)
> 	- parking to wait for  <0x00000006197a0868> (a scala.concurrent.impl.Promise$CompletionLatch)
> 	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1033)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
> 	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
> 	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
> 	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> 	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> 	at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
> 	at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
> 	at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
> 	at scala.concurrent.Await$.result(package.scala:107)
> 	at org.apache.spark.rpc.RpcEndpointRef.askWithReply(RpcEnv.scala:355)
> 	at org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:169)
> 	at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:367)
> 	at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1.applyOrElse(HeartbeatReceiver.scala:103)
> 	at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:182)
> 	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:128)
> 	at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:203)
> 	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:127)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> 	at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
> 	at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> 	at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> 	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:94)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org