You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/01/28 21:10:20 UTC
[spark] branch branch-3.0 updated: [SPARK-34273][CORE] Do not
reregister BlockManager when SparkContext is stopped
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 5aaec8b [SPARK-34273][CORE] Do not reregister BlockManager when SparkContext is stopped
5aaec8b is described below
commit 5aaec8bcb5602b903f337a21f4f0ad7e669844ad
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Thu Jan 28 13:06:42 2021 -0800
[SPARK-34273][CORE] Do not reregister BlockManager when SparkContext is stopped
### What changes were proposed in this pull request?
This PR aims to prevent `HeartbeatReceiver` from asking `Executor` to re-register the block manager when the SparkContext is already stopped.
### Why are the changes needed?
Currently, `HeartbeatReceiver` blindly requests re-registration for each new heartbeat message.
However, when SparkContext is stopped, we don't need to re-register a new block manager.
Re-registration causes unnecessary executor logs and a delay in job termination.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs with the newly added test case.
Closes #31373 from dongjoon-hyun/SPARK-34273.
Authored-by: Dongjoon Hyun <dh...@apple.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
(cherry picked from commit bc41c5a0e598e6b697ed61c33e1bea629dabfc57)
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../scala/org/apache/spark/HeartbeatReceiver.scala | 8 +++++---
.../org/apache/spark/HeartbeatReceiverSuite.scala | 18 ++++++++++++++++++
2 files changed, 23 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index be63072..6c18cf1 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -128,6 +128,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
// Messages received from executors
case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId, executorUpdates) =>
+ var reregisterBlockManager = !sc.isStopped
if (scheduler != null) {
if (executorLastSeen.contains(executorId)) {
executorLastSeen(executorId) = clock.getTimeMillis()
@@ -135,7 +136,8 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
override def run(): Unit = Utils.tryLogNonFatalError {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, accumUpdates, blockManagerId, executorUpdates)
- val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+ reregisterBlockManager &= unknownExecutor
+ val response = HeartbeatResponse(reregisterBlockManager)
context.reply(response)
}
})
@@ -145,14 +147,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
// not log warning here. Otherwise there may be a lot of noise especially if
// we explicitly remove executors (SPARK-4134).
logDebug(s"Received heartbeat from unknown executor $executorId")
- context.reply(HeartbeatResponse(reregisterBlockManager = true))
+ context.reply(HeartbeatResponse(reregisterBlockManager))
}
} else {
// Because Executor will sleep several seconds before sending the first "Heartbeat", this
// case rarely happens. However, if it really happens, log it and ask the executor to
// register itself again.
logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
- context.reply(HeartbeatResponse(reregisterBlockManager = true))
+ context.reply(HeartbeatResponse(reregisterBlockManager))
}
}
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index ff0f2f9..9e7bc03 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -218,6 +218,24 @@ class HeartbeatReceiverSuite
fakeSchedulerBackend.stop()
}
+ test("SPARK-34273: Do not reregister BlockManager when SparkContext is stopped") {
+ val blockManagerId = BlockManagerId(executorId1, "localhost", 12345)
+
+ heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
+ val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
+ Heartbeat(executorId1, Array.empty, blockManagerId, mutable.Map.empty))
+ assert(response.reregisterBlockManager)
+
+ try {
+ sc.stopped.set(true)
+ val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
+ Heartbeat(executorId1, Array.empty, blockManagerId, mutable.Map.empty))
+ assert(!response.reregisterBlockManager)
+ } finally {
+ sc.stopped.set(false)
+ }
+ }
+
/** Manually send a heartbeat and return the response. */
private def triggerHeartbeat(
executorId: String,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org