Posted to commits@spark.apache.org by mr...@apache.org on 2021/04/05 22:33:09 UTC
[spark] branch master updated: [SPARK-34949][CORE] Prevent BlockManager reregister when Executor is shutting down
This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a9ca197 [SPARK-34949][CORE] Prevent BlockManager reregister when Executor is shutting down
a9ca197 is described below
commit a9ca1978ae8ecc53e2ef9e14b4be70dc8f5d9341
Author: Sumeet Gajjar <su...@gmail.com>
AuthorDate: Mon Apr 5 17:32:43 2021 -0500
[SPARK-34949][CORE] Prevent BlockManager reregister when Executor is shutting down
### What changes were proposed in this pull request?
This PR prevents the BlockManager from re-registering while an Executor is shutting down. It does so by checking `executorShutdown` before calling `env.blockManager.reregister()`.
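
The guard described above can be sketched in isolation. This is a minimal toy model and not Spark's actual classes: `ToyExecutor`, `ToyHeartbeatResponse`, `handleHeartbeatResponse`, and the `reregistrations` counter are hypothetical names introduced only for illustration. Only the `executorShutdown` flag and the `reregisterBlockManager` field mirror names that appear in the change itself.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the heartbeat reply; only the flag the guard reads is modelled.
final case class ToyHeartbeatResponse(reregisterBlockManager: Boolean)

// Hypothetical stand-in for an executor, reduced to the shutdown flag and the guard.
final class ToyExecutor {
  private val executorShutdown = new AtomicBoolean(false)
  // Counts how often the (simulated) re-registration would have been triggered.
  private var reregisterCount = 0

  // Marks the executor as shutting down; later heartbeat replies must not re-register.
  def stop(): Unit = executorShutdown.set(true)

  // Same condition as the patched line: re-register only if not shutting down.
  def handleHeartbeatResponse(response: ToyHeartbeatResponse): Unit = {
    if (!executorShutdown.get && response.reregisterBlockManager) {
      reregisterCount += 1
    }
  }

  def reregistrations: Int = reregisterCount
}

object GuardDemo {
  def main(args: Array[String]): Unit = {
    val executor = new ToyExecutor
    // Before shutdown, a reply asking for re-registration is honoured.
    executor.handleHeartbeatResponse(ToyHeartbeatResponse(reregisterBlockManager = true))
    println(executor.reregistrations)
    // After stop(), the same reply is ignored, so the count does not change.
    executor.stop()
    executor.handleHeartbeatResponse(ToyHeartbeatResponse(reregisterBlockManager = true))
    println(executor.reregistrations)
  }
}
```

In this sketch the shutdown flag is checked first, so a reply that arrives after `stop()` has no effect, whatever the driver requested.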
### Why are the changes needed?
This change is required because Spark can report executors as active even after they have been removed.
I was testing Dynamic Allocation on K8s with about 300 executors. When the executors were torn down due to `spark.dynamicAllocation.executorIdleTimeout`, I noticed that all the executor pods were removed from K8s; however, under the "Executors" tab in the Spark UI, some executors were still listed as alive. [spark.sparkContext.statusTracker.getExecutorInfos.length](https://github.com/apache/spark/blob/65da9287bc5112564836a555cd2967fc6b05856f/core/src/main/scala/org/apache/spa [...]
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a new test.
## Logs
The following are the logs of the executor (ID 303) that re-registers its `BlockManager`:
```
21/04/02 21:33:28 INFO CoarseGrainedExecutorBackend: Got assigned task 1076
21/04/02 21:33:28 INFO Executor: Running task 4.0 in stage 3.0 (TID 1076)
21/04/02 21:33:28 INFO MapOutputTrackerWorker: Updating epoch to 302 and clearing cache
21/04/02 21:33:28 INFO TorrentBroadcast: Started reading broadcast variable 3
21/04/02 21:33:28 INFO TransportClientFactory: Successfully created connection to /100.100.195.227:33703 after 76 ms (62 ms spent in bootstraps)
21/04/02 21:33:28 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.4 KB, free 168.0 MB)
21/04/02 21:33:28 INFO TorrentBroadcast: Reading broadcast variable 3 took 168 ms
21/04/02 21:33:28 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.9 KB, free 168.0 MB)
21/04/02 21:33:29 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 1, fetching them
21/04/02 21:33:29 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTrackerda-lite-test-4-7a57e478947d206d-driver-svc.dex-app-n5ttnbmg.svc:7078)
21/04/02 21:33:29 INFO MapOutputTrackerWorker: Got the output locations
21/04/02 21:33:29 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks including 1 local blocks and 1 remote blocks
21/04/02 21:33:30 INFO TransportClientFactory: Successfully created connection to /100.100.80.103:40971 after 660 ms (528 ms spent in bootstraps)
21/04/02 21:33:30 INFO ShuffleBlockFetcherIterator: Started 1 remote fetches in 1042 ms
21/04/02 21:33:31 INFO Executor: Finished task 4.0 in stage 3.0 (TID 1076). 1276 bytes result sent to driver
.
.
.
21/04/02 21:34:16 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
21/04/02 21:34:16 INFO Executor: Told to re-register on heartbeat
21/04/02 21:34:16 INFO BlockManager: BlockManager BlockManagerId(303, 100.100.122.34, 41265, None) re-registering with master
21/04/02 21:34:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(303, 100.100.122.34, 41265, None)
21/04/02 21:34:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(303, 100.100.122.34, 41265, None)
21/04/02 21:34:16 INFO BlockManager: Reporting 0 blocks to the master.
21/04/02 21:34:16 INFO MemoryStore: MemoryStore cleared
21/04/02 21:34:16 INFO BlockManager: BlockManager stopped
21/04/02 21:34:16 INFO FileDataSink: Closing sink with output file = /tmp/safari-events/.des_analysis/safari-events/hdp_spark_monitoring_random-container-037caf27-6c77-433f-820f-03cd9c7d9b6e-spark-8a492407d60b401bbf4309a14ea02ca2_events.tsv
21/04/02 21:34:16 INFO HonestProfilerBasedThreadSnapshotProvider: Stopping agent
21/04/02 21:34:16 INFO HonestProfilerHandler: Stopping honest profiler agent
21/04/02 21:34:17 INFO ShutdownHookManager: Shutdown hook called
21/04/02 21:34:17 INFO ShutdownHookManager: Deleting directory /var/data/spark-d886588c-2a7e-491d-bbcb-4f58b3e31001/spark-4aa337a0-60c0-45da-9562-8c50eaff3cea
```
Closes #32043 from sumeetgajjar/SPARK-34949.
Authored-by: Sumeet Gajjar <su...@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../scala/org/apache/spark/executor/Executor.scala | 2 +-
.../org/apache/spark/executor/ExecutorSuite.scala | 66 +++++++++++++++++-----
2 files changed, 52 insertions(+), 16 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 3865c9c..8fc1c80 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -996,7 +996,7 @@ private[spark] class Executor(
try {
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis, EXECUTOR_HEARTBEAT_INTERVAL.key))
- if (response.reregisterBlockManager) {
+ if (!executorShutdown.get && response.reregisterBlockManager) {
logInfo("Told to re-register on heartbeat")
env.blockManager.reregister()
}
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 97ffb36..a237447 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -270,6 +270,17 @@ class ExecutorSuite extends SparkFunSuite
heartbeatZeroAccumulatorUpdateTest(false)
}
+ private def withMockHeartbeatReceiverRef(executor: Executor)
+ (func: RpcEndpointRef => Unit): Unit = {
+ val executorClass = classOf[Executor]
+ val mockReceiverRef = mock[RpcEndpointRef]
+ val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef")
+ receiverRef.setAccessible(true)
+ receiverRef.set(executor, mockReceiverRef)
+
+ func(mockReceiverRef)
+ }
+
private def withHeartbeatExecutor(confs: (String, String)*)
(f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = {
val conf = new SparkConf
@@ -277,22 +288,18 @@ class ExecutorSuite extends SparkFunSuite
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
withExecutor("id", "localhost", SparkEnv.get) { executor =>
- val executorClass = classOf[Executor]
-
- // Save all heartbeats sent into an ArrayBuffer for verification
- val heartbeats = ArrayBuffer[Heartbeat]()
- val mockReceiver = mock[RpcEndpointRef]
- when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any))
- .thenAnswer((invocation: InvocationOnMock) => {
- val args = invocation.getArguments()
- heartbeats += args(0).asInstanceOf[Heartbeat]
- HeartbeatResponse(false)
- })
- val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef")
- receiverRef.setAccessible(true)
- receiverRef.set(executor, mockReceiver)
+ withMockHeartbeatReceiverRef(executor) { mockReceiverRef =>
+ // Save all heartbeats sent into an ArrayBuffer for verification
+ val heartbeats = ArrayBuffer[Heartbeat]()
+ when(mockReceiverRef.askSync(any[Heartbeat], any[RpcTimeout])(any))
+ .thenAnswer((invocation: InvocationOnMock) => {
+ val args = invocation.getArguments()
+ heartbeats += args(0).asInstanceOf[Heartbeat]
+ HeartbeatResponse(false)
+ })
- f(executor, heartbeats)
+ f(executor, heartbeats)
+ }
}
}
@@ -416,6 +423,35 @@ class ExecutorSuite extends SparkFunSuite
assert(taskMetrics.getMetricValue("JVMHeapMemory") > 0)
}
+ test("SPARK-34949: do not re-register BlockManager when executor is shutting down") {
+ val reregisterInvoked = new AtomicBoolean(false)
+ val mockBlockManager = mock[BlockManager]
+ when(mockBlockManager.reregister()).thenAnswer { (_: InvocationOnMock) =>
+ reregisterInvoked.getAndSet(true)
+ }
+ val conf = new SparkConf(false).setAppName("test").setMaster("local[2]")
+ val mockEnv = createMockEnv(conf, new JavaSerializer(conf))
+ when(mockEnv.blockManager).thenReturn(mockBlockManager)
+
+ withExecutor("id", "localhost", mockEnv) { executor =>
+ withMockHeartbeatReceiverRef(executor) { mockReceiverRef =>
+ when(mockReceiverRef.askSync(any[Heartbeat], any[RpcTimeout])(any)).thenAnswer {
+ (_: InvocationOnMock) => HeartbeatResponse(reregisterBlockManager = true)
+ }
+ val reportHeartbeat = PrivateMethod[Unit](Symbol("reportHeartBeat"))
+ executor.invokePrivate(reportHeartbeat())
+ assert(reregisterInvoked.get(), "BlockManager.reregister should be invoked " +
+ "on HeartbeatResponse(reregisterBlockManager = true) when executor is not shutting down")
+
+ reregisterInvoked.getAndSet(false)
+ executor.stop()
+ executor.invokePrivate(reportHeartbeat())
+ assert(!reregisterInvoked.get(),
+ "BlockManager.reregister should not be invoked when executor is shutting down")
+ }
+ }
+ }
+
test("SPARK-33587: isFatalError") {
def errorInThreadPool(e: => Throwable): Throwable = {
intercept[Throwable] {