Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/04/09 21:15:57 UTC

[GitHub] [spark] sumeetgajjar opened a new pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

sumeetgajjar opened a new pull request #32114:
URL: https://github.com/apache/spark/pull/32114


   
   ### What changes were proposed in this pull request?
   This patch proposes a fix to prevent triggering BlockManager reregistration while a `StopExecutor` message is in flight.
   It adds a cache of recently removed executors on the driver. During registration in `BlockManagerMasterEndpoint`, if the BlockManager belongs to a recently removed executor, we return `None`, indicating that the registration is ignored because the executor will shut down soon.
   On `BlockManagerHeartbeat`, if the BlockManager belongs to a recently removed executor, we return `true`, indicating that the driver knows about it, thereby preventing reregistration.
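
A minimal sketch of the idea described above, assuming a simplified driver-side registry keyed by executor id. `RecentlyRemovedRegistry`, `remove`, `register`, and `heartbeat` are hypothetical names used only for illustration and are not Spark APIs; the patch itself uses a bounded cache rather than the plain set shown here.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the driver-side bookkeeping; not Spark code.
final class RecentlyRemovedRegistry {
  private val registered = mutable.Set.empty[String]
  private val recentlyRemoved = mutable.Set.empty[String]

  // Called when the driver removes an executor and sends StopExecutor.
  def remove(executorId: String): Unit = {
    registered -= executorId
    recentlyRemoved += executorId
  }

  // Returns None when the registration is ignored because the executor was
  // recently removed and is expected to shut down soon.
  def register(executorId: String): Option[String] = {
    if (recentlyRemoved.contains(executorId)) {
      None
    } else {
      registered += executorId
      Some(executorId)
    }
  }

  // Returns true when the driver knows the block manager, which tells the
  // executor that no reregistration is needed.
  def heartbeat(executorId: String): Boolean = {
    registered.contains(executorId) || recentlyRemoved.contains(executorId)
  }
}
```

With this model, a heartbeat from a recently removed executor is answered with `true`, and a late registration attempt from it yields `None`.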
   
   
   ### Why are the changes needed?
   These changes are needed because BlockManager reregistration while the executor is shutting down causes inconsistent bookkeeping of executors in Spark.
   Consider the following scenario:
   - `CoarseGrainedSchedulerBackend` issues async `StopExecutor` on executorEndpoint
   - `CoarseGrainedSchedulerBackend` removes that executor from Driver's internal data structures and publishes `SparkListenerExecutorRemoved` on the `listenerBus`.
   - Executor has still not processed `StopExecutor` from the Driver
   - Driver receives a heartbeat from the Executor. Since it cannot find the `executorId` in its data structures, it responds with `HeartbeatResponse(reregisterBlockManager = true)`
   - `BlockManager` on the Executor reregisters with the `BlockManagerMaster` and `SparkListenerBlockManagerAdded` is published on the `listenerBus`
   - Executor starts processing the `StopExecutor` and exits
   - `AppStatusListener` picks the `SparkListenerBlockManagerAdded` event and updates `AppStatusStore`
   - `statusTracker.getExecutorInfos` consults `AppStatusStore` to get the list of executors, which returns the dead executor as alive.
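
The sequence above can be replayed in a toy model. This is only a sketch under simplifying assumptions: `StaleExecutorRace`, `Driver`, and their members are hypothetical names, and two plain sets stand in for the scheduler backend's bookkeeping and for `AppStatusStore`.

```scala
import scala.collection.mutable

// Hypothetical toy model of the driver; names are illustrative, not Spark APIs.
object StaleExecutorRace {
  final class Driver {
    // The scheduler backend's view of registered executors.
    val schedulerExecutors = mutable.Set.empty[String]
    // What the status tracker would report as alive.
    val statusStore = mutable.Set.empty[String]

    def addExecutor(id: String): Unit = {
      schedulerExecutors += id
      statusStore += id
    }

    // StopExecutor is sent asynchronously; the driver forgets the executor right away.
    def removeExecutor(id: String): Unit = {
      schedulerExecutors -= id
      statusStore -= id // effect of SparkListenerExecutorRemoved
    }

    // Returns true when the executor is asked to reregister its BlockManager.
    def heartbeat(id: String): Boolean = {
      !schedulerExecutors.contains(id)
    }

    // Effect of SparkListenerBlockManagerAdded reaching the status listener.
    def registerBlockManager(id: String): Unit = {
      statusStore += id
    }
  }

  def run(): Set[String] = {
    val driver = new Driver
    driver.addExecutor("1")
    driver.removeExecutor("1") // StopExecutor is still in flight
    if (driver.heartbeat("1")) { // executor has not processed StopExecutor yet
      driver.registerBlockManager("1") // reregistration from a dying executor
    }
    // The executor exits here, but nothing removes it from the status store again.
    driver.statusStore.toSet
  }
}
```

In this model, `StaleExecutorRace.run()` still contains executor `"1"` after the executor has exited, which mirrors the stale entry described in the last step.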
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Added new unit tests


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-849643086


   **[Test build #139021 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139021/testReport)** for PR 32114 at commit [`2c7a439`](https://github.com/apache/spark/commit/2c7a4395c3dc75ff803b37a29541292104c53cb7).




[GitHub] [spark] Ngone51 commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-826114665


   So I think the solution now would be:
   
   For the heartbeat, using https://github.com/apache/spark/pull/32114#pullrequestreview-634766324.
    
   For the `BlockManager`, adding a `BlockManagerEndpointSharedState` (as mentioned by @sumeetgajjar in https://github.com/apache/spark/pull/32114#discussion_r612814973) for both `BlockManagerMasterEndpoint` and `BlockManagerMasterHeartbeatEndpoint`. It's true that if we only add a removal state to the `blockManagerInfo`, we have to filter out the removed blockmanagers before traversing it (e.g., we wouldn't expect to return a removed blockmanager in `getPeers`).
   
   In `BlockManagerEndpointSharedState`, we'd have both `activeBlockManagerInfo` and the `removedBlockManagerInfo`.  We don't have to set up a new cleaner to clear the `removedBlockManagerInfo`. Instead, we can reuse the fix of `HeartbeatReceiver` as whenever there's a sure removal in `HeartbeatReceiver`, we can send a removal request to `BlockManagerEndpointSharedState` as well by following the code path of `!scheduler.executorHeartbeatReceived`(e.g., we could have `scheduler.clearBlockManagerInfo` similarly).
   
   WDYT?
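
A rough sketch of what such a shared state could look like, assuming block managers are keyed by a plain executor id string. Only `BlockManagerEndpointSharedState`, `activeBlockManagerInfo`, and `removedBlockManagerInfo` come from the comment above; the method names and the `String` payload are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical sketch; the field names follow the comment above, the rest is assumed.
final class BlockManagerEndpointSharedState {
  private val activeBlockManagerInfo = mutable.Map.empty[String, String]
  private val removedBlockManagerInfo = mutable.Map.empty[String, String]

  // Rejects registration while the block manager is still tracked as removed.
  def register(executorId: String, info: String): Boolean = synchronized {
    if (removedBlockManagerInfo.contains(executorId)) {
      false
    } else {
      activeBlockManagerInfo(executorId) = info
      true
    }
  }

  // Moves the entry from the active map to the removed map.
  def remove(executorId: String): Unit = synchronized {
    activeBlockManagerInfo.remove(executorId).foreach { info =>
      removedBlockManagerInfo(executorId) = info
    }
  }

  // Would be triggered from the heartbeat side once the removal is certain.
  def clearRemoved(executorId: String): Unit = synchronized {
    removedBlockManagerInfo.remove(executorId)
    ()
  }

  // A heartbeat check treats both active and removed block managers as known.
  def isKnown(executorId: String): Boolean = synchronized {
    activeBlockManagerInfo.contains(executorId) ||
      removedBlockManagerInfo.contains(executorId)
  }

  // Traversals such as a peers lookup only ever see active block managers.
  def peers: Set[String] = synchronized {
    activeBlockManagerInfo.keySet.toSet
  }
}
```

Keeping the removed entries in their own map means traversals need no extra filtering, at the cost of an explicit `clearRemoved` call from another component.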




[GitHub] [spark] AmplabJenkins commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-849775873


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139021/
   




[GitHub] [spark] mridulm commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r611286561



##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -355,6 +355,12 @@ object SparkEnv extends Logging {
 
     // Mapping from block manager id to the block manager's information.
     val blockManagerInfo = new concurrent.TrieMap[BlockManagerId, BlockManagerInfo]()
+    // Using a cache here since we only want to track recently removed executors to deny their
+    // block manager registration while their StopExecutor message is in-flight.
+    // Assuming average size of 6 bytes of execId and each entry in Cache taking around 64 bytes,
+    // max size of this cache = (6 + 64) * 30000 = 2.1MB
+    val recentlyRemovedExecutors = CacheBuilder.newBuilder().maximumSize(30000)
+      .build[String, String]()

Review comment:
       Any particular reason for such a high cache size ?
   Also, expire after some time ?






[GitHub] [spark] sumeetgajjar commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640320239



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -336,13 +345,16 @@ class BlockManagerMasterEndpoint(
   private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
     val info = blockManagerInfo(blockManagerId)
 
+    // Not removing info from the blockManagerInfo map, but only updating the removal timestamp of
+    // the executor in BlockManagerInfo. This info will be removed from blockManagerInfo map by the
+    // blockManagerInfoCleaner once now() - info.executorRemovalTs > executorTimeoutMs.
+    info.updateExecutorRemovalTs()

Review comment:
       Done.






[GitHub] [spark] mridulm commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-819120744


   I am getting a little confused between PR description and the subsequent discussion.
   What exactly is the behavior we are trying to converge towards/address ?
   
   An expiration of executor from heartbeat master not only sends a `StopExecutor` to voluntarily get executor to exit, but also gets the cluster manager to force termination (in case of MIA/hung executor). So in steady state, once transitionary/overlapping updates are done, the executor should be gone according to driver.
   
   My understanding was, there is a race here between cluster manager notifying application and the executor heartbeat/registration : which ends up causing a dead executor to be marked live indefinitely.
   
   Is this the only case we are addressing ? Or are there any other paths that are impacted ?
   
   (@Ngone51 Not sure if standalone has nuances that I am missing here).




[GitHub] [spark] sumeetgajjar commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-827048420


   > So I think the solution now would be:
   > 
   > For the heartbeat, using [#32114 (review)](https://github.com/apache/spark/pull/32114#pullrequestreview-634766324).
   > 
   > For the `BlockManager`, adding a `BlockManagerEndpointSharedState` (as mentioned by @sumeetgajjar in [#32114 (comment)](https://github.com/apache/spark/pull/32114#discussion_r612814973)) for both `BlockManagerMasterEndpoint` and `BlockManagerMasterHeartbeatEndpoint`. It's true that if we only add a removal state to the `blockManagerInfo`, we have to filter out the removed blockmanagers before traversing it (e.g., we wouldn't expect to return a removed blockmanager in `getPeers`).
   > 
   > In `BlockManagerEndpointSharedState`, we'd have both `activeBlockManagerInfo` and the `removedBlockManagerInfo`. We don't have to set up a new cleaner to clear the `removedBlockManagerInfo`. Instead, we can reuse the fix of `HeartbeatReceiver` as whenever there's a sure removal in `HeartbeatReceiver`, we can send a removal request to `BlockManagerEndpointSharedState` as well by following the code path of `!scheduler.executorHeartbeatReceived`(e.g., we could have `scheduler.clearBlockManagerInfo` similarly).
   > 
   > WDYT?
   
   @Ngone51 Thank you for this suggestion. I understand the solution; however, I believe it might be slightly complex to keep track of things, since the cleanup/removal is triggered from a different component, i.e. `HeartbeatReceiver`.
   
   I spoke to @attilapiros offline regarding his solution, and it seems he forgot to mention one thing: `BlockManagerInfo` will not be removed here: https://github.com/apache/spark/blob/1b609c7dcfc3a30aefff12a71aac5c1d6273b2c0/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L344
   
   Instead, the cleanup thread would take care of the removal. This keeps the whole logic in the same component, i.e. `BlockManagerMasterEndpoint`, and would be easy to follow from a code-understanding point of view.
   
   I will proceed with @attilapiros's [proposal](https://github.com/apache/spark/pull/32114#discussion_r612373538), where we model BlockManager removal as a new state, run some tests, and post an update.




[GitHub] [spark] attilapiros commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
attilapiros commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640514344



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -273,12 +282,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        blockManagerInfo.get(bmId).foreach { bmInfo =>
+        blockManagerInfo.get(bmId).filter(_.isAlive).foreach { bmInfo =>

Review comment:
       Issue is created: https://issues.apache.org/jira/browse/SPARK-35543 
   
   Give me some time to fix it; I am very busy with another component (strange coincidence with a huge memory leak).
   






[GitHub] [spark] Ngone51 edited a comment on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 edited a comment on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-899979045


   I just realized this bug does cause a real problem when working in conjunction with https://github.com/apache/spark/pull/24533. Basically, the re-registration issue leads the driver to think an executor is alive while it's actually dead, which in turn causes the client to retry the block on the dead executor when it shouldn't. Could you @sumeetgajjar backport this fix to 3.1/3.0 as well?
   cc @mridulm @attilapiros 




[GitHub] [spark] attilapiros commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
attilapiros commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r612373538



##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -355,6 +355,12 @@ object SparkEnv extends Logging {
 
     // Mapping from block manager id to the block manager's information.
     val blockManagerInfo = new concurrent.TrieMap[BlockManagerId, BlockManagerInfo]()
+    // Using a cache here since we only want to track recently removed executors to deny their
+    // block manager registration while their StopExecutor message is in-flight.
+    // Assuming average size of 6 bytes of execId and each entry in Cache taking around 64 bytes,
+    // max size of this cache = (6 + 64) * 30000 = 2.1MB
+    val recentlyRemovedExecutors = CacheBuilder.newBuilder().maximumSize(30000)
+      .build[String, String]()

Review comment:
       I think another possible solution is to extend `BlockManagerInfo` with a removal timestamp. That models removal as a new state, so we could avoid this separate cache completely and keep all the BM-related data in the same place.
   
   Of course, in this case you would have to implement the cleanup.
   
   For example, it could be a simple `Long` var that is 0 by default, meaning the BlockManager is alive/active (this special value can be hidden behind a method of `BlockManagerInfo` like `isAlive(currentTs)`). The cleanup would be triggered after the delay plus some extra time, to avoid iterating over the `blockManagerInfo` collection too frequently.
   
   WDYT?
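
A small sketch of this proposal, under the assumption that block managers are keyed by executor id and that the caller passes in the current time. `TrackedBlockManager`, `BlockManagerRegistry`, and all method names here are hypothetical and are not the Spark implementation.

```scala
import scala.collection.mutable

// Hypothetical sketch; class and method names are illustrative only.
final class TrackedBlockManager(val executorId: String) {
  // 0 means the BlockManager is alive; any other value is the removal time in ms.
  private var removalTs: Long = 0L

  def isAlive: Boolean = removalTs == 0L

  // Keeps the first removal timestamp if called more than once.
  def markRemoved(nowMs: Long): Unit = {
    if (isAlive) {
      removalTs = nowMs
    }
  }

  def isExpired(nowMs: Long, timeoutMs: Long): Boolean = {
    !isAlive && nowMs - removalTs > timeoutMs
  }
}

final class BlockManagerRegistry(timeoutMs: Long) {
  private val infos = mutable.Map.empty[String, TrackedBlockManager]

  // Returns true if the block manager is (now) registered and alive.
  def register(executorId: String): Boolean = infos.get(executorId) match {
    case Some(info) =>
      info.isAlive // a removed entry still blocks reregistration
    case None =>
      infos(executorId) = new TrackedBlockManager(executorId)
      true
  }

  // Marks the entry as removed instead of deleting it.
  def remove(executorId: String, nowMs: Long): Unit = {
    infos.get(executorId).foreach(_.markRemoved(nowMs))
  }

  // Periodic cleanup: drops entries that were removed more than timeoutMs ago.
  def cleanup(nowMs: Long): Unit = {
    val expired = infos.iterator.collect {
      case (id, info) if info.isExpired(nowMs, timeoutMs) => id
    }.toList
    expired.foreach(id => infos.remove(id))
  }

  def aliveIds: Set[String] = {
    infos.iterator.collect { case (id, info) if info.isAlive => id }.toSet
  }
}
```

The trade-off is that every reader of the map has to filter on `isAlive`, while all block manager data stays in one collection.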






[GitHub] [spark] sumeetgajjar commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-899997315


   > I just realized this bug does cause a real problem when working in conjunction with #24533. Basically, the re-registration issue leads the driver to think an executor is alive while it's actually dead, which in turn causes the client to retry the block on the dead executor when it shouldn't. Could you @sumeetgajjar backport this fix to 3.1/3.0 as well?
   > cc @mridulm @attilapiros
   
   @Ngone51, sure I will backport it to 3.1 and 3.0 as well.




[GitHub] [spark] sumeetgajjar commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-824346384


   > If that's the case (it seems not correct but has existed for a long time already), I think posting the SparkListenerBlockManagerAdded inside the if (!blockManagerInfo.contains(id)) would be enough for the whole fix?
   
   @Ngone51 I believe moving `SparkListenerBlockManagerAdded` inside the `if` block should be enough.
   I will give it a try and check if any other cases surface.
   
   Thanks!
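
To illustrate the suggestion, here is a toy sketch in which the "added" event is posted only inside the `if` block, so a repeated registration of an already known id stays silent. `RegistrationModel` and `postedEvents` are hypothetical stand-ins; this does not reproduce the actual `register` method.

```scala
import scala.collection.mutable

// Hypothetical sketch of "post the added event only on first registration".
final class RegistrationModel {
  private val blockManagerInfo = mutable.Set.empty[String]
  // Stand-in for events that would be posted on the listener bus.
  val postedEvents = mutable.ListBuffer.empty[String]

  def register(id: String): Unit = {
    if (!blockManagerInfo.contains(id)) {
      blockManagerInfo += id
      // Posting inside the `if` block keeps a repeated registration silent.
      postedEvents += s"BlockManagerAdded($id)"
    }
  }
}
```

Note that this only helps while the id is still present in the map; whether that holds after an executor removal is exactly what the rest of the discussion is about.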
   




[GitHub] [spark] sumeetgajjar commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r617990361



##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -355,6 +355,12 @@ object SparkEnv extends Logging {
 
     // Mapping from block manager id to the block manager's information.
     val blockManagerInfo = new concurrent.TrieMap[BlockManagerId, BlockManagerInfo]()
+    // Using a cache here since we only want to track recently removed executors to deny their
+    // block manager registration while their StopExecutor message is in-flight.
+    // Assuming average size of 6 bytes of execId and each entry in Cache taking around 64 bytes,
+    // max size of this cache = (6 + 64) * 30000 = 2.1MB
+    val recentlyRemovedExecutors = CacheBuilder.newBuilder().maximumSize(30000)
+      .build[String, String]()

Review comment:
       I found a code path which will break @attilapiros's proposed solution since the following holds true.
   
   > However, we will have to abstract blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo].
   Currently, ...
   
   Let's consider the following set of events:
   - A  `CoarseGrainedClusterMessage.RemoveExecutor` is issued
   - `CoarseGrainedSchedulerBackend` issues async `StopExecutor` on `executorEndpoint` and then invokes `executorLost` on `TaskSchedulerImpl`
   https://github.com/apache/spark/blob/e60939591336a2c38e5b7a6a36f45b8614b46fe5/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L430
   - TaskSchedulerImpl in its `executorLost` invokes `dagScheduler.executorLost`
   https://github.com/apache/spark/blob/e60939591336a2c38e5b7a6a36f45b8614b46fe5/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L998-L1001
   - `DAGScheduler` while handling executorLost invokes `removeExecutorAndUnregisterOutputs` which internally invokes `blockManagerMaster.removeExecutor(execId)` (as you pointed out in your [comment below](https://github.com/apache/spark/pull/32114#issuecomment-819255421)) which further clears `blockManagerId` from `blockManagerInfo` in `BlockManagerMasterEndpoint`
   https://github.com/apache/spark/blob/e60939591336a2c38e5b7a6a36f45b8614b46fe5/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2043
   - The Executor has not yet processed `StopExecutor`
   - Executor reports its Heartbeat
   - `HeartbeatReceiver` invokes `scheduler.executorHeartbeatReceived` to check if the BlockManager on the executor requires re-registration
   https://github.com/apache/spark/blob/e60939591336a2c38e5b7a6a36f45b8614b46fe5/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala#L137
   - `TaskSchedulerImpl` delegates this to `DAGScheduler`
   https://github.com/apache/spark/blob/e60939591336a2c38e5b7a6a36f45b8614b46fe5/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L858
   - `DAGScheduler` asks `BlockManagerMasterHeartbeatEndpoint` if it knows the BlockManager
   https://github.com/apache/spark/blob/e60939591336a2c38e5b7a6a36f45b8614b46fe5/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L300
   - `BlockManagerMasterHeartbeatEndpoint` returns false since it cannot find `blockManagerId` in `BlockManagerInfo` indicating the blockManager should re-register
   https://github.com/apache/spark/blob/e60939591336a2c38e5b7a6a36f45b8614b46fe5/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterHeartbeatEndpoint.scala#L51
   - BlockManager re-registers which publishes the `SparkListenerBlockManagerAdded` causing the inconsistent book-keeping in `AppStatusStore`
   - Executor processes `StopExecutor` and exits.
   






[GitHub] [spark] Ngone51 commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r638521415



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -422,7 +430,7 @@ class BlockManagerMasterEndpoint(
     val locations = blockLocations.get(blockId)
     if (locations != null) {
       locations.foreach { blockManagerId: BlockManagerId =>
-        val blockManager = blockManagerInfo.get(blockManagerId)
+        val blockManager = blockManagerInfo.get(blockManagerId).filter(_.isAlive)

Review comment:
       Hmm... using `isAlive` is kind of messy. What if we maintain the inactive BlockManagerInfos in a separate data structure and remove the BlockManagerInfo from `blockManagerInfo` as is done today?
   
   (I have the impression we discussed this, but I can't remember why we abandoned it. Sorry if I'm raising a duplicate discussion here.)






[GitHub] [spark] Ngone51 commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-899979045


   I just realized this bug does cause a real problem when working in conjunction with https://github.com/apache/spark/pull/24533. Could you @sumeetgajjar backport this fix to 3.1/3.0 as well?
   cc @mridulm @attilapiros 




[GitHub] [spark] mridulm commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r638460320



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -878,4 +903,15 @@ private[spark] class BlockManagerInfo(
   def clear(): Unit = {
     _blocks.clear()
   }
+
+  def executorRemovalTs: Option[Long] = _executorRemovalTs
+
+  def isAlive: Boolean = _executorRemovalTs.isEmpty
+
+  def updateExecutorRemovalTs(): Unit = {
+    if (!isAlive) {
+      logWarning(s"executorRemovalTs is already set to ${_executorRemovalTs.get}")
+    }
+    _executorRemovalTs = Some(System.currentTimeMillis())

Review comment:
       Update `_executorRemovalTs` in `else` ?
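
   A minimal sketch of what moving the assignment into `else` could look like. `RemovalState` and the injected `clock` are hypothetical stand-ins (not Spark classes), and `Console.err.println` stands in for `logWarning`. The assumed motivation is that a repeated removal should keep the first timestamp rather than restart the expiry window:

```scala
// Hypothetical stand-in for the removal-timestamp state kept in BlockManagerInfo.
// The clock is injected only so that the behaviour is easy to check.
class RemovalState(clock: () => Long = () => System.currentTimeMillis()) {
  private var _executorRemovalTs: Option[Long] = None

  def executorRemovalTs: Option[Long] = _executorRemovalTs

  def isAlive: Boolean = _executorRemovalTs.isEmpty

  def updateExecutorRemovalTs(): Unit = {
    if (!isAlive) {
      // Already marked as removed: warn and keep the first timestamp.
      Console.err.println(s"executorRemovalTs is already set to ${_executorRemovalTs.get}")
    } else {
      _executorRemovalTs = Some(clock())
    }
  }
}
```

   With this shape a second call leaves the original timestamp untouched, so the cleanup deadline does not move.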

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -728,14 +738,28 @@ class BlockManagerMasterEndpoint(
   private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
     for (
       blockManagerId <- blockManagerIdByExecutor.get(executorId);
-      info <- blockManagerInfo.get(blockManagerId)
+      info <- blockManagerInfo.get(blockManagerId).filter(_.isAlive)
     ) yield {
       info.storageEndpoint
     }
   }
 
   override def onStop(): Unit = {
     askThreadPool.shutdownNow()
+    blockManagerInfoCleaner.shutdownNow()
+  }
+
+  private def cleanBlockManagerInfo(): Unit = {
+    logDebug("Cleaning blockManagerInfo")
+    val now = System.currentTimeMillis()
+    val (expiredEntries, _) = blockManagerInfo.partition { case (_, bmInfo) =>
+      // bmInfo.executorRemovalTs.get cannot be None when BM is not alive
+      !bmInfo.isAlive && (now - bmInfo.executorRemovalTs.get) > executorTimeoutMs
+    }

Review comment:
       `filter(<>).keys` instead of `partition` ?
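
   Roughly what I have in mind, as a sketch only. The map below uses plain `String` ids and an `Option[Long]` removal timestamp as hypothetical stand-ins for `BlockManagerId` and `BlockManagerInfo`; `ExpiredEntries` is not a Spark object:

```scala
import scala.collection.concurrent.TrieMap

object ExpiredEntries {
  // removalTs == None means the block manager is still alive.
  def expiredIds(
      info: TrieMap[String, Option[Long]],
      now: Long,
      timeoutMs: Long): Set[String] = {
    // filter builds only the collection we need; partition would also
    // build the "not expired" half, which is thrown away immediately.
    info.filter { case (_, removalTs) =>
      removalTs.exists(ts => now - ts > timeoutMs)
    }.keys.toSet
  }
}
```

   Using `exists` on the `Option` also avoids the explicit `.get`, so the "cannot be None" comment is no longer needed in this variant.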

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -273,12 +282,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        blockManagerInfo.get(bmId).foreach { bmInfo =>
+        blockManagerInfo.get(bmId).filter(_.isAlive).foreach { bmInfo =>

Review comment:
       Does this actually have an impact ?

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -273,12 +282,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        blockManagerInfo.get(bmId).foreach { bmInfo =>
+        blockManagerInfo.get(bmId).filter(_.isAlive).foreach { bmInfo =>

Review comment:
       Looking more, @Ngone51 do we have a leak w.r.t `blockStatusByShuffleService` in this class ?
   Is bmInfo.removeBlock helping to reduce impact of the leak ?






[GitHub] [spark] attilapiros commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
attilapiros commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640514344



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -273,12 +282,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        blockManagerInfo.get(bmId).foreach { bmInfo =>
+        blockManagerInfo.get(bmId).filter(_.isAlive).foreach { bmInfo =>

Review comment:
       Issue is created: https://issues.apache.org/jira/browse/SPARK-35543 
   
   Give me some time to fix it; I am very busy with another component (by a strange coincidence, also with a huge memory leak).
   






[GitHub] [spark] attilapiros commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
attilapiros commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-817266614


   I fixed a typo in the title and description: registerations => registrations.
   But I will only be able to review this properly next week.




[GitHub] [spark] Ngone51 commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-849394608


   oh yes, we have to reply anyway..




[GitHub] [spark] Ngone51 commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-853530293


   Thanks, merged to master!




[GitHub] [spark] sumeetgajjar commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-819053945


   > In essence, if I understood correctly, we are adding a `lostExecutorCandidates:Map[String, ExpirationState]` ?
   > 
   >     * If we detect a request to expire an executor comes in - then expire based on (some) policy : timeout since initial expiry/number of expirations/other reasons : else add/update expiration state of candidate.
   > 
   >     * If heartbeat comes in, then remove from candidate set.
   > 
   >     * If explicit remove, then remove from both `executorLastSeen` and `lostExecutorCandidates:Set`.
   > 
   > 
   > Did I miss anything ? I am fine with this approach.
   > (I explicitly pulled out magic values out for explanation clarity)
   
   Thanks for the comment @mridulm .
   I believe this [comment](https://github.com/apache/spark/pull/32114#issuecomment-819046189) applies here as well.
   
   I believe @attilapiros's [suggestion](https://github.com/apache/spark/pull/32114#discussion_r612373538) would take care of both cases where re-registration is triggered, without introducing another cache of `recentlyRemovedExecutors`.




[GitHub] [spark] sumeetgajjar commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-849758681


   Just an FYI: although the GitHub check for `Build and test` shows Queued here, the summary page for that job shows all green ✅




[GitHub] [spark] attilapiros commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
attilapiros commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r639185928



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -273,12 +282,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        blockManagerInfo.get(bmId).foreach { bmInfo =>
+        blockManagerInfo.get(bmId).filter(_.isAlive).foreach { bmInfo =>

Review comment:
       @Ngone51, @mridulm  So the feature was introduced by https://github.com/apache/spark/pull/24499.
   It can be enabled by setting `spark.shuffle.service.fetch.rdd.enabled` to true; then, instead of the executor's block manager ID, an artificial ID is used whose port part comes from the external shuffle service.
   
   About the leak:
   I cannot see a leak in `blockStatusByShuffleService` regarding this method, as the cleanup is done a few lines above:
   https://github.com/apache/spark/blob/c0a4102acbeee20abd69e1b9c98f4bc308b9caea/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L279-L281
   
   And there is a test for it:
   https://github.com/apache/spark/blob/c0a4102acbeee20abd69e1b9c98f4bc308b9caea/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala#L129-L144
   
      






[GitHub] [spark] sumeetgajjar commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-824461500


   > > If that's the case (it seems not correct but has existed for a long time already), I think posting the SparkListenerBlockManagerAdded inside the if (!blockManagerInfo.contains(id)) would be enough for the whole fix?
   > 
   > @Ngone51 I believe moving `SparkListenerBlockManagerAdded` inside the if-block should be enough.
   > I will give it a try and check if any other cases surface.
   > 
   > Thanks!
   
   I tried this and the issue still exists. When we apply the sequence of events mentioned in [#32114](https://github.com/apache/spark/pull/32114#discussion_r617990361), the issue surfaces again.




[GitHub] [spark] sumeetgajjar commented on pull request #32114: [Spark 35011][CORE] Avoid Block Manager registerations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-817218035


   I just realized I can now re-run the checks in my personal fork, instead of pushing empty commits.




[GitHub] [spark] SparkQA removed a comment on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-851655230


   **[Test build #139123 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139123/testReport)** for PR 32114 at commit [`2c7a439`](https://github.com/apache/spark/commit/2c7a4395c3dc75ff803b37a29541292104c53cb7).




[GitHub] [spark] sumeetgajjar commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r611348328



##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -355,6 +355,12 @@ object SparkEnv extends Logging {
 
     // Mapping from block manager id to the block manager's information.
     val blockManagerInfo = new concurrent.TrieMap[BlockManagerId, BlockManagerInfo]()
+    // Using a cache here since we only want to track recently removed executors to deny their
+    // block manager registration while their StopExecutor message is in-flight.
+    // Assuming average size of 6 bytes of execId and each entry in Cache taking around 64 bytes,
+    // max size of this cache = (6 + 64) * 30000 = 2.1MB
+    val recentlyRemovedExecutors = CacheBuilder.newBuilder().maximumSize(30000)
+      .build[String, String]()

Review comment:
       The high cache size is to ensure the fix works for a large enough job with 30000 executors. 
   
   Sure, does an expiry of 10 min (or longer) sound good?
   This should give the executor enough time to process the in-flight `StopExecutor` message and complete the shutdown.
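
   To make the expiry idea concrete, here is a dependency-free sketch. `ExpiringSet` and its injected `clock` are hypothetical; with the Guava cache used in this patch the equivalent would presumably be adding `expireAfterWrite(10, TimeUnit.MINUTES)` to the builder:

```scala
import scala.collection.mutable

// Hypothetical time-bounded set of recently removed executor ids.
// Not thread-safe; a real implementation would need synchronization.
class ExpiringSet(ttlMs: Long, clock: () => Long) {
  private val insertedAt = mutable.Map.empty[String, Long]

  def add(execId: String): Unit = {
    insertedAt.update(execId, clock())
  }

  // True only while the entry is younger than the TTL; stale entries
  // are dropped lazily on lookup.
  def contains(execId: String): Boolean = {
    val fresh = insertedAt.get(execId).exists(ts => clock() - ts <= ttlMs)
    if (!fresh) {
      insertedAt.remove(execId)
    }
    fresh
  }
}
```

   The point is only that entries disappear on their own once the `StopExecutor` window has passed, so the size bound becomes a secondary safety net.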






[GitHub] [spark] AmplabJenkins commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-849686711


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/43538/
   




[GitHub] [spark] sumeetgajjar commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r639022211



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -728,14 +738,28 @@ class BlockManagerMasterEndpoint(
   private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
     for (
       blockManagerId <- blockManagerIdByExecutor.get(executorId);
-      info <- blockManagerInfo.get(blockManagerId)
+      info <- blockManagerInfo.get(blockManagerId).filter(_.isAlive)
     ) yield {
       info.storageEndpoint
     }
   }
 
   override def onStop(): Unit = {
     askThreadPool.shutdownNow()
+    blockManagerInfoCleaner.shutdownNow()
+  }
+
+  private def cleanBlockManagerInfo(): Unit = {
+    logDebug("Cleaning blockManagerInfo")
+    val now = System.currentTimeMillis()
+    val (expiredEntries, _) = blockManagerInfo.partition { case (_, bmInfo) =>
+      // bmInfo.executorRemovalTs.get cannot be None when BM is not alive
+      !bmInfo.isAlive && (now - bmInfo.executorRemovalTs.get) > executorTimeoutMs
+    }

Review comment:
       Done.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -878,4 +903,15 @@ private[spark] class BlockManagerInfo(
   def clear(): Unit = {
     _blocks.clear()
   }
+
+  def executorRemovalTs: Option[Long] = _executorRemovalTs
+
+  def isAlive: Boolean = _executorRemovalTs.isEmpty
+
+  def updateExecutorRemovalTs(): Unit = {
+    if (!isAlive) {
+      logWarning(s"executorRemovalTs is already set to ${_executorRemovalTs.get}")
+    }
+    _executorRemovalTs = Some(System.currentTimeMillis())

Review comment:
       Done.






[GitHub] [spark] attilapiros commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
attilapiros commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r612363119



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterHeartbeatEndpoint.scala
##########
@@ -44,11 +47,17 @@ private[spark] class BlockManagerMasterHeartbeatEndpoint(
   }
 
   /**
-   * Return true if the driver knows about the given block manager. Otherwise, return false,
-   * indicating that the block manager should re-register.
+   * Return true if the driver knows about the given block manager or if the block manager belongs
+   * to a recently removed executor. Otherwise, return false, indicating that the block manager
+   * should re-register.
    */
   private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
-    if (!blockManagerInfo.contains(blockManagerId)) {
+    if (Option(recentlyRemovedExecutors.getIfPresent(blockManagerId.executorId)).isDefined) {

Review comment:
       Nit: same here (Option is not needed)
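
   For illustration, a small sketch of why the wrapper is redundant. A `ConcurrentHashMap` stands in for the Guava cache here (an assumption for the sake of a self-contained example); like `Cache.getIfPresent`, its `get` returns `null` for a missing key:

```scala
import java.util.concurrent.ConcurrentHashMap

object NullCheck {
  // Stand-in for recentlyRemovedExecutors.
  val recentlyRemoved = new ConcurrentHashMap[String, String]()

  // Current form: allocate an Option only to ask whether it is defined.
  def viaOption(execId: String): Boolean =
    Option(recentlyRemoved.get(execId)).isDefined

  // Suggested form: a plain null check gives the same answer.
  def viaNullCheck(execId: String): Boolean =
    recentlyRemoved.get(execId) != null
}
```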






[GitHub] [spark] sumeetgajjar commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640288481



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -422,7 +430,7 @@ class BlockManagerMasterEndpoint(
     val locations = blockLocations.get(blockId)
     if (locations != null) {
       locations.foreach { blockManagerId: BlockManagerId =>
-        val blockManager = blockManagerInfo.get(blockManagerId)
+        val blockManager = blockManagerInfo.get(blockManagerId).filter(_.isAlive)

Review comment:
       I will add the helper methods as they would clearly help to prevent accidental misses for `isAlive`.






[GitHub] [spark] mridulm commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r611286561



##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -355,6 +355,12 @@ object SparkEnv extends Logging {
 
     // Mapping from block manager id to the block manager's information.
     val blockManagerInfo = new concurrent.TrieMap[BlockManagerId, BlockManagerInfo]()
+    // Using a cache here since we only want to track recently removed executors to deny their
+    // block manager registration while their StopExecutor message is in-flight.
+    // Assuming average size of 6 bytes of execId and each entry in Cache taking around 64 bytes,
+    // max size of this cache = (6 + 64) * 30000 = 2.1MB
+    val recentlyRemovedExecutors = CacheBuilder.newBuilder().maximumSize(30000)
+      .build[String, String]()

Review comment:
       Any particular reason for such a high cache size ?






[GitHub] [spark] mridulm commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640273239



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -422,7 +430,7 @@ class BlockManagerMasterEndpoint(
     val locations = blockLocations.get(blockId)
     if (locations != null) {
       locations.foreach { blockManagerId: BlockManagerId =>
-        val blockManager = blockManagerInfo.get(blockManagerId)
+        val blockManager = blockManagerInfo.get(blockManagerId).filter(_.isAlive)

Review comment:
       I am slightly more in favor of @attilapiros's suggestion .. I am more concerned with future evolution, where someone might accidentally forget to check for `isAlive` : the helper methods should defend against that.
   
   I am fine with either approach though - in current state and with this enhancement, I am fine with this going through.
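
   To spell out the concern, a simplified sketch of the helper-based shape. `BmInfo` and `BmRegistry` are hypothetical stand-ins with `String` ids, not the real `BlockManagerInfo` or endpoint classes:

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical, simplified stand-in for BlockManagerInfo.
final case class BmInfo(endpoint: String, removalTs: Option[Long]) {
  def isAlive: Boolean = removalTs.isEmpty
}

class BmRegistry {
  // Kept private so that every lookup goes through the alive-only helper.
  private val blockManagerInfo = TrieMap.empty[String, BmInfo]

  def register(bmId: String, info: BmInfo): Unit = {
    blockManagerInfo.put(bmId, info)
    ()
  }

  def aliveBlockManagerInfo(bmId: String): Option[BmInfo] =
    blockManagerInfo.get(bmId).filter(_.isAlive)

  // Callers compose on the helper and never see removed block managers.
  def endpointFor(bmId: String): Option[String] =
    aliveBlockManagerInfo(bmId).map(_.endpoint)
}
```

   New call sites then only have the alive-filtered accessor to reach for, so forgetting the `isAlive` check is much harder.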






[GitHub] [spark] Ngone51 commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640307434



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -728,15 +739,34 @@ class BlockManagerMasterEndpoint(
   private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
     for (
       blockManagerId <- blockManagerIdByExecutor.get(executorId);
-      info <- blockManagerInfo.get(blockManagerId)
+      info <- aliveBlockManagerInfo(blockManagerId)
     ) yield {
       info.storageEndpoint
     }
   }
 
   override def onStop(): Unit = {
     askThreadPool.shutdownNow()
+    blockManagerInfoCleaner.shutdownNow()
+  }
+
+  private def cleanBlockManagerInfo(): Unit = {
+    logDebug("Cleaning blockManagerInfo")
+    val now = System.currentTimeMillis()
+    val expiredBmIds = blockManagerInfo.filter { case (_, bmInfo) =>
+      // bmInfo.executorRemovalTs.get cannot be None when BM is not alive
+      !bmInfo.isAlive && (now - bmInfo.executorRemovalTs.get) > executorTimeoutMs
+    }.keys
+    expiredBmIds.foreach { bmId =>
+      logInfo(s"Cleaning expired $bmId from blockManagerInfo")
+      blockManagerInfo.remove(bmId)
+    }
   }
+
+  @inline private def aliveBlockManagerInfo(bmId: BlockManagerId): Option[BlockManagerInfo] =
+    blockManagerInfo.get(bmId).filter(_.isAlive)
+
+  @inline private def allAliveBlockManagerInfos() = blockManagerInfo.values.filter(_.isAlive)

Review comment:
       Could you declare the return type for this method too? And we can omit the brackets when there are no parameters to make the code neater at the caller side.
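
   Something along these lines, as a sketch with hypothetical stand-in types (`Info` and `Infos` are not Spark classes):

```scala
// Hypothetical stand-in for BlockManagerInfo.
final case class Info(executorId: String, alive: Boolean)

class Infos(all: Seq[Info]) {
  // Explicit return type documents the contract; no parameter list,
  // since this is a side-effect-free accessor.
  def allAlive: Iterable[Info] = all.filter(_.alive)
}
```

   The caller side then reads like a field access, e.g. `infos.allAlive.foreach(...)`.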






[GitHub] [spark] sumeetgajjar commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r617862914



##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -355,6 +355,12 @@ object SparkEnv extends Logging {
 
     // Mapping from block manager id to the block manager's information.
     val blockManagerInfo = new concurrent.TrieMap[BlockManagerId, BlockManagerInfo]()
+    // Using a cache here since we only want to track recently removed executors to deny their
+    // block manager registration while their StopExecutor message is in-flight.
+    // Assuming average size of 6 bytes of execId and each entry in Cache taking around 64 bytes,
+    // max size of this cache = (6 + 64) * 30000 = 2.1MB
+    val recentlyRemovedExecutors = CacheBuilder.newBuilder().maximumSize(30000)
+      .build[String, String]()

Review comment:
       Yes @Ngone51, as pointed out in [#32114](https://github.com/apache/spark/pull/32114#issuecomment-819255421), `BlockManagerMessages.RemoveExecutor` is raised only in limited cases. I could not find any other code paths apart from the one you pointed out.






[GitHub] [spark] Ngone51 commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r612976711



##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -355,6 +355,12 @@ object SparkEnv extends Logging {
 
     // Mapping from block manager id to the block manager's information.
     val blockManagerInfo = new concurrent.TrieMap[BlockManagerId, BlockManagerInfo]()
+    // Using a cache here since we only want to track recently removed executors to deny their
+    // block manager registration while their StopExecutor message is in-flight.
+    // Assuming average size of 6 bytes of execId and each entry in Cache taking around 64 bytes,
+    // max size of this cache = (6 + 64) * 30000 = 2.1MB
+    val recentlyRemovedExecutors = CacheBuilder.newBuilder().maximumSize(30000)
+      .build[String, String]()

Review comment:
       > Is it possible to separate driver commanded intentionally removed executors from unintentional executor loss?
   
   @attilapiros  It's possible to know whether an executor was removed intentionally by the driver. The problem is that, currently, the executor info is stored in many different places. So you would have to update many methods or messages to add an `isIntentional` field (for example), to let all the components know and make certain decisions on it, which could be miscellaneous.






[GitHub] [spark] attilapiros commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
attilapiros commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r612406005



##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -355,6 +355,12 @@ object SparkEnv extends Logging {
 
     // Mapping from block manager id to the block manager's information.
     val blockManagerInfo = new concurrent.TrieMap[BlockManagerId, BlockManagerInfo]()
+    // Using a cache here since we only want to track recently removed executors to deny their
+    // block manager registration while their StopExecutor message is in-flight.
+    // Assuming average size of 6 bytes of execId and each entry in Cache taking around 64 bytes,
+    // max size of this cache = (6 + 64) * 30000 = 2.1MB
+    val recentlyRemovedExecutors = CacheBuilder.newBuilder().maximumSize(30000)
+      .build[String, String]()

Review comment:
       I thought about this a bit more but haven't checked the code yet: Is it possible to separate driver commanded intentionally removed executors from unintentional executor loss?
   
   Intentionally removed executors shouldn't be re-registered. 
   
   cc @Ngone51 






[GitHub] [spark] SparkQA commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-851701808


   **[Test build #139123 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139123/testReport)** for PR 32114 at commit [`2c7a439`](https://github.com/apache/spark/commit/2c7a4395c3dc75ff803b37a29541292104c53cb7).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] Ngone51 commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r612970148



##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -355,6 +355,12 @@ object SparkEnv extends Logging {
 
     // Mapping from block manager id to the block manager's information.
     val blockManagerInfo = new concurrent.TrieMap[BlockManagerId, BlockManagerInfo]()
+    // Using a cache here since we only want to track recently removed executors to deny their
+    // block manager registration while their StopExecutor message is in-flight.
+    // Assuming average size of 6 bytes of execId and each entry in Cache taking around 64 bytes,
+    // max size of this cache = (6 + 64) * 30000 = 2.1MB
+    val recentlyRemovedExecutors = CacheBuilder.newBuilder().maximumSize(30000)
+      .build[String, String]()

Review comment:
       > Currently, on RemoveExecutor, we remove the corresponding BlockManagerInfo from blockManagerInfo map...
   
   As per https://github.com/apache/spark/pull/32114#issuecomment-819255421, I don't think there would be a `BlockManagerMessages.RemoveExecutor` raised in this PR case.
   
   Could you point out the code path on which `BlockManagerMessages.RemoveExecutor` is raised?
   
   If no other code path raises `BlockManagerMessages.RemoveExecutor` in this PR's case, then @attilapiros's suggestion definitely works. But I'd also suggest another idea in https://github.com/apache/spark/pull/32114#issuecomment-819255421.






[GitHub] [spark] mridulm commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r611995057



##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -355,6 +355,12 @@ object SparkEnv extends Logging {
 
     // Mapping from block manager id to the block manager's information.
     val blockManagerInfo = new concurrent.TrieMap[BlockManagerId, BlockManagerInfo]()
+    // Using a cache here since we only want to track recently removed executors to deny their
+    // block manager registration while their StopExecutor message is in-flight.
+    // Assuming average size of 6 bytes of execId and each entry in Cache taking around 64 bytes,
+    // max size of this cache = (6 + 64) * 30000 = 2.1MB
+    val recentlyRemovedExecutors = CacheBuilder.newBuilder().maximumSize(30000)
+      .build[String, String]()

Review comment:
       The timeout should be modeled on the maximum expected delay for a heartbeat to arrive from the executor.
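   
   One way to read this suggestion, as a minimal sketch and not the PR's implementation: remember each removed executor id only for a window derived from the longest expected heartbeat delay. `RecentlyRemoved`, `maxHeartbeatDelayMs`, and the injected clock are hypothetical names used purely for illustration.
   
   ```scala
   import scala.collection.mutable

   // Hypothetical helper, not Spark code: remembers removed executor ids
   // for a bounded time window instead of a fixed number of entries.
   final class RecentlyRemoved(maxHeartbeatDelayMs: Long, now: () => Long) {
     // executorId -> time of removal in milliseconds
     private val removedAt = mutable.Map.empty[String, Long]

     def markRemoved(execId: String): Unit = synchronized {
       removedAt(execId) = now()
     }

     // True while a late heartbeat or registration from this executor could still arrive.
     def isRecentlyRemoved(execId: String): Boolean = synchronized {
       removedAt.get(execId) match {
         case Some(ts) if now() - ts <= maxHeartbeatDelayMs => true
         case Some(_) =>
           removedAt.remove(execId) // window elapsed; forget the entry
           false
         case None => false
       }
     }
   }
   ```
   
   Passing the clock in as a function keeps the window easy to exercise in a unit test without sleeping.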






[GitHub] [spark] SparkQA commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-849643086


   **[Test build #139021 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139021/testReport)** for PR 32114 at commit [`2c7a439`](https://github.com/apache/spark/commit/2c7a4395c3dc75ff803b37a29541292104c53cb7).




[GitHub] [spark] Ngone51 closed pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 closed pull request #32114:
URL: https://github.com/apache/spark/pull/32114


   




[GitHub] [spark] attilapiros commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
attilapiros commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r612362334



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -510,14 +514,20 @@ class BlockManagerMasterEndpoint(
   }
 
   /**
-   * Returns the BlockManagerId with topology information populated, if available.
+   * Returns an option of BlockManagerId. If topology information is available, it populated inside
+   * BlockManagerId. If the BlockManager belongs to a recently removed executor, None is returned.
    */
   private def register(
       idWithoutTopologyInfo: BlockManagerId,
       localDirs: Array[String],
       maxOnHeapMemSize: Long,
       maxOffHeapMemSize: Long,
-      storageEndpoint: RpcEndpointRef): BlockManagerId = {
+      storageEndpoint: RpcEndpointRef): Option[BlockManagerId] = {
+    if (Option(recentlyRemovedExecutors.getIfPresent(idWithoutTopologyInfo.executorId)).isDefined) {

Review comment:
       Nit: to just check whether an object is null you do not need to build an Option instance (Option is good when you pass the object to another method, to emphasize that it can be null).
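   
   A small sketch of the nit, using a plain `java.util.concurrent.ConcurrentHashMap` as a stand-in for the cache (an assumption, made only so that the snippet is self-contained). Both checks give the same answer; the second simply skips the intermediate `Option`.
   
   ```scala
   import java.util.concurrent.ConcurrentHashMap

   // Stand-in for the cache: like Guava's getIfPresent, ConcurrentHashMap.get
   // returns null when the key is absent.
   val removed = new ConcurrentHashMap[String, String]()
   removed.put("exec-1", "exec-1")

   // Wrapping the result in Option only to test for presence.
   def viaOption(id: String): Boolean = Option(removed.get(id)).isDefined

   // Direct null comparison, no intermediate Option instance.
   def viaNullCheck(id: String): Boolean = removed.get(id) != null
   ```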






[GitHub] [spark] HyukjinKwon commented on pull request #32114: [Spark 35011][CORE] Avoid Block Manager registerations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-817237064


   cc @Ngone51 FYI




[GitHub] [spark] sumeetgajjar commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-900534808


   [SPARK-34949](https://issues.apache.org/jira/browse/SPARK-34949) should also be backported to close any gaps.
   
   P.S. It is already in 3.1; we just need to backport it to 3.0.




[GitHub] [spark] sumeetgajjar commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-849764248


   The K8s integration test failure is unrelated to this change and is due to `apt install` failure in SparkR docker image build.
   ```
   W: GPG error: http://cloud.r-project.org/bin/linux/debian buster-cran35/ InRelease: The following signatures couldn't be verified because the public key is not available: NO_PUBKEY FCAE2A0E115C3D8A
   E: The repository 'http://cloud.r-project.org/bin/linux/debian buster-cran35/ InRelease' is not signed.
   The command '/bin/sh -c apt-get update &&   apt install -y gnupg &&   echo "deb http://cloud.r-project.org/bin/linux/debian buster-cran35/" >> /etc/apt/sources.list &&   (apt-key adv --keyserver keys.gnupg.net --recv-key 'E19F5F87128899B192B1A2C2AD5F960A256A04AF' || apt-key adv --keyserver keys.openpgp.org --recv-key 'E19F5F87128899B192B1A2C2AD5F960A256A04AF') &&   apt-get update &&   apt install -y -t buster-cran35 r-base r-base-dev &&   rm -rf /var/cache/apt/*' returned a non-zero code: 100
   Failed to build SparkR Docker image, please refer to Docker build output for details.
   ```




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-849686711


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/43538/
   




[GitHub] [spark] sumeetgajjar commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-852286255


   Hi @mridulm @Ngone51,
   Please let me know if any more changes are required from my end; if not, could you please merge this PR?




[GitHub] [spark] mridulm commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640270377



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -273,12 +282,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        blockManagerInfo.get(bmId).foreach { bmInfo =>
+        blockManagerInfo.get(bmId).filter(_.isAlive).foreach { bmInfo =>

Review comment:
       @attilapiros The entry for `bmIdForShuffleService` in `blockStatusByShuffleService` itself is not getting removed, right? (And the value `HashMap` is never cleaned up either, it just gets cleared?)
   
   Only the corresponding blockStatus is getting removed?
   
   I am sure I am missing something here; I want to find out what!






[GitHub] [spark] sumeetgajjar commented on pull request #32114: [Spark 35011][CORE] Avoid Block Manager registerations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-817025778


   It turns out that "on-disk storage" (`encryptionTest`) under `BlockManagerSuite` fails with "encryption = on" on my Mac. IntelliJ does not report it as a failure, though, because the JVM simply exits. In that situation it also does not run the remaining tests in `BlockManagerSuite`, so I didn't notice the NPE in two of the tests that never ran.
   
   However, the same encryption test passes on GitHub. On my machine, the JVM exits while dynamically loading `org.apache.commons.crypto.random.OpenSslCryptoRandom` via `commons-crypto`.
   
   Lesson learned: always perform a final sanity test run using `sbt/mvn` before concluding that the tests pass 😋
   




[GitHub] [spark] sumeetgajjar commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-849756029


   Jenkins retest this please




[GitHub] [spark] Ngone51 commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-819248812


   Standalone should be the same. @mridulm 




[GitHub] [spark] Ngone51 commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r638514447



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -273,12 +282,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        blockManagerInfo.get(bmId).foreach { bmInfo =>
+        blockManagerInfo.get(bmId).filter(_.isAlive).foreach { bmInfo =>

Review comment:
       > Does this actually have an impact ?
   
   Yes, this looks like a no-op as `bmIdsExecutor` should be empty for an inactive BlockManager.
   
   > Looking more, @Ngone51 do we have a leak w.r.t blockStatusByShuffleService in this class ?
   > Is bmInfo.removeBlock helping to reduce impact of the leak ?
   
   I'm not sure, because I don't know when `bmIdsExtShuffle` is non-empty (that is, when a block location can be the external shuffle service). @attilapiros could you give more context here?






[GitHub] [spark] Ngone51 commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-849629152


   retest this please




[GitHub] [spark] AmplabJenkins commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-851683239


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/43643/
   




[GitHub] [spark] sumeetgajjar edited a comment on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar edited a comment on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-849372854


   > https://github.com/apache/spark/blob/5cc17ba0c7d81278a622fb474cf18f8e8530335f/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L116
   > 
   > @sumeetgajjar ^^^ Shall we not reply when it's already an inactive BlockManager here? It would avoid the network timeout issue when the executor is shut down.
   
   @Ngone51 I don't follow: how can we not reply to `RegisterBlockManager` when the caller expects a `BlockManagerId` in return? Could you please elaborate? Please correct me if I am missing something here.
   
   https://github.com/apache/spark/blob/5cc17ba0c7d81278a622fb474cf18f8e8530335f/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala#L77-L80




[GitHub] [spark] Ngone51 commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-819255421


   > From BlockManager - e.g. reportBlockStatus.
   > Just modifying HeartbeatReceiver won't solve the re-registration issue here. We will also have to implement a similar kind of tracking inside BlockManagerMasterEndpoint. And now since both tracking are independent of each other, it might introduce some race condition (please correct me if I am wrong).
   
   You're right. I only followed the PR description, so I thought `HeartbeatReceiver` was the only problematic place.
   
   
   I checked the code and was surprised to find that we don't remove the `BlockManager` when we remove an executor. Removing a `BlockManager` happens in only a few cases:
   
   * the corresponding executor of the `BlockManager` caused the shuffle fetch failure
   https://github.com/apache/spark/blob/ee7d838aaf46f9d786e0388915b422fb78952893/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2081
   * an executor is removed redundantly
   https://github.com/apache/spark/blob/ee7d838aaf46f9d786e0388915b422fb78952893/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L434
   * a new registered `BlockManager` evicts an old one (if any)
   https://github.com/apache/spark/blob/ee7d838aaf46f9d786e0388915b422fb78952893/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L534-L537
   
   If that's the case (it seems incorrect, but it has existed for a long time already), I think posting the `SparkListenerBlockManagerAdded` inside the `if (!blockManagerInfo.contains(id))` would be enough for the whole fix?
   
   https://github.com/apache/spark/blob/ee7d838aaf46f9d786e0388915b422fb78952893/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L531-L562
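   
   The idea above, sketched with simplified stand-ins rather than the real endpoint code: the "added" event is emitted only when the id is not yet in the map, so a repeated registration of a known id produces no second event. `Registry` and `postedEvents` are hypothetical names; the real code keeps `BlockManagerInfo` values and posts to the listener bus.
   
   ```scala
   import scala.collection.mutable

   // Hypothetical, simplified stand-in for the master endpoint's bookkeeping.
   final class Registry {
     private val blockManagerInfo = mutable.Map.empty[String, Long]
     val postedEvents = mutable.ArrayBuffer.empty[String]

     def register(id: String, registeredAtMs: Long): Unit = {
       if (!blockManagerInfo.contains(id)) {
         blockManagerInfo(id) = registeredAtMs
         // Emitted only for a genuinely new entry; a re-registration of a
         // known id falls through without posting anything.
         postedEvents += s"BlockManagerAdded($id)"
       }
     }
   }
   ```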
   
   




[GitHub] [spark] sumeetgajjar commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r639053095



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -422,7 +430,7 @@ class BlockManagerMasterEndpoint(
     val locations = blockLocations.get(blockId)
     if (locations != null) {
       locations.foreach { blockManagerId: BlockManagerId =>
-        val blockManager = blockManagerInfo.get(blockManagerId)
+        val blockManager = blockManagerInfo.get(blockManagerId).filter(_.isAlive)

Review comment:
       > What if we maintain the inactive BlockManagerInfos in a separate data structure and remove the BlockManagerInfo from the blockManagerInfo as it is.
   
   Yes @Ngone51, you are right, we discussed this earlier. My original solution was exactly what you described: a Guava cache that stored inactive BlockManagerInfos. In that case, we also had to pass `InactiveBlockManagerCache` to `BlockManagerMasterHeartbeatEndpoint` so that it could respond appropriately to `BlockManagerHeartbeat`.
   
   Later @attilapiros suggested modeling BlockManager removal as a new state by adding `removalTS` to `BlockManagerInfo`. I found his solution better than using the `InactiveBlockManagerCache`.
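   
   A rough sketch of the "removal as a state" idea, with hypothetical simplified types (`Info`, `liveInfo`, and the string ids are illustrative and not the actual `BlockManagerInfo` API): the entry stays in the map and carries its removal time, and lookups that should see only live block managers filter on `isAlive`.
   
   ```scala
   import scala.collection.mutable

   // Hypothetical, simplified model: the entry records when it was removed.
   final class Info(val executorId: String) {
     private var removalTimeMs: Option[Long] = None
     def markRemoved(nowMs: Long): Unit = removalTimeMs = Some(nowMs)
     def isAlive: Boolean = removalTimeMs.isEmpty
   }

   val blockManagerInfo = mutable.Map.empty[String, Info]
   blockManagerInfo("bm-1") = new Info("1")
   blockManagerInfo("bm-2") = new Info("2")

   // "Removal" flips the state instead of deleting the entry.
   blockManagerInfo("bm-1").markRemoved(nowMs = 100L)

   // Lookups that should only see live block managers filter on isAlive.
   def liveInfo(id: String): Option[Info] = blockManagerInfo.get(id).filter(_.isAlive)
   ```
   
   Because the entry is still present, a late heartbeat or registration from the removed executor can be recognized without a second data structure.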






[GitHub] [spark] SparkQA commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-849684238


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43538/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-849775873


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139021/
   




[GitHub] [spark] Ngone51 commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640310754



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -336,13 +345,16 @@ class BlockManagerMasterEndpoint(
   private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
     val info = blockManagerInfo(blockManagerId)
 
+    // Not removing info from the blockManagerInfo map, but only updating the removal timestamp of

Review comment:
       Could you also explain the issue with removing here? 






[GitHub] [spark] AmplabJenkins commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-816977606


   Can one of the admins verify this patch?




[GitHub] [spark] sumeetgajjar edited a comment on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar edited a comment on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-817025778


   It turns out that "on-disk storage" (`encryptionTest`) under `BlockManagerSuite` fails with "encryption = on" on my Mac. IntelliJ does not report it as a failure, though, because the JVM simply exits. In that situation it also does not run the remaining tests under `BlockManagerSuite`, which is why I didn't notice the NPE in two of the tests that never ran. 
   
   However, the same encryption test passes on GitHub. On my machine, the JVM exits while dynamically loading `org.apache.commons.crypto.random.OpenSslCryptoRandom` through `commons-crypto`.
   
   Lesson learned: always do a final sanity test run using `sbt/mvn` before concluding that the tests pass 😋 
   




[GitHub] [spark] mridulm commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640270377



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -273,12 +282,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        blockManagerInfo.get(bmId).foreach { bmInfo =>
+        blockManagerInfo.get(bmId).filter(_.isAlive).foreach { bmInfo =>

Review comment:
       @attilapiros The entry for `bmIdForShuffleService` in `blockStatusByShuffleService` itself is not getting removed, right? Also, the value `HashMap` is never cleaned up/cleared?
   
   Only the corresponding blockStatus is getting removed ?
   
   I am sure I am missing something here ... want to make sure what !






[GitHub] [spark] sumeetgajjar commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-847202280


   Gentle ping @Ngone51 @attilapiros @mridulm.
   Please take another look at it once you have time. 




[GitHub] [spark] mridulm commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640270377



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -273,12 +282,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        blockManagerInfo.get(bmId).foreach { bmInfo =>
+        blockManagerInfo.get(bmId).filter(_.isAlive).foreach { bmInfo =>

Review comment:
       @attilapiros The entry for bmId in `blockStatusByShuffleService` itself is not getting removed, right? (The key bmId and its value `HashMap` are never removed; the value map just gets cleared?)
   
   I am sure I am missing something here ... want to make sure what !






[GitHub] [spark] mridulm commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r643623560



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -273,12 +282,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        blockManagerInfo.get(bmId).foreach { bmInfo =>
+        blockManagerInfo.get(bmId).filter(_.isAlive).foreach { bmInfo =>

Review comment:
       Thanks for confirming @attilapiros !






[GitHub] [spark] SparkQA commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-851674791


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43643/
   




[GitHub] [spark] sumeetgajjar commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640295779



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -422,7 +430,7 @@ class BlockManagerMasterEndpoint(
     val locations = blockLocations.get(blockId)
     if (locations != null) {
       locations.foreach { blockManagerId: BlockManagerId =>
-        val blockManager = blockManagerInfo.get(blockManagerId)
+        val blockManager = blockManagerInfo.get(blockManagerId).filter(_.isAlive)

Review comment:
       Done.






[GitHub] [spark] Ngone51 edited a comment on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 edited a comment on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-899979045


   I just realized this bug does cause a real problem when working in conjunction with https://github.com/apache/spark/pull/24533. Basically, the re-registration issue leads the driver to think an executor is alive when it is actually dead, which in turn causes the client to retry block fetching on a dead executor when it shouldn't. @sumeetgajjar, could you backport this fix to 3.1/3.0 as well?
   cc @mridulm @attilapiros 
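   
   To make the failure mode concrete, here is a rough sketch of a registry that refuses registration for an executor it has already removed. All names (`RegistrySketch`, `register`, `remove`) are hypothetical and the bookkeeping is deliberately simplified; it is not the actual `BlockManagerMasterEndpoint` logic.
   
   ```scala
   import scala.collection.mutable
   
   // Hypothetical, single-threaded sketch; not Spark's actual implementation.
   final class RegistrySketch {
     private val alive = mutable.Set.empty[String]
     private val removed = mutable.Set.empty[String]
   
     // Marks the executor as removed so that later registrations can be refused.
     def remove(execId: String): Unit = {
       alive -= execId
       removed += execId
     }
   
     // Returns Some(execId) on success, None when the executor was already removed.
     def register(execId: String): Option[String] =
       if (removed.contains(execId)) {
         None
       } else {
         alive += execId
         Some(execId)
       }
   
     def isAlive(execId: String): Boolean = alive.contains(execId)
   }
   ```
   
   Without the `removed` check, a registration arriving after `remove` would put the executor back into `alive`, which is the stale state described above.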




[GitHub] [spark] sumeetgajjar commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-840986610


   Hi @mridulm @Ngone51 @attilapiros ,
   I have updated the PR with the solution that Attila proposed. Could you please review it once you are free?
   Apologies for the delay, I was fighting fires of my own. 
   Thanks.




[GitHub] [spark] mridulm commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640270377



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -273,12 +282,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        blockManagerInfo.get(bmId).foreach { bmInfo =>
+        blockManagerInfo.get(bmId).filter(_.isAlive).foreach { bmInfo =>

Review comment:
       @attilapiros The entry for `bmIdForShuffleService` in `blockStatusByShuffleService` itself is not getting removed, right? (The key bmId and its value `HashMap` are never removed; the value map just gets cleared?)
   
   I am sure I am missing something here ... want to make sure what !






[GitHub] [spark] sumeetgajjar commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r617862914



##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -355,6 +355,12 @@ object SparkEnv extends Logging {
 
     // Mapping from block manager id to the block manager's information.
     val blockManagerInfo = new concurrent.TrieMap[BlockManagerId, BlockManagerInfo]()
+    // Using a cache here since we only want to track recently removed executors to deny their
+    // block manager registration while their StopExecutor message is in-flight.
+    // Assuming average size of 6 bytes of execId and each entry in Cache taking around 64 bytes,
+    // max size of this cache = (6 + 64) * 30000 = 2.1MB
+    val recentlyRemovedExecutors = CacheBuilder.newBuilder().maximumSize(30000)
+      .build[String, String]()

Review comment:
       ~~Yes, @Ngone51 as pointed out in [#32114](https://github.com/apache/spark/pull/32114#issuecomment-819255421), `BlockManagerMessages.RemoveExecutor` is raised in limited cases. I could not find any more code path apart from the one you pointed.~~
   
   Redacting this due to new findings.






[GitHub] [spark] sumeetgajjar commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-817050465


   > It turns out that "on-disk storage" (`encryptionTest`) under `BlockManagerSuite` fails with "encryption = on" on my Mac. IntelliJ does not report it as a failure, though, because the JVM simply exits. In that situation it also does not run the remaining tests under `BlockManagerSuite`, which is why I didn't notice the NPE in two of the tests that never ran.
   > 
   > However, the same encryption test passes on GitHub. On my machine, the JVM exits while dynamically loading `org.apache.commons.crypto.random.OpenSslCryptoRandom` through `commons-crypto`.
   > 
   > Lesson learned: always do a final sanity test run using `sbt/mvn` before concluding that the tests pass 😋
   
   I had `openssl@1.1.1` installed on my Mac (Catalina 10.15.7), but the corresponding shared libs `libcrypto.1.1.dylib` and `libssl.1.1.dylib` were missing from my `/usr/local/lib` dir. 
   Running the following commands solved the issue, and I was able to run the `encryptionTest`. 
   ```
   cd /usr/local/Cellar/openssl@1.1/1.1.1k/lib
   cp libssl.1.1.dylib libcrypto.1.1.dylib /usr/local/lib
   cd /usr/local/lib
   ln -s libcrypto.1.1.dylib libcrypto.dylib
   ln -s libssl.1.1.dylib libssl.dylib 
   ```
   
   You can check whether `commons-crypto` is using the correct `openssl` shared libs by running the following command:
   ```
   java -jar Desktop/commons-crypto-1.1.0/commons-crypto-1.1.0.jar Crypto
   Apache Commons Crypto 1.1.0
   Native code loaded OK: 1.1.0
   Native name: Apache Commons Crypto
   Native built: Aug 28 2020
   OpenSSL library loaded OK, version: 0x101010bf
   OpenSSL library info: OpenSSL 1.1.1k  25 Mar 2021
   Random instance created OK: org.apache.commons.crypto.random.OpenSslCryptoRandom@2a84aee7
   Cipher AES/CTR/NoPadding instance created OK: org.apache.commons.crypto.cipher.OpenSslCipher@1fb3ebeb
   Additional OpenSSL_version(n) details:
   1: compiler: clang -fPIC -arch x86_64 -O3 -Wall -DL_ENDIAN -DOPENSSL_PIC -DOPENSSL_CPUID_OBJ -DOPENSSL_IA32_SSE2 -DOPENSSL_BN_ASM_MONT -DOPENSSL_BN_ASM_MONT5 -DOPENSSL_BN_ASM_GF2m -DSHA1_ASM -DSHA256_ASM -DSHA512_ASM -DKECCAK1600_ASM -DRC4_ASM -DMD5_ASM -DAESNI_ASM -DVPAES_ASM -DGHASH_ASM -DECP_NISTZ256_ASM -DX25519_ASM -DPOLY1305_ASM -D_REENTRANT -DNDEBUG
   2: built on: Thu Mar 25 21:01:02 2021 UTC
   3: platform: darwin64-x86_64-cc
   4: OPENSSLDIR: "/usr/local/etc/openssl@1.1"
   5: ENGINESDIR: "/usr/local/Cellar/openssl@1.1/1.1.1k/lib/engines-1.1"
   ```




[GitHub] [spark] sumeetgajjar commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-900534808


   [SPARK-34949](https://issues.apache.org/jira/browse/SPARK-34949) should also be backported to close any gaps.
   
   P.S. It is already in 3.1; we just need to backport it to 3.0.




[GitHub] [spark] mridulm commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640270377



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -273,12 +282,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        blockManagerInfo.get(bmId).foreach { bmInfo =>
+        blockManagerInfo.get(bmId).filter(_.isAlive).foreach { bmInfo =>

Review comment:
       @attilapiros The entry for `bmIdForShuffleService` in `blockStatusByShuffleService` itself is not getting removed, right? Also, the value `HashMap` is never cleared?
   
   Only the corresponding blockStatus is getting removed ?
   
   I am sure I am missing something here ... want to make sure what !






[GitHub] [spark] Ngone51 commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640308497



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -336,13 +345,16 @@ class BlockManagerMasterEndpoint(
   private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
     val info = blockManagerInfo(blockManagerId)
 
+    // Not removing info from the blockManagerInfo map, but only updating the removal timestamp of
+    // the executor in BlockManagerInfo. This info will be removed from blockManagerInfo map by the
+    // blockManagerInfoCleaner once now() - info.executorRemovalTs > executorTimeoutMs.
+    info.updateExecutorRemovalTs()

Review comment:
       nit: "updateExecutorRemovalTs" -> "setExecutorRemovalTs"
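   
   As an aside for readers, the cleanup described in the code comment above (drop the entry once `now() - info.executorRemovalTs > executorTimeoutMs`) could be sketched roughly as follows. `Entry`, `CleanerSketch` and `purgeExpired` are hypothetical names, and the map is keyed by a plain string for brevity; the real cleaner may well differ.
   
   ```scala
   import scala.collection.concurrent.TrieMap
   
   // Hypothetical, simplified entry: only the removal timestamp matters here.
   final case class Entry(executorRemovalTs: Option[Long])
   
   object CleanerSketch {
     // Drops entries whose removal timestamp is older than executorTimeoutMs
     // and returns the ids that were purged.
     def purgeExpired(
         infos: TrieMap[String, Entry],
         nowMs: Long,
         executorTimeoutMs: Long): Seq[String] = {
       val expired = infos.collect {
         case (id, Entry(Some(ts))) if nowMs - ts > executorTimeoutMs => id
       }.toSeq
       expired.foreach(id => infos.remove(id))
       expired
     }
   }
   ```
   
   Entries without a removal timestamp are treated as alive and are never purged by this pass.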






[GitHub] [spark] sumeetgajjar commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640319534



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -336,13 +345,16 @@ class BlockManagerMasterEndpoint(
   private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
     val info = blockManagerInfo(blockManagerId)
 
+    // Not removing info from the blockManagerInfo map, but only updating the removal timestamp of

Review comment:
       Done.






[GitHub] [spark] attilapiros commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
attilapiros commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r612362334



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -510,14 +514,20 @@ class BlockManagerMasterEndpoint(
   }
 
   /**
-   * Returns the BlockManagerId with topology information populated, if available.
+   * Returns an option of BlockManagerId. If topology information is available, it populated inside
+   * BlockManagerId. If the BlockManager belongs to a recently removed executor, None is returned.
    */
   private def register(
       idWithoutTopologyInfo: BlockManagerId,
       localDirs: Array[String],
       maxOnHeapMemSize: Long,
       maxOffHeapMemSize: Long,
-      storageEndpoint: RpcEndpointRef): BlockManagerId = {
+      storageEndpoint: RpcEndpointRef): Option[BlockManagerId] = {
+    if (Option(recentlyRemovedExecutors.getIfPresent(idWithoutTopologyInfo.executorId)).isDefined) {

Review comment:
       Nit: to just check whether the object is null, you do not need to build an Option (Option is good when you pass the object to another method, to emphasize that it can be null). 
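   
   A small sketch of the two styles, for illustration only. A `ConcurrentHashMap` stands in here for the Guava cache (both return null on a miss), and the object and method names are hypothetical.
   
   ```scala
   import java.util.concurrent.ConcurrentHashMap
   
   object NullCheckSketch {
     // Stand-in for a cache whose lookup returns null on a miss.
     val recentlyRemoved = new ConcurrentHashMap[String, String]()
   
     // Plain null check: no Option is allocated just to test presence.
     def wasRecentlyRemoved(execId: String): Boolean =
       recentlyRemoved.get(execId) != null
   
     // Option is useful when the possibly-null value is handed to other code.
     def removalReason(execId: String): Option[String] =
       Option(recentlyRemoved.get(execId))
   }
   ```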






[GitHub] [spark] attilapiros commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
attilapiros commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-851653789


   jenkins retest this please




[GitHub] [spark] mridulm commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r611288569



##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -368,10 +374,12 @@ object SparkEnv extends Logging {
           } else {
             None
           }, blockManagerInfo,
-          mapOutputTracker.asInstanceOf[MapOutputTrackerMaster])),
+          mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
+          recentlyRemovedExecutors)),
       registerOrLookupEndpoint(
         BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
-        new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo)),
+        new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo,
+          recentlyRemovedExecutors)),
       conf,

Review comment:
       Move the cache into `BlockManagerMasterEndpoint` and hide the impl detail ?
   We can have a cleaner interface here ... `BlockManagerMasterHeartbeatEndpoint` simply needs a way to validate if an executor was recently removed - does not need to know if it was a Cache/Set/etc. 
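   
   One possible shape for such an interface, as a sketch only: the heartbeat side receives a predicate and never sees the backing collection. The class names and `acceptHeartbeat` are hypothetical, and a plain mutable set (not thread-safe) stands in for the cache.
   
   ```scala
   import scala.collection.mutable
   
   // Hypothetical sketch; owns the "recently removed" state and hides how it is stored.
   final class MasterEndpointSketch {
     private val recentlyRemoved = mutable.Set.empty[String]
   
     def removeExecutor(execId: String): Unit = recentlyRemoved += execId
   
     def isExecutorRecentlyRemoved(execId: String): Boolean =
       recentlyRemoved.contains(execId)
   }
   
   // Only depends on a predicate, not on a Cache/Set/etc.
   final class HeartbeatEndpointSketch(isRecentlyRemoved: String => Boolean) {
     // Returns true when the heartbeat should be processed normally.
     def acceptHeartbeat(execId: String): Boolean = !isRecentlyRemoved(execId)
   }
   ```
   
   Wiring would then look like `new HeartbeatEndpointSketch(id => master.isExecutorRecentlyRemoved(id))`, so the storage can change without touching the heartbeat endpoint.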






[GitHub] [spark] Ngone51 commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640294866



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -422,7 +430,7 @@ class BlockManagerMasterEndpoint(
     val locations = blockLocations.get(blockId)
     if (locations != null) {
       locations.foreach { blockManagerId: BlockManagerId =>
-        val blockManager = blockManagerInfo.get(blockManagerId)
+        val blockManager = blockManagerInfo.get(blockManagerId).filter(_.isAlive)

Review comment:
       Sure, let's go with the way of helper methods.
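   
   For context, a helper-method approach could look roughly like this, so that call sites do not each repeat `.filter(_.isAlive)`. `Info`, `InfoRegistrySketch`, `aliveInfo` and `allAliveInfos` are hypothetical names, and the map is keyed by a plain string for brevity.
   
   ```scala
   import scala.collection.mutable
   
   // Hypothetical, simplified stand-in for BlockManagerInfo.
   final case class Info(executorId: String, isAlive: Boolean)
   
   final class InfoRegistrySketch {
     private val blockManagerInfo = mutable.HashMap.empty[String, Info]
   
     def put(bmId: String, info: Info): Unit = blockManagerInfo.update(bmId, info)
   
     // Single place that applies the liveness filter for one id.
     def aliveInfo(bmId: String): Option[Info] =
       blockManagerInfo.get(bmId).filter(_.isAlive)
   
     // Same filter applied to every tracked entry.
     def allAliveInfos: Iterable[Info] =
       blockManagerInfo.values.filter(_.isAlive)
   }
   ```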






[GitHub] [spark] AmplabJenkins removed a comment on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-851683239








[GitHub] [spark] sumeetgajjar commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640323089



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -336,13 +345,16 @@ class BlockManagerMasterEndpoint(
   private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
     val info = blockManagerInfo(blockManagerId)
 
+    // Not removing info from the blockManagerInfo map, but only updating the removal timestamp of

Review comment:
       Please let me know if any more changes are required. 😄 






[GitHub] [spark] mridulm commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640270377



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -273,12 +282,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        blockManagerInfo.get(bmId).foreach { bmInfo =>
+        blockManagerInfo.get(bmId).filter(_.isAlive).foreach { bmInfo =>

Review comment:
       @attilapiros The entry for `bmIdForShuffleService` in `blockStatusByShuffleService` itself is not getting removed, right?
   Only the corresponding blockStatus is getting removed?






[GitHub] [spark] SparkQA commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-851683229


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43643/
   




[GitHub] [spark] Ngone51 commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-826114665


   So I think the solution now would be:
   
   For the heartbeat, using https://github.com/apache/spark/pull/32114#pullrequestreview-634766324.
    
   For the `BlockManager`, adding a `BlockManagerEndpointSharedState` (as mentioned by @sumeetgajjar in https://github.com/apache/spark/pull/32114#discussion_r612814973) for both `BlockManagerMasterEndpoint` and `BlockManagerMasterHeartbeatEndpoint`. It's true that if we only add a removal state to the `blockManagerInfo`, we have to filter out the removed block managers before traversing it (e.g., we wouldn't expect `getPeers` to return a removed block manager).
   
   In `BlockManagerEndpointSharedState`, we'd have both `activeBlockManagerInfo` and `removedBlockManagerInfo`. We don't have to set up a new cleaner to clear `removedBlockManagerInfo`. Instead, we can reuse the fix for `HeartbeatReceiver`: whenever there's a confirmed removal in `HeartbeatReceiver`, we can also send a removal request to `BlockManagerEndpointSharedState` by following the code path of `!scheduler.executorHeartbeatReceived` (e.g., we could similarly have `scheduler.clearBlockManagerInfo`).
   
   WDYT?
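
A minimal sketch of what such a shared state could look like. Only the names `BlockManagerEndpointSharedState`, `activeBlockManagerInfo` and `removedBlockManagerInfo` come from this thread; the method names, the `String`/`Long` types and the `clear` hook (standing in for the suggested `scheduler.clearBlockManagerInfo` path) are assumptions made for illustration, not Spark's API.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical sketch, not Spark code. Executors are keyed by a plain String id
// and the stored value is just a timestamp, to keep the example self-contained.
final class BlockManagerEndpointSharedState {
  // executorId -> registration time
  private val activeBlockManagerInfo = TrieMap.empty[String, Long]
  // executorId -> removal time
  private val removedBlockManagerInfo = TrieMap.empty[String, Long]

  // Registration is refused while the executor is still listed as removed.
  def register(executorId: String, nowMs: Long): Boolean = {
    if (removedBlockManagerInfo.contains(executorId)) {
      false
    } else {
      activeBlockManagerInfo.put(executorId, nowMs)
      true
    }
  }

  // Moves the executor from the active map to the removed map.
  def markRemoved(executorId: String, nowMs: Long): Unit = {
    activeBlockManagerInfo.remove(executorId)
    removedBlockManagerInfo.put(executorId, nowMs)
  }

  // Assumed hook: called once the removal is certain, so no separate cleaner is needed.
  def clear(executorId: String): Unit = {
    removedBlockManagerInfo.remove(executorId)
  }

  // Readers such as getPeers only ever see active entries, so no filtering is needed.
  def peers: Set[String] = activeBlockManagerInfo.keySet.toSet
}
```

With two separate maps, readers never have to filter, at the cost of two map updates per removal that are not atomic with respect to each other.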




[GitHub] [spark] Ngone51 commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-849368510


   https://github.com/apache/spark/blob/5cc17ba0c7d81278a622fb474cf18f8e8530335f/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L116
   @sumeetgajjar ^^^ Shall we not reply when it's already an inactive BlockManager here? It would avoid the network timeout issue when the executor is shut down.




[GitHub] [spark] attilapiros commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
attilapiros commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640503459



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -273,12 +282,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        blockManagerInfo.get(bmId).foreach { bmInfo =>
+        blockManagerInfo.get(bmId).filter(_.isAlive).foreach { bmInfo =>

Review comment:
       @mridulm 
   
   No, you are right! Thanks!
   
   This is not a critical leak, though: there is only one `blockStatusByShuffleService` entry per YARN node, and in most cases a new executor will probably be allocated on a node that was already used. In addition, the number of nodes is much lower than the number of executors.
   
   But we should certainly fix the small leaks too (we have a proverb for this in Hungary: "many a little becomes much").
   I think it is fine to fix it in a separate issue (I would not block this PR), so I will create a JIRA issue for it.
   
   Thanks again! 
   






[GitHub] [spark] SparkQA commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-849677368


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43538/
   




[GitHub] [spark] Ngone51 commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r639529982



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -422,7 +430,7 @@ class BlockManagerMasterEndpoint(
     val locations = blockLocations.get(blockId)
     if (locations != null) {
       locations.foreach { blockManagerId: BlockManagerId =>
-        val blockManager = blockManagerInfo.get(blockManagerId)
+        val blockManager = blockManagerInfo.get(blockManagerId).filter(_.isAlive)

Review comment:
       I think the helper methods don't solve the problem thoroughly, as you still have to replace all the usages where `isAlive` exists now.
   
   I'm personally OK with passing the inactive BlockManagerInfos to `BlockManagerMasterHeartbeatEndpoint`.
   
   @mridulm what's your opinion?






[GitHub] [spark] Ngone51 commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640308051



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -336,13 +345,16 @@ class BlockManagerMasterEndpoint(
   private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
     val info = blockManagerInfo(blockManagerId)
 
+    // Not removing info from the blockManagerInfo map, but only updating the removal timestamp of

Review comment:
       nit: "updating" -> "setting"
   
   (I think it'd be only removed once.)






[GitHub] [spark] attilapiros commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
attilapiros commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r639207240



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -422,7 +430,7 @@ class BlockManagerMasterEndpoint(
     val locations = blockLocations.get(blockId)
     if (locations != null) {
       locations.foreach { blockManagerId: BlockManagerId =>
-        val blockManager = blockManagerInfo.get(blockManagerId)
+        val blockManager = blockManagerInfo.get(blockManagerId).filter(_.isAlive)

Review comment:
       What about introducing some helper methods? 
   
   Sth like:
   
   ```scala
    private def aliveBlockManagerInfo(bmId: BlockManagerId): Option[BlockManagerInfo] =
        blockManagerInfo.get(bmId).filter(_.isAlive)
   
    private def allAliveBlockManagerInfos() =
        blockManagerInfo.values.filter(_.isAlive)
   ```
   
   They even could be inline methods...
   
   @Ngone51  WDYT?
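
To make the suggestion concrete, here is a self-contained sketch of how the two helpers could sit in front of the map. `BmId`, `BmInfo` and `BmRegistry` are hypothetical stand-ins for `BlockManagerId`, `BlockManagerInfo` and the endpoint, reduced to what the example needs; only `isAlive` and the helper names are taken from the discussion.

```scala
import scala.collection.mutable

// Hypothetical stand-in for BlockManagerId.
final case class BmId(executorId: String, host: String, port: Int)

// Hypothetical stand-in for BlockManagerInfo with a removal flag.
final class BmInfo(val id: BmId) {
  // None while registered; Some(timestamp) once the block manager was removed.
  private var removalTs: Option[Long] = None
  def isAlive: Boolean = removalTs.isEmpty
  def markRemoved(nowMs: Long): Unit = {
    removalTs = Some(nowMs)
  }
}

final class BmRegistry {
  private val blockManagerInfo = mutable.Map.empty[BmId, BmInfo]

  def register(id: BmId): Unit = {
    blockManagerInfo.update(id, new BmInfo(id))
  }

  // Soft removal: the entry stays in the map and is only flagged.
  def remove(id: BmId, nowMs: Long): Unit = {
    blockManagerInfo.get(id).foreach(info => info.markRemoved(nowMs))
  }

  // The two helpers: callers going through them never see removed entries.
  def aliveBlockManagerInfo(id: BmId): Option[BmInfo] =
    blockManagerInfo.get(id).filter(_.isAlive)

  def allAliveBlockManagerInfos: Iterable[BmInfo] =
    blockManagerInfo.values.filter(_.isAlive)
}
```

The helpers centralize the filter, but every existing direct access to the map still has to be switched over to them by hand.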






[GitHub] [spark] sumeetgajjar commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r617862914



##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -355,6 +355,12 @@ object SparkEnv extends Logging {
 
     // Mapping from block manager id to the block manager's information.
     val blockManagerInfo = new concurrent.TrieMap[BlockManagerId, BlockManagerInfo]()
+    // Using a cache here since we only want to track recently removed executors to deny their
+    // block manager registration while their StopExecutor message is in-flight.
+    // Assuming average size of 6 bytes of execId and each entry in Cache taking around 64 bytes,
+    // max size of this cache = (6 + 64) * 30000 = 2.1MB
+    val recentlyRemovedExecutors = CacheBuilder.newBuilder().maximumSize(30000)
+      .build[String, String]()

Review comment:
       Yes, @Ngone51, as pointed out in [#32114](https://github.com/apache/spark/pull/32114#issuecomment-819255421), `BlockManagerMessages.RemoveExecutor` in limited cases. I could not find any code path other than the one you pointed out.






[GitHub] [spark] Ngone51 commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-825353017


   > Should I be rebasing my commits on the new changes from upstream/master ?
   
   Rebase is recommended whenever it's possible. @sumeetgajjar 




[GitHub] [spark] sumeetgajjar commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-819046189


   > Hey guys, I'd like to propose a simpler (but might be a little bit tricky) fix if I understand the issue correctly. The idea is,
   > 
   > Instead of removing the executor directly, we set `executorLastSeen(executorId) = -1L` when we receive `ExecutorRemoved` in `HeartbeatReceiver`. And then,
   > 
   >     1. if `Heartbeat` comes first before `ExpireDeadHosts`, we remove the executor from `executorLastSeen` by checking the value "< 0" and avoid the re-register.
   > 
   >     2. if `ExpireDeadHosts` comes first before `Heartbeat`,  we set `executorLastSeen(executorId) = -2L`. We can't remove it this time in `ExpireDeadHosts` because if `Heartbeat` comes later we'd have the same issue again.
   > 
   > 
   > 2.1 if `Heartbeat` comes later, we remove the executor from `executorLastSeen` by checking the value "< 0" too and also avoid the re-register.
   > 
   > 2.2 if `Heartbeat` doesn't come (that means the executor stopped before sending the heartbeat), we remove the executor from `executorLastSeen` by checking the value = -2L in next `ExpireDeadHosts`.
   > 
   > In this way, we can avoid the extra cache and all changes should be limited to `HeartbeatReceiver`.
   > 
   > Any thoughts?
   
   Thanks for the comment @Ngone51 .
   There are two places from which the re-registration can be triggered:
   - From `HeartbeatReceiver`, by responding with `HeartbeatResponse(reregisterBlockManager = true)`. 
   Your solution will take care of this.
   - From `BlockManager`, e.g. [reportBlockStatus](https://github.com/apache/spark/blob/ee7d838aaf46f9d786e0388915b422fb78952893/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L766).
   Just modifying `HeartbeatReceiver` won't solve the re-registration issue here. We would also have to implement a similar kind of tracking inside `BlockManagerMasterEndpoint`. And since the two tracking mechanisms would be independent of each other, this might introduce race conditions (please correct me if I am wrong).
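
For reference, the quoted `-1L` / `-2L` scheme can be sketched as a small state machine. This is a hypothetical model, not Spark's `HeartbeatReceiver`: the class and method names are assumptions, and timeout handling for live executors is left out. It covers only the heartbeat path, which is the limitation described above.

```scala
import scala.collection.mutable

// Hypothetical marker values taken from the quoted proposal.
object HeartbeatTombstones {
  val Removed = -1L           // ExecutorRemoved seen, no ExpireDeadHosts pass yet
  val RemovedAndExpired = -2L // already survived one ExpireDeadHosts pass
}

final class HeartbeatTracker {
  import HeartbeatTombstones._

  private val executorLastSeen = mutable.Map.empty[String, Long]

  def onExecutorAdded(id: String, nowMs: Long): Unit = {
    executorLastSeen.update(id, nowMs)
  }

  // Instead of dropping the entry, leave a negative marker behind.
  def onExecutorRemoved(id: String): Unit = {
    executorLastSeen.update(id, Removed)
  }

  // Returns true when the executor should be asked to re-register its block manager.
  def onHeartbeat(id: String, nowMs: Long): Boolean = executorLastSeen.get(id) match {
    case Some(ts) if ts < 0 =>
      // Cases 1 and 2.1: a late heartbeat from a removed executor clears the marker.
      executorLastSeen.remove(id)
      false
    case Some(_) =>
      executorLastSeen.update(id, nowMs)
      false
    case None =>
      // Unknown executor: ask it to re-register.
      true
  }

  def onExpireDeadHosts(): Unit = {
    // Iterate over a snapshot so the map can be mutated safely.
    executorLastSeen.toList.foreach {
      case (id, Removed) => executorLastSeen.update(id, RemovedAndExpired) // case 2
      case (id, RemovedAndExpired) => executorLastSeen.remove(id)          // case 2.2
      case _ => () // live executors: timeout handling omitted in this sketch
    }
  }

  def isTracked(id: String): Boolean = executorLastSeen.contains(id)
}
```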
   




[GitHub] [spark] sumeetgajjar commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-824354174


   I was wondering what the policy is for bringing the latest upstream changes into a development branch.
   E.g., I have a branch `SPARK-35011` which contains my changes, and it is behind `upstream/master`.
   
   - Should I be rebasing my commits on the new changes from `upstream/master`? 
      - With `SPARK-35011` as the current branch: `git pull upstream master --rebase`
      - In this case, pushing my changes to the PR requires a `--force` push, since the commit ordering has changed.
   
   - Or should I merge `upstream/master` into `SPARK-35011`?
      - With `SPARK-35011` as the current branch: `git merge upstream/master`
      - No force push is required; however, my commits end up far behind in the history.




[GitHub] [spark] sumeetgajjar commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-849372854


   > https://github.com/apache/spark/blob/5cc17ba0c7d81278a622fb474cf18f8e8530335f/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L116
   > 
   > @sumeetgajjar ^^^ Shall we not reply when it's already an inactive BlockManager here? It would avoid the network timeout issue when the executor is shut down.
   
   @Ngone51 I don't follow: how can we not reply to `RegisterBlockManager` when the caller expects a return value of type `BlockManagerId`? Can you please elaborate on this?
   
   https://github.com/apache/spark/blob/5cc17ba0c7d81278a622fb474cf18f8e8530335f/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala#L77-L80




[GitHub] [spark] sumeetgajjar commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-853537413


   Thank you @Ngone51, @mridulm and @attilapiros for your detailed reviews and insights. 




[GitHub] [spark] mridulm commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r611288569



##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -368,10 +374,12 @@ object SparkEnv extends Logging {
           } else {
             None
           }, blockManagerInfo,
-          mapOutputTracker.asInstanceOf[MapOutputTrackerMaster])),
+          mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
+          recentlyRemovedExecutors)),
       registerOrLookupEndpoint(
         BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
-        new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo)),
+        new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo,
+          recentlyRemovedExecutors)),
       conf,

Review comment:
       Move the cache into `BlockManagerMasterEndpoint`.
   We should fix the interface here ... `BlockManagerMasterHeartbeatEndpoint` simply needs a way to validate if an executor was recently removed - does not need to know if it was a Cache/Set/etc. 
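
One way to read this suggestion, as a hedged sketch: the heartbeat side depends on a single query method, and the backing collection stays private to whoever owns it. The trait, both classes and all method names below are hypothetical. A size-bounded `LinkedHashSet` stands in for the Guava cache from the diff so that the example has no dependencies.

```scala
import scala.collection.mutable

// Hypothetical interface: the only thing the heartbeat endpoint needs to know.
trait RecentlyRemovedExecutors {
  def wasRecentlyRemoved(executorId: String): Boolean
}

// One possible implementation, owned by the master endpoint.
// Keeps at most maxSize executor ids and evicts the oldest one first.
final class BoundedRemovedExecutors(maxSize: Int) extends RecentlyRemovedExecutors {
  private val ids = mutable.LinkedHashSet.empty[String]

  def markRemoved(executorId: String): Unit = synchronized {
    ids += executorId
    if (ids.size > maxSize) {
      // LinkedHashSet iterates in insertion order, so head is the oldest id.
      ids -= ids.head
    }
  }

  override def wasRecentlyRemoved(executorId: String): Boolean = synchronized {
    ids.contains(executorId)
  }
}

// Heartbeat-side logic depends on the trait only, not on a Cache/Set/etc.
final class HeartbeatDecision(removed: RecentlyRemovedExecutors) {
  // Ask for re-registration only for unknown executors that were not just removed.
  def shouldReregister(executorId: String, isRegistered: Boolean): Boolean =
    !isRegistered && !removed.wasRecentlyRemoved(executorId)
}
```

Swapping the set for a cache or a timestamped map would then be a change local to the implementing class.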






[GitHub] [spark] SparkQA commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-851655230


   **[Test build #139123 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139123/testReport)** for PR 32114 at commit [`2c7a439`](https://github.com/apache/spark/commit/2c7a4395c3dc75ff803b37a29541292104c53cb7).




[GitHub] [spark] SparkQA commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-849755746


   **[Test build #139021 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139021/testReport)** for PR 32114 at commit [`2c7a439`](https://github.com/apache/spark/commit/2c7a4395c3dc75ff803b37a29541292104c53cb7).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-851708120


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139123/
   




[GitHub] [spark] sumeetgajjar commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r611348500



##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -368,10 +374,12 @@ object SparkEnv extends Logging {
           } else {
             None
           }, blockManagerInfo,
-          mapOutputTracker.asInstanceOf[MapOutputTrackerMaster])),
+          mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
+          recentlyRemovedExecutors)),
       registerOrLookupEndpoint(
         BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
-        new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo)),
+        new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo,
+          recentlyRemovedExecutors)),
       conf,

Review comment:
       Sure.
   The same comment applies to the already present `blockManagerInfo` map as well.
   I could refactor that too. I was thinking of creating a `BlockManagerEndpointSharedState` class which contains `blockManagerInfo` and `recentlyRemovedExecutors`. The class would expose corresponding methods for lookups and updates.






[GitHub] [spark] sumeetgajjar commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r612815281



##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -355,6 +355,12 @@ object SparkEnv extends Logging {
 
     // Mapping from block manager id to the block manager's information.
     val blockManagerInfo = new concurrent.TrieMap[BlockManagerId, BlockManagerInfo]()
+    // Using a cache here since we only want to track recently removed executors to deny their
+    // block manager registration while their StopExecutor message is in-flight.
+    // Assuming average size of 6 bytes of execId and each entry in Cache taking around 64 bytes,
+    // max size of this cache = (6 + 64) * 30000 = 2.1MB
+    val recentlyRemovedExecutors = CacheBuilder.newBuilder().maximumSize(30000)
+      .build[String, String]()

Review comment:
       When `CoarseGrainedSchedulerBackend` receives `RemoveExecutor`, it has the `ExecutorLossReason`. However, after processing the message, when it publishes `SparkListenerExecutorRemoved`, the reason is passed in the form of a string.






[GitHub] [spark] Ngone51 commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-853530293


   Thanks, merged to master!




[GitHub] [spark] Ngone51 commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-825358789


   > I tried this and the issue still exists. When we apply the sequence of events mentioned in #32114, the issue surfaces again.
   
   I see. I think I missed the code path of `scheduler.executorLost(executorId, lossReason)`. 
   
   Thanks for the experiment.




[GitHub] [spark] sumeetgajjar commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r612814973



##########
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##########
@@ -355,6 +355,12 @@ object SparkEnv extends Logging {
 
     // Mapping from block manager id to the block manager's information.
     val blockManagerInfo = new concurrent.TrieMap[BlockManagerId, BlockManagerInfo]()
+    // Using a cache here since we only want to track recently removed executors to deny their
+    // block manager registration while their StopExecutor message is in-flight.
+    // Assuming average size of 6 bytes of execId and each entry in Cache taking around 64 bytes,
+    // max size of this cache = (6 + 64) * 30000 = 2.1MB
+    val recentlyRemovedExecutors = CacheBuilder.newBuilder().maximumSize(30000)
+      .build[String, String]()

Review comment:
       I believe extending `BlockManagerInfo` should solve the problem.
   However, we will have to abstract `blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo]`.
   Currently, on `RemoveExecutor`, we remove the corresponding `BlockManagerInfo` from the `blockManagerInfo` map. With this change we would instead keep the entry and update the `removalTs` inside `BlockManagerInfo`, so every other method would have to filter the values in the map before using them.
   
   So my suggestion is to abstract these details into a `BlockManagerEndpointSharedState`, which holds the `blockManagerInfo` map and exposes methods for lookups and updates.
   I propose the name `BlockManagerEndpointSharedState` because the same object will have to be passed to both `BlockManagerMasterEndpoint` and `BlockManagerMasterHeartbeatEndpoint`.
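
A minimal sketch of what such a shared state could look like, assuming simplified types: executor ids are plain strings, `BmInfo` is a stand-in for `BlockManagerInfo`, and every method name (`register`, `markRemoved`, `aliveInfo`, `cleanExpired`) is hypothetical rather than taken from the patch.

```scala
import scala.collection.mutable

// Simplified stand-in for BlockManagerInfo; only the removal timestamp is modeled.
final class BmInfo(val executorId: String) {
  // None while the executor is alive; Some(ts) once its removal has been processed.
  var removalTs: Option[Long] = None
  def isAlive: Boolean = removalTs.isEmpty
}

// Hypothetical shared state: the name follows the proposal, the API is an assumption.
final class BlockManagerEndpointSharedState(executorTimeoutMs: Long) {
  private val infos = mutable.Map.empty[String, BmInfo]

  // Returns false for a recently removed executor, so the caller can deny registration.
  def register(executorId: String): Boolean = synchronized {
    infos.get(executorId) match {
      case Some(info) if !info.isAlive => false
      case _ =>
        infos(executorId) = new BmInfo(executorId)
        true
    }
  }

  // Keep the entry and only record when the executor was removed.
  def markRemoved(executorId: String, now: Long): Unit = synchronized {
    infos.get(executorId).foreach { info => info.removalTs = Some(now) }
  }

  // Lookups hide entries that are only kept around as removal markers.
  def aliveInfo(executorId: String): Option[BmInfo] = synchronized {
    infos.get(executorId).filter(_.isAlive)
  }

  // Drop entries whose removal is older than the timeout.
  def cleanExpired(now: Long): Unit = synchronized {
    val expired = infos.filter { case (_, info) =>
      info.removalTs.exists(ts => now - ts > executorTimeoutMs)
    }.keys.toList
    expired.foreach(id => infos.remove(id))
  }
}
```

With this shape, callers never touch the raw map, so the "filter before use" rule lives in one place instead of being repeated in every method of both endpoints.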






[GitHub] [spark] sumeetgajjar commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640310177



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -728,15 +739,34 @@ class BlockManagerMasterEndpoint(
   private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
     for (
       blockManagerId <- blockManagerIdByExecutor.get(executorId);
-      info <- blockManagerInfo.get(blockManagerId)
+      info <- aliveBlockManagerInfo(blockManagerId)
     ) yield {
       info.storageEndpoint
     }
   }
 
   override def onStop(): Unit = {
     askThreadPool.shutdownNow()
+    blockManagerInfoCleaner.shutdownNow()
+  }
+
+  private def cleanBlockManagerInfo(): Unit = {
+    logDebug("Cleaning blockManagerInfo")
+    val now = System.currentTimeMillis()
+    val expiredBmIds = blockManagerInfo.filter { case (_, bmInfo) =>
+      // bmInfo.executorRemovalTs.get cannot be None when BM is not alive
+      !bmInfo.isAlive && (now - bmInfo.executorRemovalTs.get) > executorTimeoutMs
+    }.keys
+    expiredBmIds.foreach { bmId =>
+      logInfo(s"Cleaning expired $bmId from blockManagerInfo")
+      blockManagerInfo.remove(bmId)
+    }
   }
+
+  @inline private def aliveBlockManagerInfo(bmId: BlockManagerId): Option[BlockManagerInfo] =
+    blockManagerInfo.get(bmId).filter(_.isAlive)
+
+  @inline private def allAliveBlockManagerInfos() = blockManagerInfo.values.filter(_.isAlive)

Review comment:
       Done.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -336,13 +345,16 @@ class BlockManagerMasterEndpoint(
   private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
     val info = blockManagerInfo(blockManagerId)
 
+    // Not removing info from the blockManagerInfo map, but only updating the removal timestamp of

Review comment:
       Done.






[GitHub] [spark] mridulm commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640270377



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -273,12 +282,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        blockManagerInfo.get(bmId).foreach { bmInfo =>
+        blockManagerInfo.get(bmId).filter(_.isAlive).foreach { bmInfo =>

Review comment:
       @attilapiros The entry for `bmIdForShuffleService` in `blockStatusByShuffleService` itself is not getting removed, right? Also, the value `HashMap` is never cleaned up, it only gets cleared?
   
   Only the corresponding blockStatus is getting removed?
   
   I am sure I am missing something here ... I want to make sure what!






[GitHub] [spark] mridulm edited a comment on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
mridulm edited a comment on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-819120744


   I am getting a little confused between the PR description and the subsequent discussion.
   What exactly is the behavior we are trying to converge towards or address?
   
   An expiration of an executor from the heartbeat master not only sends a `StopExecutor` to get the executor to exit voluntarily, but also gets the cluster manager to force termination (in case of a MIA/hung executor). So in steady state, once transitional/overlapping updates are done, the executor should be gone according to the driver.
   
   My understanding was that there is a race here between the cluster manager notifying the application (after killing the executor) and the executor heartbeat/block manager re-registration, which ends up causing a dead executor to be marked live indefinitely.
   
   Is this the only case we are addressing? Or are there other paths that are impacted?
   
   (@Ngone51 Not sure if standalone has nuances that I am missing here.)
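
The race described above can be replayed with a toy model. This is only a sketch under stated assumptions: the event types and the `replay` function are hypothetical, and the registry is deliberately naive (removal deletes the entry, and a heartbeat from an unknown executor triggers a re-registration).

```scala
import scala.collection.mutable

// Hypothetical driver-side events, reduced to what matters for the race.
sealed trait Event
final case class RemoveExecutor(execId: String) extends Event
final case class Heartbeat(execId: String) extends Event

object RaceReplay {
  // Naive registry: removal forgets the executor entirely, so a late heartbeat
  // looks like it comes from an unknown executor and causes a re-registration.
  def replay(events: Seq[Event]): Set[String] = {
    val registered = mutable.Set("1")
    events.foreach {
      case RemoveExecutor(id) => registered.remove(id)
      case Heartbeat(id) => if (!registered.contains(id)) registered.add(id)
    }
    registered.toSet
  }

  // The heartbeat was sent before the executor exited, but it reaches the
  // driver after the removal has already been processed.
  val afterRace: Set[String] = replay(Seq(RemoveExecutor("1"), Heartbeat("1")))

  // Without the late heartbeat the executor stays gone.
  val noRace: Set[String] = replay(Seq(Heartbeat("1"), RemoveExecutor("1")))
}
```

In the first ordering, executor "1" ends up registered again although it is dead, and nothing in the naive model ever removes it; the second ordering leaves the registry empty.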




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-816977606


   Can one of the admins verify this patch?




[GitHub] [spark] mridulm commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-818960027


   In essence, if I understood correctly, we are adding a `lostExecutorCandidates: Map[String, ExpirationState]`?
   
   * If a request to expire an executor comes in, then expire it based on (some) policy (timeout since initial expiry, number of expirations, or other reasons); otherwise add or update the expiration state of the candidate.
   * If a heartbeat comes in, then remove the executor from the candidate set.
   * If there is an explicit remove, then remove the executor from both `executorLastSeen` and `lostExecutorCandidates`.
   
   Did I miss anything? I am fine with this approach.
   (I explicitly left the magic values out for clarity of explanation.)
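
The three rules above could be sketched as follows. Everything except the names `lostExecutorCandidates`, `executorLastSeen` and `ExpirationState` is an assumption made for illustration: the class name, the method names, the fields of `ExpirationState`, and the concrete policy (a maximum number of expiration requests or a grace period since the first one).

```scala
import scala.collection.mutable

// Hypothetical per-candidate state; the field names are illustrative.
final case class ExpirationState(firstExpiryMs: Long, expirations: Int)

// Hypothetical tracker; maxExpirations and graceMs stand in for the "magic values".
final class HeartbeatTracker(maxExpirations: Int, graceMs: Long) {
  val executorLastSeen = mutable.Map.empty[String, Long]
  val lostExecutorCandidates = mutable.Map.empty[String, ExpirationState]

  // A heartbeat shows the executor is alive, so it stops being a candidate.
  def onHeartbeat(execId: String, now: Long): Unit = {
    executorLastSeen(execId) = now
    lostExecutorCandidates.remove(execId)
  }

  // Returns true when the policy decides the executor should actually be expired;
  // otherwise the candidate's expiration state is added or updated.
  def onExpireRequest(execId: String, now: Long): Boolean = {
    val state = lostExecutorCandidates.get(execId) match {
      case Some(s) => s.copy(expirations = s.expirations + 1)
      case None => ExpirationState(now, 1)
    }
    val expire =
      state.expirations >= maxExpirations || now - state.firstExpiryMs >= graceMs
    if (expire) {
      onExplicitRemove(execId)
    } else {
      lostExecutorCandidates(execId) = state
    }
    expire
  }

  // An explicit removal clears both structures.
  def onExplicitRemove(execId: String): Unit = {
    executorLastSeen.remove(execId)
    lostExecutorCandidates.remove(execId)
  }
}
```

The policy check is kept in one expression so that a different rule can be swapped in without touching the bookkeeping.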




[GitHub] [spark] sumeetgajjar commented on pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
sumeetgajjar commented on pull request #32114:
URL: https://github.com/apache/spark/pull/32114#issuecomment-853537413


   Thank you @Ngone51, @mridulm and @attilapiros for your detailed reviews and insights. 




[GitHub] [spark] Ngone51 commented on a change in pull request #32114: [SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #32114:
URL: https://github.com/apache/spark/pull/32114#discussion_r640324064



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -336,13 +345,40 @@ class BlockManagerMasterEndpoint(
   private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
     val info = blockManagerInfo(blockManagerId)
 
+    // Not removing info from the blockManagerInfo map, but only setting the removal timestamp of
+    // the executor in BlockManagerInfo. This info will be removed from blockManagerInfo map by the
+    // blockManagerInfoCleaner once now() - info.executorRemovalTs > executorTimeoutMs.
+    //
+    // We are delaying the removal of BlockManagerInfo to avoid a BlockManager reregistration
+    // while an executor is shutting down. This unwanted reregistration causes inconsistent bookkeeping
+    // of executors in Spark.

Review comment:
       Oh, just a succinct description should be fine. This is too detailed for a comment. I think you can remove the detailed description below.



