You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/01/28 21:10:20 UTC

[spark] branch branch-3.0 updated: [SPARK-34273][CORE] Do not reregister BlockManager when SparkContext is stopped

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5aaec8b  [SPARK-34273][CORE] Do not reregister BlockManager when SparkContext is stopped
5aaec8b is described below

commit 5aaec8bcb5602b903f337a21f4f0ad7e669844ad
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Thu Jan 28 13:06:42 2021 -0800

    [SPARK-34273][CORE] Do not reregister BlockManager when SparkContext is stopped
    
    ### What changes were proposed in this pull request?
    
    This PR aims to prevent `HeartbeatReceiver` from asking `Executor` to re-register the block manager when the SparkContext is already stopped.
    
    ### Why are the changes needed?
    
    Currently, `HeartbeatReceiver` blindly requests re-registration upon each new heartbeat message.
    However, when SparkContext is stopped, we don't need to re-register new block manager.
    Re-registration causes unnecessary executor logs and a delay on job termination.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test case.
    
    Closes #31373 from dongjoon-hyun/SPARK-34273.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit bc41c5a0e598e6b697ed61c33e1bea629dabfc57)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../scala/org/apache/spark/HeartbeatReceiver.scala     |  8 +++++---
 .../org/apache/spark/HeartbeatReceiverSuite.scala      | 18 ++++++++++++++++++
 2 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index be63072..6c18cf1 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -128,6 +128,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
 
     // Messages received from executors
     case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId, executorUpdates) =>
+      var reregisterBlockManager = !sc.isStopped
       if (scheduler != null) {
         if (executorLastSeen.contains(executorId)) {
           executorLastSeen(executorId) = clock.getTimeMillis()
@@ -135,7 +136,8 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
             override def run(): Unit = Utils.tryLogNonFatalError {
               val unknownExecutor = !scheduler.executorHeartbeatReceived(
                 executorId, accumUpdates, blockManagerId, executorUpdates)
-              val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+              reregisterBlockManager &= unknownExecutor
+              val response = HeartbeatResponse(reregisterBlockManager)
               context.reply(response)
             }
           })
@@ -145,14 +147,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
           // not log warning here. Otherwise there may be a lot of noise especially if
           // we explicitly remove executors (SPARK-4134).
           logDebug(s"Received heartbeat from unknown executor $executorId")
-          context.reply(HeartbeatResponse(reregisterBlockManager = true))
+          context.reply(HeartbeatResponse(reregisterBlockManager))
         }
       } else {
         // Because Executor will sleep several seconds before sending the first "Heartbeat", this
         // case rarely happens. However, if it really happens, log it and ask the executor to
         // register itself again.
         logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
-        context.reply(HeartbeatResponse(reregisterBlockManager = true))
+        context.reply(HeartbeatResponse(reregisterBlockManager))
       }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index ff0f2f9..9e7bc03 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -218,6 +218,24 @@ class HeartbeatReceiverSuite
     fakeSchedulerBackend.stop()
   }
 
+  test("SPARK-34273: Do not reregister BlockManager when SparkContext is stopped") {
+    val blockManagerId = BlockManagerId(executorId1, "localhost", 12345)
+
+    heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
+    val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
+      Heartbeat(executorId1, Array.empty, blockManagerId, mutable.Map.empty))
+    assert(response.reregisterBlockManager)
+
+    try {
+      sc.stopped.set(true)
+      val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
+        Heartbeat(executorId1, Array.empty, blockManagerId, mutable.Map.empty))
+      assert(!response.reregisterBlockManager)
+    } finally {
+      sc.stopped.set(false)
+    }
+  }
+
   /** Manually send a heartbeat and return the response. */
   private def triggerHeartbeat(
       executorId: String,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org