Posted to reviews@spark.apache.org by zsxwing <gi...@git.apache.org> on 2015/04/27 19:44:49 UTC

[GitHub] spark pull request: [SPARK-7174][Core] Move calling `TaskScheduler...

GitHub user zsxwing opened a pull request:

    https://github.com/apache/spark/pull/5723

    [SPARK-7174][Core] Move calling `TaskScheduler.executorHeartbeatReceived` to another thread

    `HeartbeatReceiver` calls `TaskScheduler.executorHeartbeatReceived`, which is a blocking operation because `TaskScheduler.executorHeartbeatReceived` eventually calls
    
    ```Scala
        blockManagerMaster.driverEndpoint.askWithReply[Boolean](
          BlockManagerHeartbeat(blockManagerId), 600 seconds)
    ```
    
    Even though it asks a local actor, it may still block the current Akka thread: the reply may be dispatched to the same thread that issued the ask, so the reply can never be processed. An extreme case is setting the Akka dispatcher thread pool size to 1.
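
    A minimal sketch of the failure mode (illustrative only; it uses a plain single-thread executor in place of the Akka dispatcher): blocking on a reply whose delivery has to run on the same, only thread stalls until the timeout.

    ```Scala
    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future, Promise}
    import scala.concurrent.duration._

    object BlockingAskDemo {
      def main(args: Array[String]): Unit = {
        // Stand-in for an Akka dispatcher configured with a single thread.
        val dispatcher =
          ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

        val reply = Promise[Boolean]()

        // The "ask" side runs on the only dispatcher thread and blocks for the reply.
        val asking = Future {
          // The "reply" side is queued on the same executor, but it can never run
          // while this thread is parked in Await.result, so the ask times out.
          dispatcher.execute(new Runnable {
            override def run(): Unit = reply.success(true)
          })
          Await.result(reply.future, 5.seconds)
        }(dispatcher)

        println(Await.ready(asking, 10.seconds).value) // Some(Failure(TimeoutException))
        dispatcher.shutdown()
      }
    }
    ```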
    
    jstack log:
    
    ```
    "sparkDriver-akka.actor.default-dispatcher-14" daemon prio=10 tid=0x00007f2a8c02d000 nid=0x725 waiting on condition [0x00007f2b1d6d0000]
       java.lang.Thread.State: TIMED_WAITING (parking)
    	at sun.misc.Unsafe.park(Native Method)
    	- parking to wait for  <0x00000006197a0868> (a scala.concurrent.impl.Promise$CompletionLatch)
    	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
    	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1033)
    	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
    	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
    	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    	at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
    	at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
    	at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
    	at scala.concurrent.Await$.result(package.scala:107)
    	at org.apache.spark.rpc.RpcEndpointRef.askWithReply(RpcEnv.scala:355)
    	at org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:169)
    	at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:367)
    	at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1.applyOrElse(HeartbeatReceiver.scala:103)
    	at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:182)
    	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:128)
    	at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:203)
    	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:127)
    	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    	at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
    	at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
    	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    	at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
    	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:94)
    	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    ```
    
    This PR moves this blocking operation to a separate thread.
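
    A rough sketch of the approach (simplified; the names and details are illustrative rather than the exact patch): submit the blocking scheduler call to a dedicated daemon thread and reply asynchronously once it finishes, so the Akka/RPC thread returns immediately.

    ```Scala
    // Simplified sketch of HeartbeatReceiver.receiveAndReply, not the actual patch.
    private val heartbeatExecutor =
      ThreadUtils.newDaemonSingleThreadExecutor("heartbeat-receiver-event-loop-thread")

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
        if (scheduler != null) {
          executorLastSeen(executorId) = System.currentTimeMillis()
          // Run the blocking call off the RPC thread; reply once it completes.
          heartbeatExecutor.submit(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              val unknownExecutor = !scheduler.executorHeartbeatReceived(
                executorId, taskMetrics, blockManagerId)
              context.reply(HeartbeatResponse(reregisterBlockManager = unknownExecutor))
            }
          })
        }
    }
    ```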

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark SPARK-7174

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/5723.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5723
    
----
commit 5b3b545e21de0cc6067b6157a806277571d33006
Author: zsxwing <zs...@gmail.com>
Date:   2015-04-27T17:42:00Z

    Move calling `TaskScheduler.executorHeartbeatReceived` to another thread to avoid blocking the Akka thread pool

----



[GitHub] spark pull request: [SPARK-7174][Core] Move calling `TaskScheduler...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/5723



[GitHub] spark pull request: [SPARK-7174][Core] Move calling `TaskScheduler...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5723#discussion_r29213445
  
    --- Diff: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ---
    @@ -76,13 +76,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
       
       private var timeoutCheckingTask: ScheduledFuture[_] = null
     
    -  private val timeoutCheckingThread =
    -    ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-timeout-checking-thread")
    +  // "eventLoopThread" is used to run some pretty fast actions. The actions running in it should not
    +  // block the thread for a long time.
    +  private val eventLoopThread =
    +    ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-receiver-event-loop-thread")
     
       private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")
     
       override def onStart(): Unit = {
    -    timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable {
    +    timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
           override def run(): Unit = Utils.tryLogNonFatalError {
             Option(self).foreach(_.send(ExpireDeadHosts))
    --- End diff --
    
    I wrote this line because I was trying to express the following three lines of code:
    
    ```Scala
    var _self = self
    if (_self != null) {
      _self.send(ExpireDeadHosts)
    }
    ```
    `self` is actually a method, not a field. In your code, the second call to `self` may return `null` if the endpoint is stopping.
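
    To make the difference concrete, here is a tiny illustrative sketch (not Spark code): because `self` is re-evaluated on every use, only the version that captures its value once is safe.

    ```Scala
    // Illustrative only: `self` is a def, so each use re-evaluates it.
    var stopping = false
    def self: AnyRef = if (stopping) null else "endpointRef"

    // Race-prone: the second evaluation of `self` can return null.
    if (self != null) {
      stopping = true              // the endpoint starts stopping in between
      // self.send(ExpireDeadHosts) would throw a NullPointerException here
    }

    // Safe: evaluate `self` once and use the captured value.
    Option(self).foreach(ref => println(s"send ExpireDeadHosts via $ref"))
    ```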



[GitHub] spark pull request: [SPARK-7174][Core] Move calling `TaskScheduler...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5723#discussion_r29181433
  
    --- Diff: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ---
    @@ -99,11 +102,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
       override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
         case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
           if (scheduler != null) {
    -        val unknownExecutor = !scheduler.executorHeartbeatReceived(
    -          executorId, taskMetrics, blockManagerId)
    -        val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
             executorLastSeen(executorId) = System.currentTimeMillis()
    -        context.reply(response)
    +        reportExecutorHeartbeatReceived.submit(new Runnable {
    --- End diff --
    
    I think we can combine `timeoutCheckingThread` and `reportExecutorHeartbeatReceived` into a single thread, since the actions running in these threads are pretty fast.
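
    Something along these lines, perhaps (a sketch using only java.util.concurrent; the real code would go through the ThreadUtils helpers): one single-threaded scheduled executor handles both the periodic timeout check and the ad-hoc heartbeat reporting.

    ```Scala
    import java.util.concurrent.{Executors, TimeUnit}

    // One single-threaded scheduled executor serves both purposes,
    // as long as every task submitted to it is quick.
    val eventLoopThread = Executors.newSingleThreadScheduledExecutor()

    // Periodic timeout checking (previously on timeoutCheckingThread).
    val timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = println("check for expired hosts")
    }, 0L, 60L, TimeUnit.SECONDS)

    // Ad-hoc work such as reporting a heartbeat (previously on reportExecutorHeartbeatReceived).
    eventLoopThread.submit(new Runnable {
      override def run(): Unit = println("report executor heartbeat to the scheduler")
    })
    ```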



[GitHub] spark pull request: [SPARK-7174][Core] Move calling `TaskScheduler...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/5723#issuecomment-96907193
  
    I've merged this. Thanks.



[GitHub] spark pull request: [SPARK-7174][Core] Move calling `TaskScheduler...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/5723#issuecomment-96891913
  
    LGTM.
    
    
    We might want to instrument EventLoop to log warnings or errors if the rate of messages going in is much larger than the rate of processing.
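
    One hypothetical way to do that (purely a sketch; this is not an existing Spark EventLoop API, and all names are made up): track posted vs. processed counts and warn when the backlog keeps growing.

    ```Scala
    import java.util.concurrent.atomic.AtomicLong

    // Hypothetical instrumentation sketch for an event loop.
    class InstrumentedEventLoop(warnBacklog: Long) {
      private val posted = new AtomicLong(0L)
      private val processed = new AtomicLong(0L)

      def onPost(): Unit = {
        posted.incrementAndGet()
        val backlog = posted.get() - processed.get()
        if (backlog > warnBacklog) {
          System.err.println(
            s"WARN: event loop backlog is $backlog; messages arrive faster than they are processed")
        }
      }

      def onProcessed(): Unit = { processed.incrementAndGet() }
    }
    ```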




[GitHub] spark pull request: [SPARK-7174][Core] Move calling `TaskScheduler...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5723#issuecomment-96829203
  
      [Test build #31067 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31067/consoleFull) for   PR 5723 at commit [`98bfe48`](https://github.com/apache/spark/commit/98bfe48d603c56f45945049b72a484686e2d0be2).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.
     * This patch does not change any dependencies.



[GitHub] spark pull request: [SPARK-7174][Core] Move calling `TaskScheduler...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5723#discussion_r29210755
  
    --- Diff: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ---
    @@ -76,13 +76,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
       
       private var timeoutCheckingTask: ScheduledFuture[_] = null
     
    -  private val timeoutCheckingThread =
    -    ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-timeout-checking-thread")
    +  // "eventLoopThread" is used to run some pretty fast actions. The actions running in it should not
    +  // block the thread for a long time.
    +  private val eventLoopThread =
    +    ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-receiver-event-loop-thread")
     
       private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")
     
       override def onStart(): Unit = {
    -    timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable {
    +    timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
           override def run(): Unit = Utils.tryLogNonFatalError {
             Option(self).foreach(_.send(ExpireDeadHosts))
    --- End diff --
    
    not part of this change, but can we change this to
    ```scala
    if (self != null) {
      self.send(ExpireDeadHosts)
    }
    ```



[GitHub] spark pull request: [SPARK-7174][Core] Move calling `TaskScheduler...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5723#discussion_r29177083
  
    --- Diff: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ---
    @@ -99,11 +102,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
       override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
         case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
           if (scheduler != null) {
    -        val unknownExecutor = !scheduler.executorHeartbeatReceived(
    -          executorId, taskMetrics, blockManagerId)
    -        val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
             executorLastSeen(executorId) = System.currentTimeMillis()
    -        context.reply(response)
    +        reportExecutorHeartbeatReceived.submit(new Runnable {
    --- End diff --
    
    Creating threads over and over might be bad -- maybe we should have a separate event loop for this?



[GitHub] spark pull request: [SPARK-7174][Core] Move calling `TaskScheduler...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5723#issuecomment-96767268
  
      [Test build #30968 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30968/consoleFull) for   PR 5723 at commit [`5b3b545`](https://github.com/apache/spark/commit/5b3b545e21de0cc6067b6157a806277571d33006).



[GitHub] spark pull request: [SPARK-7174][Core] Move calling `TaskScheduler...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5723#issuecomment-96798651
  
      [Test build #31067 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31067/consoleFull) for   PR 5723 at commit [`98bfe48`](https://github.com/apache/spark/commit/98bfe48d603c56f45945049b72a484686e2d0be2).



[GitHub] spark pull request: [SPARK-7174][Core] Move calling `TaskScheduler...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5723#discussion_r29213486
  
    --- Diff: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ---
    @@ -76,13 +76,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
       
       private var timeoutCheckingTask: ScheduledFuture[_] = null
     
    -  private val timeoutCheckingThread =
    -    ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-timeout-checking-thread")
    +  // "eventLoopThread" is used to run some pretty fast actions. The actions running in it should not
    +  // block the thread for a long time.
    +  private val eventLoopThread =
    +    ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-receiver-event-loop-thread")
     
       private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")
     
       override def onStart(): Unit = {
    -    timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable {
    +    timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
           override def run(): Unit = Utils.tryLogNonFatalError {
             Option(self).foreach(_.send(ExpireDeadHosts))
    --- End diff --
    
    ah ok that makes sense.



[GitHub] spark pull request: [SPARK-7174][Core] Move calling `TaskScheduler...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/5723#issuecomment-96755272
  
    cc @rxin @yhuai

