Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/02 08:16:16 UTC

[GitHub] [spark] Ngone51 opened a new pull request, #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Ngone51 opened a new pull request, #38876:
URL: https://github.com/apache/spark/pull/38876

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   
   This PR mainly proposes to reject BlockManager re-registration if the executor has already been considered lost/dead by the scheduler backend.
   
   Along with the major proposal, this PR also includes a few other changes:
   * Only post `SparkListenerBlockManagerAdded` event when the registration succeeds
   * Return an "invalid" executor id when the re-registration fails
   * Do not report all blocks when the re-registration fails
   
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   BlockManager re-registration from a dead or terminating executor has led to some known issues, e.g., a falsely active executor showing up in the UI (SPARK-35011), [block fetching to the dead executor](https://github.com/apache/spark/pull/32114#issuecomment-899979045). And since the executor itself does not re-register with the driver, BlockManager re-registration is meaningless when the executor is already lost.
   
   Regarding the corner case where the re-registration event arrives before the lost executor is actually removed from the scheduler backend, I think it is not possible. Re-registration is only required when the BlockManager master doesn't see the block manager in `blockManagerInfo`, and a block manager is only removed from `blockManagerInfo` either when the executor is already known to be lost or when it is removed by the driver proactively. So the executor should always be removed from the scheduler backend before the re-registration event arrives.
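   
   For reference, the core of the proposed guard looks roughly like the following (a simplified sketch, not the exact patch; the actual diffs are quoted in the review comments below):
   
   ```scala
   // BlockManagerMasterEndpoint#register, simplified sketch.
   // Only consult the scheduler backend when this is a re-registration attempt.
   lazy val isExecutorAlive =
     driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(id.executorId))
   
   if (!blockManagerInfo.contains(id) && (!isReRegister || isExecutorAlive)) {
     // ...register the block manager and post SparkListenerBlockManagerAdded...
   }
   
   // When a re-registration is rejected, return an id carrying an "invalid" executor id
   // so the caller (BlockManager.reregister) knows not to report its blocks.
   val updatedId = if (isReRegister && !isExecutorAlive) {
     BlockManagerId(BlockManagerId.INVALID_EXECUTOR_ID, id.host, id.port)
   } else {
     id
   }
   ```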
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   No
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   
   Unit test
   




[GitHub] [spark] dongjoon-hyun commented on pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on PR #38876:
URL: https://github.com/apache/spark/pull/38876#issuecomment-1347179731

   Thank you, @Ngone51 , @mridulm , @jiangxb1987 .




[GitHub] [spark] Ngone51 commented on a diff in pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38876:
URL: https://github.com/apache/spark/pull/38876#discussion_r1040544977



##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -583,7 +586,12 @@ class BlockManagerMasterEndpoint(
 
     val time = System.currentTimeMillis()
     executorIdToLocalDirs.put(id.executorId, localDirs)
-    if (!blockManagerInfo.contains(id)) {
+    // SPARK-41360: For the block manager re-registration, we should only allow it when
+    // the executor is recognized as active by the scheduler backend. Otherwise, this kind
+    // of re-registration from the terminating/stopped executor is meaningless and harmful.
+    lazy val isExecutorAlive =
+      driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(id.executorId))
+    if (!blockManagerInfo.contains(id) && (!isReRegister || isExecutorAlive)) {

Review Comment:
   > Wont the driver not remove in case of a heartbeat expiry even though the executor did not disconnect ? (for the same reasons as above - long gc pause, network partition, etc)
   
   It will. But in these cases, the driver could fail to kill the executor.
   
   (@mridulm  No worries, take care:))





[GitHub] [spark] mridulm commented on a diff in pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38876:
URL: https://github.com/apache/spark/pull/38876#discussion_r1044131362


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -637,9 +637,11 @@ private[spark] class BlockManager(
   def reregister(): Unit = {
     // TODO: We might need to rate limit re-registering.
     logInfo(s"BlockManager $blockManagerId re-registering with master")
-    master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory,
-      maxOffHeapMemory, storageEndpoint)
-    reportAllBlocks()
+    val id = master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString,
+      maxOnHeapMemory, maxOffHeapMemory, storageEndpoint, isReRegister = true)
+    if (id.executorId != BlockManagerId.INVALID_EXECUTOR_ID) {
+      reportAllBlocks()
+    }

Review Comment:
   Note - if we do change this - it introduces a new code path.
   So I don't want to block the PR on this discussion - but it would be good to understand this case better :-)





[GitHub] [spark] mridulm commented on a diff in pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38876:
URL: https://github.com/apache/spark/pull/38876#discussion_r1043005698


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -637,9 +637,11 @@ private[spark] class BlockManager(
   def reregister(): Unit = {
     // TODO: We might need to rate limit re-registering.
     logInfo(s"BlockManager $blockManagerId re-registering with master")
-    master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory,
-      maxOffHeapMemory, storageEndpoint)
-    reportAllBlocks()
+    val id = master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString,
+      maxOnHeapMemory, maxOffHeapMemory, storageEndpoint, isReRegister = true)
+    if (id.executorId != BlockManagerId.INVALID_EXECUTOR_ID) {
+      reportAllBlocks()
+    }

Review Comment:
   Do we want to terminate in case of `INVALID_EXECUTOR_ID` ?





[GitHub] [spark] Ngone51 commented on pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on PR #38876:
URL: https://github.com/apache/spark/pull/38876#issuecomment-1334894364

   cc @mridulm @jiangxb1987 @attilapiros @wankunde @sumeetgajjar 




[GitHub] [spark] Ngone51 commented on a diff in pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38876:
URL: https://github.com/apache/spark/pull/38876#discussion_r1039081212


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -616,10 +624,29 @@ class BlockManagerMasterEndpoint(
       if (pushBasedShuffleEnabled) {
         addMergerLocation(id)
       }
+      listenerBus.post(SparkListenerBlockManagerAdded(time, id,
+        maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
     }
-    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
-        Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
-    id
+    val updatedId = if (isReRegister && !isExecutorAlive) {
+      assert(!blockManagerInfo.contains(id),
+        "BlockManager re-registration shouldn't succeed when the executor is lost")

Review Comment:
   > Does this assertion need to always hold ?
   
   It does.





[GitHub] [spark] Ngone51 commented on a diff in pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38876:
URL: https://github.com/apache/spark/pull/38876#discussion_r1044198454


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -637,9 +637,11 @@ private[spark] class BlockManager(
   def reregister(): Unit = {
     // TODO: We might need to rate limit re-registering.
     logInfo(s"BlockManager $blockManagerId re-registering with master")
-    master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory,
-      maxOffHeapMemory, storageEndpoint)
-    reportAllBlocks()
+    val id = master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString,
+      maxOnHeapMemory, maxOffHeapMemory, storageEndpoint, isReRegister = true)
+    if (id.executorId != BlockManagerId.INVALID_EXECUTOR_ID) {
+      reportAllBlocks()
+    }

Review Comment:
   The case we met is that the executor failed to be killed by both `StopExecutor` and `ExecutorRunner.killProcess`. On second thought, maybe killing (via `System.exit`) from inside would give one more chance to handle this case?
   





[GitHub] [spark] mridulm commented on a diff in pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38876:
URL: https://github.com/apache/spark/pull/38876#discussion_r1038742186


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -583,7 +586,12 @@ class BlockManagerMasterEndpoint(
 
     val time = System.currentTimeMillis()
     executorIdToLocalDirs.put(id.executorId, localDirs)
-    if (!blockManagerInfo.contains(id)) {
+    // SPARK-41360: For the block manager re-registration, we should only allow it when
+    // the executor is recognized as active by the scheduler backend. Otherwise, this kind
+    // of re-registration from the terminating/stopped executor is meaningless and harmful.
+    lazy val isExecutorAlive =
+      driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(id.executorId))
+    if (!blockManagerInfo.contains(id) && (!isReRegister || isExecutorAlive)) {

Review Comment:
   If this is an issue only for terminating executors, we can detect that on the executor side and propagate it in the registration request, right? Or are there other cases as well?
   Otherwise, a transient network partition could result in losing all executors?





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38876:
URL: https://github.com/apache/spark/pull/38876#discussion_r1047077651


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala:
##########
@@ -63,7 +63,8 @@ private[spark] object BlockManagerMessages {
       localDirs: Array[String],
       maxOnHeapMemSize: Long,
       maxOffHeapMemSize: Long,
-      sender: RpcEndpointRef)
+      sender: RpcEndpointRef,
+      isReRegister: Boolean)

Review Comment:
   Not sure why or how, but it seems like branch-3.3 complains about this in the MiMa binary compatibility test (https://github.com/apache/spark/actions/runs/3683054520/jobs/6231260546).
   
   ```
   [error] spark-core: Failed binary compatibility check against org.apache.spark:spark-core_2.13:3.2.0! Found 4 potential problems (filtered 921)
   [error]  * method copy(org.apache.spark.storage.BlockManagerId,Array[java.lang.String],Long,Long,org.apache.spark.rpc.RpcEndpointRef)org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager in class org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager does not have a correspondent in current version
   [error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.copy")
   [error]  * method this(org.apache.spark.storage.BlockManagerId,Array[java.lang.String],Long,Long,org.apache.spark.rpc.RpcEndpointRef)Unit in class org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager does not have a correspondent in current version
   [error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.this")
   [error]  * the type hierarchy of object org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager is different in current version. Missing types {scala.runtime.AbstractFunction5}
   [error]    filter with: ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.storage.BlockManagerMessages$RegisterBlockManager$")
   [error]  * method apply(org.apache.spark.storage.BlockManagerId,Array[java.lang.String],Long,Long,org.apache.spark.rpc.RpcEndpointRef)org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager in object org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager does not have a correspondent in current version
   [error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.apply")
   ```
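   
   The usual fix for this kind of failure is to add the printed filters to `project/MimaExcludes.scala` on the affected branch. A hedged sketch based on the filters above (the object and value names below are assumptions; the actual follow-up is linked later in this thread and may differ):
   
   ```scala
   // Sketch of the kind of entry project/MimaExcludes.scala typically gets.
   // RegisterBlockManager is private[spark], so excluding its changed
   // constructor/copy/apply signatures is safe.
   import com.typesafe.tools.mima.core._
   
   object MimaExcludesSketch {
     lazy val v33excludes = Seq(
       ProblemFilters.exclude[DirectMissingMethodProblem](
         "org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.copy"),
       ProblemFilters.exclude[DirectMissingMethodProblem](
         "org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.this"),
       ProblemFilters.exclude[MissingTypesProblem](
         "org.apache.spark.storage.BlockManagerMessages$RegisterBlockManager$"),
       ProblemFilters.exclude[DirectMissingMethodProblem](
         "org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.apply")
     )
   }
   ```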








[GitHub] [spark] mridulm commented on a diff in pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38876:
URL: https://github.com/apache/spark/pull/38876#discussion_r1038742887


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -583,7 +586,12 @@ class BlockManagerMasterEndpoint(
 
     val time = System.currentTimeMillis()
     executorIdToLocalDirs.put(id.executorId, localDirs)
-    if (!blockManagerInfo.contains(id)) {
+    // SPARK-41360: For the block manager re-registration, we should only allow it when
+    // the executor is recognized as active by the scheduler backend. Otherwise, this kind
+    // of re-registration from the terminating/stopped executor is meaningless and harmful.
+    lazy val isExecutorAlive =
+      driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(id.executorId))

Review Comment:
   We have to be careful with this change.
   The scheduler backend does call into the block manager master - but as of today, those are non-blocking calls. So this sync call is fine right now - but it could turn into a deadlock as the code evolves.





[GitHub] [spark] mridulm commented on pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38876:
URL: https://github.com/apache/spark/pull/38876#issuecomment-1347178709

   Merged to master, branch-3.3 and branch-3.2
   Thanks for fixing this @Ngone51 !
   Thanks for the reviews @jiangxb1987 :-)




[GitHub] [spark] Ngone51 commented on a diff in pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38876:
URL: https://github.com/apache/spark/pull/38876#discussion_r1047120075


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala:
##########
@@ -63,7 +63,8 @@ private[spark] object BlockManagerMessages {
       localDirs: Array[String],
       maxOnHeapMemSize: Long,
       maxOffHeapMemSize: Long,
-      sender: RpcEndpointRef)
+      sender: RpcEndpointRef,
+      isReRegister: Boolean)

Review Comment:
   Thanks @HyukjinKwon 





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38876:
URL: https://github.com/apache/spark/pull/38876#discussion_r1047078201


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala:
##########
@@ -63,7 +63,8 @@ private[spark] object BlockManagerMessages {
       localDirs: Array[String],
       maxOnHeapMemSize: Long,
       maxOffHeapMemSize: Long,
-      sender: RpcEndpointRef)
+      sender: RpcEndpointRef,
+      isReRegister: Boolean)

Review Comment:
   Let me just make a quick followup to fix this in all branches. This was detected with Scala 2.13 FWIW.





[GitHub] [spark] mridulm commented on a diff in pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38876:
URL: https://github.com/apache/spark/pull/38876#discussion_r1038743250


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -616,10 +624,29 @@ class BlockManagerMasterEndpoint(
       if (pushBasedShuffleEnabled) {
         addMergerLocation(id)
       }
+      listenerBus.post(SparkListenerBlockManagerAdded(time, id,
+        maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
     }
-    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
-        Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
-    id
+    val updatedId = if (isReRegister && !isExecutorAlive) {
+      assert(!blockManagerInfo.contains(id),
+        "BlockManager re-registration shouldn't succeed when the executor is lost")

Review Comment:
   Does this assertion need to always hold ?
   I will need to take another look at the code, but I vaguely think there are corner cases here ... might be good to check up on this. (I will too, next week.)





[GitHub] [spark] mridulm commented on a diff in pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38876:
URL: https://github.com/apache/spark/pull/38876#discussion_r1039131641


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -583,7 +586,12 @@ class BlockManagerMasterEndpoint(
 
     val time = System.currentTimeMillis()
     executorIdToLocalDirs.put(id.executorId, localDirs)
-    if (!blockManagerInfo.contains(id)) {
+    // SPARK-41360: For the block manager re-registration, we should only allow it when
+    // the executor is recognized as active by the scheduler backend. Otherwise, this kind
+    // of re-registration from the terminating/stopped executor is meaningless and harmful.
+    lazy val isExecutorAlive =
+      driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(id.executorId))
+    if (!blockManagerInfo.contains(id) && (!isReRegister || isExecutorAlive)) {

Review Comment:
   `ShutdownHookManager.inShutdown` should tell if it is in the process of shutting down - and prevent the call to reregister ?
   
   For the other cases - lost due to long GC, lost due to network partitions, etc. - they are legitimate candidates for re-registration.
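   
   For reference, a minimal sketch of that executor-side check in `BlockManager.reregister()` (this reflects the suggestion above, not necessarily what the PR ends up doing, and assumes `ShutdownHookManager.inShutdown()` is an acceptable signal here):
   
   ```scala
   import org.apache.spark.util.ShutdownHookManager
   
   def reregister(): Unit = {
     if (ShutdownHookManager.inShutdown()) {
       // The executor JVM is already shutting down; re-registering would only
       // re-add a block manager that is about to disappear anyway.
       logInfo(s"Skipping BlockManager $blockManagerId re-registration during shutdown")
       return
     }
     // ... existing re-registration logic ...
   }
   ```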







[GitHub] [spark] mridulm commented on a diff in pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38876:
URL: https://github.com/apache/spark/pull/38876#discussion_r1040459276


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -583,7 +586,12 @@ class BlockManagerMasterEndpoint(
 
     val time = System.currentTimeMillis()
     executorIdToLocalDirs.put(id.executorId, localDirs)
-    if (!blockManagerInfo.contains(id)) {
+    // SPARK-41360: For the block manager re-registration, we should only allow it when
+    // the executor is recognized as active by the scheduler backend. Otherwise, this kind
+    // of re-registration from the terminating/stopped executor is meaningless and harmful.
+    lazy val isExecutorAlive =
+      driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(id.executorId))
+    if (!blockManagerInfo.contains(id) && (!isReRegister || isExecutorAlive)) {

Review Comment:
   Wont the driver not remove in case of a heartbeat expiry even though the executor did not disconnect ? (for the same reasons as above - long gc pause, network partition, etc)
   
   (Sorry for the delay, I will get back to this PR later this week - not in good health)





[GitHub] [spark] Ngone51 commented on a diff in pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38876:
URL: https://github.com/apache/spark/pull/38876#discussion_r1039195161


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -583,7 +586,12 @@ class BlockManagerMasterEndpoint(
 
     val time = System.currentTimeMillis()
     executorIdToLocalDirs.put(id.executorId, localDirs)
-    if (!blockManagerInfo.contains(id)) {
+    // SPARK-41360: For the block manager re-registration, we should only allow it when
+    // the executor is recognized as active by the scheduler backend. Otherwise, this kind
+    // of re-registration from the terminating/stopped executor is meaningless and harmful.
+    lazy val isExecutorAlive =
+      driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(id.executorId))
+    if (!blockManagerInfo.contains(id) && (!isReRegister || isExecutorAlive)) {

Review Comment:
   > ShutdownHookManager.inShutdown should tell if it is in the process of shutting down - and prevent call to reregister ?
   
   OK, I see.
   
   > For the other cases, lost due to long GC, lost due to network partitions, etc - they are legitimate candidates for registration.
   
   Note that the re-registration handled in this PR has a prerequisite: the executor must already be considered lost from the driver's view. In that case, block manager re-registration is meaningless since the executor won't reconnect to the driver.







[GitHub] [spark] Ngone51 commented on a diff in pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38876:
URL: https://github.com/apache/spark/pull/38876#discussion_r1043027754


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -637,9 +637,11 @@ private[spark] class BlockManager(
   def reregister(): Unit = {
     // TODO: We might need to rate limit re-registering.
     logInfo(s"BlockManager $blockManagerId re-registering with master")
-    master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory,
-      maxOffHeapMemory, storageEndpoint)
-    reportAllBlocks()
+    val id = master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString,
+      maxOnHeapMemory, maxOffHeapMemory, storageEndpoint, isReRegister = true)
+    if (id.executorId != BlockManagerId.INVALID_EXECUTOR_ID) {
+      reportAllBlocks()
+    }

Review Comment:
   Good question. So ideally, a lost executor should be terminated in the end anyway (whether killed by the driver or by exiting itself proactively)... if the lost executor fails to terminate, there must be something wrong with it. So I think terminating here won't make any difference to the result.





[GitHub] [spark] Ngone51 commented on a diff in pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38876:
URL: https://github.com/apache/spark/pull/38876#discussion_r1044955782


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -637,9 +637,11 @@ private[spark] class BlockManager(
   def reregister(): Unit = {
     // TODO: We might need to rate limit re-registering.
     logInfo(s"BlockManager $blockManagerId re-registering with master")
-    master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory,
-      maxOffHeapMemory, storageEndpoint)
-    reportAllBlocks()
+    val id = master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString,
+      maxOnHeapMemory, maxOffHeapMemory, storageEndpoint, isReRegister = true)
+    if (id.executorId != BlockManagerId.INVALID_EXECUTOR_ID) {
+      reportAllBlocks()
+    }

Review Comment:
   Added the termination logic here. cc @mridulm 
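   
   (The exact termination logic added here is not quoted in this thread; a rough, hypothetical sketch of the idea, with the exit path being an assumption:)
   
   ```scala
   // Hypothetical sketch only -- the exact code added in the PR is not quoted here.
   val id = master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString,
     maxOnHeapMemory, maxOffHeapMemory, storageEndpoint, isReRegister = true)
   if (id.executorId != BlockManagerId.INVALID_EXECUTOR_ID) {
     reportAllBlocks()
   } else {
     // The driver already treats this executor as lost, so the re-registration was
     // rejected; there is no point keeping the orphaned executor JVM around.
     logError("Exiting executor due to a rejected BlockManager re-registration")
     System.exit(-1)  // placeholder exit path; the real change may use a dedicated exit code
   }
   ```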







[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38876:
URL: https://github.com/apache/spark/pull/38876#discussion_r1047082916


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala:
##########
@@ -63,7 +63,8 @@ private[spark] object BlockManagerMessages {
       localDirs: Array[String],
       maxOnHeapMemSize: Long,
       maxOffHeapMemSize: Long,
-      sender: RpcEndpointRef)
+      sender: RpcEndpointRef,
+      isReRegister: Boolean)

Review Comment:
   Here: https://github.com/apache/spark/pull/39052





[GitHub] [spark] mridulm closed pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
mridulm closed pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost
URL: https://github.com/apache/spark/pull/38876




[GitHub] [spark] Ngone51 commented on a diff in pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38876:
URL: https://github.com/apache/spark/pull/38876#discussion_r1039080517


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -583,7 +586,12 @@ class BlockManagerMasterEndpoint(
 
     val time = System.currentTimeMillis()
     executorIdToLocalDirs.put(id.executorId, localDirs)
-    if (!blockManagerInfo.contains(id)) {
+    // SPARK-41360: For the block manager re-registration, we should only allow it when
+    // the executor is recognized as active by the scheduler backend. Otherwise, this kind
+    // of re-registration from the terminating/stopped executor is meaningless and harmful.
+    lazy val isExecutorAlive =
+      driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(id.executorId))
+    if (!blockManagerInfo.contains(id) && (!isReRegister || isExecutorAlive)) {

Review Comment:
   > If this is an issue only for terminating executors, we can detect that on the executor side and propagate it in the registration request, right?
   
   What do you mean? The terminating executor can detect itself as being terminating?
   
   Actually, it's not only the terminating executors: executors lost due to long GC, or executors that the driver failed to kill (where the executor could end up as an orphan rather than terminated), also fall under this, as long as the driver considers the executor lost.
   







[GitHub] [spark] mridulm commented on a diff in pull request #38876: [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38876:
URL: https://github.com/apache/spark/pull/38876#discussion_r1044130961


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -637,9 +637,11 @@ private[spark] class BlockManager(
   def reregister(): Unit = {
     // TODO: We might need to rate limit re-registering.
     logInfo(s"BlockManager $blockManagerId re-registering with master")
-    master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory,
-      maxOffHeapMemory, storageEndpoint)
-    reportAllBlocks()
+    val id = master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString,
+      maxOnHeapMemory, maxOffHeapMemory, storageEndpoint, isReRegister = true)
+    if (id.executorId != BlockManagerId.INVALID_EXECUTOR_ID) {
+      reportAllBlocks()
+    }

Review Comment:
   If it is failing to terminate, every heartbeat will end up returning `BlockManagerId.INVALID_EXECUTOR_ID` from the driver, right?
   Wondering if there is a reason to keep it around.
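   
   (For context, re-registration is driven from the executor's heartbeat loop, roughly as below; this is simplified from `Executor.reportHeartBeat`, so treat the details as approximate:)
   
   ```scala
   // Simplified: the driver's HeartbeatResponse tells an executor whose block manager
   // is unknown to re-register; with this PR that re-registration can now be rejected.
   val response = heartbeatReceiverRef.askSync[HeartbeatResponse](message)
   if (response.reregisterBlockManager) {
     logInfo("Told to re-register on heartbeat")
     env.blockManager.reregister()
   }
   ```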


