Posted to reviews@spark.apache.org by pgandhi999 <gi...@git.apache.org> on 2018/08/24 15:33:18 UTC

[GitHub] spark pull request #22221: [SPARK-25231] : Executor Heartbeat Receiver does ...

GitHub user pgandhi999 opened a pull request:

    https://github.com/apache/spark/pull/22221

    [SPARK-25231] : Executor Heartbeat Receiver does not need to synchronize on the TaskSchedulerImpl object
    
    Running a large Spark job with speculation turned on caused executor heartbeats to time out on the driver end after some time; eventually, after hitting the maximum number of executor failures, the job would fail.
    
    ## What changes were proposed in this pull request?
    
    The main reason for the heartbeat timeouts was that the heartbeat-receiver-event-loop-thread was blocked waiting for the lock on the TaskSchedulerImpl object, which was being held by one of the dispatcher-event-loop threads executing the method dequeueSpeculativeTasks() in TaskSetManager.scala. On further analysis of the heartbeat receiver method, it turns out there is no need to hold the lock on the whole object: the block of code in the method only uses one shared HashMap, taskIdToTaskSetManager. By making that map a ConcurrentHashMap, we keep each individual operation on it atomic and avoid the lock contention that was slowing down the heartbeat receiver thread.
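    
    A minimal sketch of the shape of the change (illustrative names only, not the exact Spark code), assuming a map keyed by task id: the map moves from a Scala HashMap guarded by `synchronized` to a `java.util.concurrent.ConcurrentHashMap`, and reads are wrapped in `Option(...)` because `ConcurrentHashMap.get` returns `null` for a missing key.
    
        import java.util.concurrent.ConcurrentHashMap
        
        // Illustrative sketch only; identifiers are hypothetical stand-ins
        // (String stands in for TaskSetManager).
        class SchedulerSketch {
          // Before: a mutable.HashMap where every reader and writer had to hold
          // the TaskSchedulerImpl lock. After: a ConcurrentHashMap, so a single
          // get/put/remove is thread-safe without locking the whole object.
          private val taskIdToTaskSetManager = new ConcurrentHashMap[Long, String]()
        
          def register(taskId: Long, taskSet: String): Unit = {
            taskIdToTaskSetManager.put(taskId, taskSet)
          }
        
          // Heartbeat path: no `synchronized` needed; wrap the possibly-null
          // result in Option to keep the Scala idiom.
          def lookup(taskId: Long): Option[String] =
            Option(taskIdToTaskSetManager.get(taskId))
        }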
    
    ## How was this patch tested?
    
    Screenshots of the thread dump have been attached below:
    **heartbeat-receiver-event-loop-thread:**
    
    <img width="1409" alt="screen shot 2018-08-24 at 9 19 57 am" src="https://user-images.githubusercontent.com/22228190/44593413-e25df780-a788-11e8-9520-176a18401a59.png">
    
    **dispatcher-event-loop-thread:**
    
    <img width="1409" alt="screen shot 2018-08-24 at 9 21 56 am" src="https://user-images.githubusercontent.com/22228190/44593484-13d6c300-a789-11e8-8d88-34b1d51d4541.png">
    
    
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pgandhi999/spark SPARK-25231

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22221.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22221
    
----
commit a0dcde583c76cb96f5112f4ff863874415ec9140
Author: pgandhi <pg...@...>
Date:   2018-08-24T15:27:01Z

    [SPARK-25231] : Executor Heartbeat Receiver does not need to synchronize on the TaskSchedulerImpl object
    
    The main reason for the heartbeat timeouts was that the heartbeat-receiver-event-loop-thread was blocked waiting for the lock on the TaskSchedulerImpl object, which was being held by one of the dispatcher-event-loop threads executing the method dequeueSpeculativeTasks() in TaskSetManager.scala. On further analysis of the heartbeat receiver method, it turns out there is no need to hold the lock on the whole object: the block of code in the method only uses one shared HashMap, taskIdToTaskSetManager. By making that map a ConcurrentHashMap, we keep each individual operation on it atomic and avoid the lock contention that was slowing down the heartbeat receiver thread.

----


---



[GitHub] spark pull request #22221: [SPARK-25231] : Fix synchronization of executor h...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22221#discussion_r214508181
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -517,10 +517,10 @@ private[spark] class TaskSchedulerImpl(
           accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
           blockManagerId: BlockManagerId): Boolean = {
         // (taskId, stageId, stageAttemptId, accumUpdates)
    -    val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
    +    val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = {
           accumUpdates.flatMap { case (id, updates) =>
             val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None))
    -        taskIdToTaskSetManager.get(id).map { taskSetMgr =>
    +        Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr =>
    --- End diff --
    
    Just leaving a small concern here: the original code locked the whole scope of the ids in `accumUpdates`. After this change, some ids that could be found originally may no longer be found, because `taskIdToTaskSetManager` can be changed concurrently by `removeExecutor` or `statusUpdate`. It's not a big problem if the executor has been removed.


---



[GitHub] spark issue #22221: [SPARK-25231] : Fix synchronization of executor heartbea...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22221
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95216/
    Test PASSed.


---



[GitHub] spark issue #22221: [SPARK-25231] : Fix synchronization of executor heartbea...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on the issue:

    https://github.com/apache/spark/pull/22221
  
    @pgandhi999 thank you for your comments.
    LGTM. Let me also ask @zsxwing and @JoshRosen for their comments.


---



[GitHub] spark pull request #22221: [SPARK-25231] : Fix synchronization of executor h...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22221#discussion_r212833953
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -465,7 +465,7 @@ private[spark] class TaskSchedulerImpl(
         var reason: Option[ExecutorLossReason] = None
         synchronized {
           try {
    -        taskIdToTaskSetManager.get(tid) match {
    +        Option(taskIdToTaskSetManager.get(tid)) match {
    --- End diff --
    
    Good catch. I like this direction.
    
    I have a question about the change in semantics. By removing the `synchronized` block at `accumUpdatesWithTaskIds()`, the pair of operations inside this `synchronized` block, the `get()` here and the `remove()` in `cleanupTaskState()`, is no longer atomic with respect to that `get`.
    Is this change ok?


---



[GitHub] spark pull request #22221: [SPARK-25231] : Fix synchronization of executor h...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22221#discussion_r214937032
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -517,10 +517,10 @@ private[spark] class TaskSchedulerImpl(
           accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
           blockManagerId: BlockManagerId): Boolean = {
         // (taskId, stageId, stageAttemptId, accumUpdates)
    -    val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
    +    val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = {
           accumUpdates.flatMap { case (id, updates) =>
             val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None))
    -        taskIdToTaskSetManager.get(id).map { taskSetMgr =>
    +        Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr =>
    --- End diff --
    
    I agree this could happen, but it shouldn't cause issues: even before this change the executor could have been removed right before this function was called (it's all timing dependent), so this does not change the functionality. This is only used to update accumulators for running tasks. If the tasks had finished, the accumulator updates would have been processed via the task end events.
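    
    For illustration only (a hypothetical sketch, not the real accumUpdatesWithTaskIds code): because the lookups go through `flatMap` plus `Option(...)`, an id whose entry has already been removed is simply dropped, which is the same outcome as if the executor had been removed just before the previously synchronized block ran.
    
        import java.util.concurrent.ConcurrentHashMap
        
        object AccumUpdateSketch {
          // Hypothetical stand-ins: String for the TaskSetManager, Seq[Int] for the updates.
          val taskIdToTaskSetManager = new ConcurrentHashMap[Long, String]()
        
          def withTaskSets(accumUpdates: Array[(Long, Seq[Int])]): Array[(Long, String, Seq[Int])] =
            accumUpdates.flatMap { case (id, updates) =>
              // A null result (entry already removed) becomes None and the id is skipped.
              Option(taskIdToTaskSetManager.get(id)).map { mgr => (id, mgr, updates) }
            }
        }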


---



[GitHub] spark issue #22221: [SPARK-25231] : Executor Heartbeat Receiver does not nee...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22221
  
    Can one of the admins verify this patch?


---





[GitHub] spark issue #22221: [SPARK-25231] : Fix synchronization of executor heartbea...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22221
  
    **[Test build #95216 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95216/testReport)** for PR 22221 at commit [`a0dcde5`](https://github.com/apache/spark/commit/a0dcde583c76cb96f5112f4ff863874415ec9140).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22221: [SPARK-25231] : Fix synchronization of executor heartbea...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on the issue:

    https://github.com/apache/spark/pull/22221
  
    We would appreciate it if you could describe in more detail the `further analysis` behind removing the lock on the whole object.
    
    > On further analysis of the heartbeat receiver method, it turns out there is no need to hold the lock on the whole object



---



[GitHub] spark pull request #22221: [SPARK-25231] : Fix synchronization of executor h...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22221#discussion_r212978188
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -465,7 +465,7 @@ private[spark] class TaskSchedulerImpl(
         var reason: Option[ExecutorLossReason] = None
         synchronized {
           try {
    -        taskIdToTaskSetManager.get(tid) match {
    +        Option(taskIdToTaskSetManager.get(tid)) match {
    --- End diff --
    
    I think your question is about the get() and the remove() inside cleanupTaskState all being inside a single synchronized block, correct?
    
    I don't see that as being a problem here since taskIdToTaskSetManager is a ConcurrentHashMap. That keeps each individual operation atomic, and doing a remove on a key that isn't there does nothing. There is no other code that removes from there, so I don't think that can happen anyway. With this change the only thing outside of a synchronized block is the get in accumUpdatesWithTaskIds, which is harmless if the entry has already been removed.
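    
    A tiny standalone sketch (not Spark code) of the two properties being relied on, namely that a remove on a missing key is a no-op and that a get on an already-removed key just yields null/None:
    
        import java.util.concurrent.ConcurrentHashMap
        
        object ConcurrentMapDemo {
          def main(args: Array[String]): Unit = {
            val map = new ConcurrentHashMap[Long, String]()
            map.put(1L, "taskSet-0.0")
        
            map.remove(42L)                 // removing a key that was never there does nothing
            map.remove(1L)                  // normal removal
            println(Option(map.get(1L)))    // a later get simply prints None
          }
        }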


---





[GitHub] spark issue #22221: [SPARK-25231] : Executor Heartbeat Receiver does not nee...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the issue:

    https://github.com/apache/spark/pull/22221
  
    ok to test


---



[GitHub] spark issue #22221: [SPARK-25231] : Executor Heartbeat Receiver does not nee...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22221
  
    **[Test build #95216 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95216/testReport)** for PR 22221 at commit [`a0dcde5`](https://github.com/apache/spark/commit/a0dcde583c76cb96f5112f4ff863874415ec9140).


---



[GitHub] spark issue #22221: [SPARK-25231] : Fix synchronization of executor heartbea...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22221
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22221: [SPARK-25231] : Fix synchronization of executor heartbea...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the issue:

    https://github.com/apache/spark/pull/22221
  
    +1 


---



[GitHub] spark pull request #22221: [SPARK-25231] : Fix synchronization of executor h...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22221#discussion_r213071006
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -465,7 +465,7 @@ private[spark] class TaskSchedulerImpl(
         var reason: Option[ExecutorLossReason] = None
         synchronized {
           try {
    -        taskIdToTaskSetManager.get(tid) match {
    +        Option(taskIdToTaskSetManager.get(tid)) match {
    --- End diff --
    
    Yes, as far as I can see it's safe. If the get happens before the entry is removed, it calculates the accumulators; if it happens after the removal, it just gets an empty array back. This isn't any different than when it was synchronized. There is nothing in statusUpdate between the get and the call to cleanupTaskState (where it removes the entry) that I can see depends on the accumulators or anything else.


---



[GitHub] spark pull request #22221: [SPARK-25231] : Fix synchronization of executor h...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22221#discussion_r213005226
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -465,7 +465,7 @@ private[spark] class TaskSchedulerImpl(
         var reason: Option[ExecutorLossReason] = None
         synchronized {
           try {
    -        taskIdToTaskSetManager.get(tid) match {
    +        Option(taskIdToTaskSetManager.get(tid)) match {
    --- End diff --
    
    `ConcurrentHashMap` makes each individual operation, such as `get()` and `remove()`, atomic. So I reviewed the places where more than one operation sits inside a single `synchronized` block, and this is one of them.
    When we apply this PR, the `get` in `accumUpdatesWithTaskIds` can be executed between the `get()` and the `remove()` here. My question is just asking for confirmation of whether that is safe.


---



[GitHub] spark issue #22221: [SPARK-25231] : Fix synchronization of executor heartbea...

Posted by pgandhi999 <gi...@git.apache.org>.
Github user pgandhi999 commented on the issue:

    https://github.com/apache/spark/pull/22221
  
    @kiszk I have added some more details in the PR description. Thank you.


---



[GitHub] spark issue #22221: [SPARK-25231] : Fix synchronization of executor heartbea...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the issue:

    https://github.com/apache/spark/pull/22221
  
    merged to master and 2.3, thanks @pgandhi999 


---



[GitHub] spark pull request #22221: [SPARK-25231] : Fix synchronization of executor h...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22221


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org