Posted to commits@spark.apache.org by mr...@apache.org on 2021/04/05 22:33:09 UTC

[spark] branch master updated: [SPARK-34949][CORE] Prevent BlockManager reregister when Executor is shutting down

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a9ca197  [SPARK-34949][CORE] Prevent BlockManager reregister when Executor is shutting down
a9ca197 is described below

commit a9ca1978ae8ecc53e2ef9e14b4be70dc8f5d9341
Author: Sumeet Gajjar <su...@gmail.com>
AuthorDate: Mon Apr 5 17:32:43 2021 -0500

    [SPARK-34949][CORE] Prevent BlockManager reregister when Executor is shutting down
    
    ### What changes were proposed in this pull request?
    
    This PR prevents re-registering the BlockManager when an Executor is shutting down. This is achieved by checking `executorShutdown` before calling `env.blockManager.reregister()`.
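
    A minimal sketch of the guard added to `Executor.reportHeartBeat` (the full diff is below); `executorShutdown` is the existing `AtomicBoolean` that `Executor.stop()` flips:

    ```scala
    // Only honor the master's re-register request while the executor is
    // still running; once shutdown has started, the request is ignored.
    if (!executorShutdown.get && response.reregisterBlockManager) {
      logInfo("Told to re-register on heartbeat")
      env.blockManager.reregister()
    }
    ```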
    
    ### Why are the changes needed?
    
    This change is required since Spark otherwise reports executors as active even after they have been removed.
    I was testing Dynamic Allocation on K8s with about 300 executors. When executors were torn down due to `spark.dynamicAllocation.executorIdleTimeout`, I saw all the executor pods being removed from K8s; however, under the "Executors" tab in the Spark UI, some executors were still listed as alive. [spark.sparkContext.statusTracker.getExecutorInfos.length](https://github.com/apache/spark/blob/65da9287bc5112564836a555cd2967fc6b05856f/core/src/main/scala/org/apache/spa [...]
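
    For reference, this is how the stale count surfaces on the driver through the status-tracker API linked above (a minimal illustration of the observation, not part of the patch):

    ```scala
    // Driver side: after all idle executor pods had been deleted from K8s,
    // this still reported stale executors as alive. Note the returned array
    // also includes an entry for the driver itself.
    val executorInfos = spark.sparkContext.statusTracker.getExecutorInfos
    println(s"Executors reported alive: ${executorInfos.length}")
    ```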
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added a new test, "SPARK-34949: do not re-register BlockManager when executor is shutting down", to `ExecutorSuite`; see the diff below.
    
    ### Logs
    Following are the logs of the executor (id: 303) that re-registers its `BlockManager` even after the driver has commanded a shutdown (see the 21:34:16 entries):
    ```
    21/04/02 21:33:28 INFO CoarseGrainedExecutorBackend: Got assigned task 1076
    21/04/02 21:33:28 INFO Executor: Running task 4.0 in stage 3.0 (TID 1076)
    21/04/02 21:33:28 INFO MapOutputTrackerWorker: Updating epoch to 302 and clearing cache
    21/04/02 21:33:28 INFO TorrentBroadcast: Started reading broadcast variable 3
    21/04/02 21:33:28 INFO TransportClientFactory: Successfully created connection to /100.100.195.227:33703 after 76 ms (62 ms spent in bootstraps)
    21/04/02 21:33:28 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.4 KB, free 168.0 MB)
    21/04/02 21:33:28 INFO TorrentBroadcast: Reading broadcast variable 3 took 168 ms
    21/04/02 21:33:28 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.9 KB, free 168.0 MB)
    21/04/02 21:33:29 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 1, fetching them
    21/04/02 21:33:29 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTrackerda-lite-test-4-7a57e478947d206d-driver-svc.dex-app-n5ttnbmg.svc:7078)
    21/04/02 21:33:29 INFO MapOutputTrackerWorker: Got the output locations
    21/04/02 21:33:29 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks including 1 local blocks and 1 remote blocks
    21/04/02 21:33:30 INFO TransportClientFactory: Successfully created connection to /100.100.80.103:40971 after 660 ms (528 ms spent in bootstraps)
    21/04/02 21:33:30 INFO ShuffleBlockFetcherIterator: Started 1 remote fetches in 1042 ms
    21/04/02 21:33:31 INFO Executor: Finished task 4.0 in stage 3.0 (TID 1076). 1276 bytes result sent to driver
    .
    .
    .
    21/04/02 21:34:16 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
    21/04/02 21:34:16 INFO Executor: Told to re-register on heartbeat
    21/04/02 21:34:16 INFO BlockManager: BlockManager BlockManagerId(303, 100.100.122.34, 41265, None) re-registering with master
    21/04/02 21:34:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(303, 100.100.122.34, 41265, None)
    21/04/02 21:34:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(303, 100.100.122.34, 41265, None)
    21/04/02 21:34:16 INFO BlockManager: Reporting 0 blocks to the master.
    21/04/02 21:34:16 INFO MemoryStore: MemoryStore cleared
    21/04/02 21:34:16 INFO BlockManager: BlockManager stopped
    21/04/02 21:34:16 INFO FileDataSink: Closing sink with output file = /tmp/safari-events/.des_analysis/safari-events/hdp_spark_monitoring_random-container-037caf27-6c77-433f-820f-03cd9c7d9b6e-spark-8a492407d60b401bbf4309a14ea02ca2_events.tsv
    21/04/02 21:34:16 INFO HonestProfilerBasedThreadSnapshotProvider: Stopping agent
    21/04/02 21:34:16 INFO HonestProfilerHandler: Stopping honest profiler agent
    21/04/02 21:34:17 INFO ShutdownHookManager: Shutdown hook called
    21/04/02 21:34:17 INFO ShutdownHookManager: Deleting directory /var/data/spark-d886588c-2a7e-491d-bbcb-4f58b3e31001/spark-4aa337a0-60c0-45da-9562-8c50eaff3cea
    
    ```
    
    Closes #32043 from sumeetgajjar/SPARK-34949.
    
    Authored-by: Sumeet Gajjar <su...@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../scala/org/apache/spark/executor/Executor.scala |  2 +-
 .../org/apache/spark/executor/ExecutorSuite.scala  | 66 +++++++++++++++++-----
 2 files changed, 52 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 3865c9c..8fc1c80 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -996,7 +996,7 @@ private[spark] class Executor(
     try {
       val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
         message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis, EXECUTOR_HEARTBEAT_INTERVAL.key))
-      if (response.reregisterBlockManager) {
+      if (!executorShutdown.get && response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
       }
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 97ffb36..a237447 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -270,6 +270,17 @@ class ExecutorSuite extends SparkFunSuite
     heartbeatZeroAccumulatorUpdateTest(false)
   }
 
+  private def withMockHeartbeatReceiverRef(executor: Executor)
+      (func: RpcEndpointRef => Unit): Unit = {
+    val executorClass = classOf[Executor]
+    val mockReceiverRef = mock[RpcEndpointRef]
+    val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef")
+    receiverRef.setAccessible(true)
+    receiverRef.set(executor, mockReceiverRef)
+
+    func(mockReceiverRef)
+  }
+
   private def withHeartbeatExecutor(confs: (String, String)*)
       (f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = {
     val conf = new SparkConf
@@ -277,22 +288,18 @@ class ExecutorSuite extends SparkFunSuite
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     withExecutor("id", "localhost", SparkEnv.get) { executor =>
-      val executorClass = classOf[Executor]
-
-      // Save all heartbeats sent into an ArrayBuffer for verification
-      val heartbeats = ArrayBuffer[Heartbeat]()
-      val mockReceiver = mock[RpcEndpointRef]
-      when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any))
-        .thenAnswer((invocation: InvocationOnMock) => {
-          val args = invocation.getArguments()
-          heartbeats += args(0).asInstanceOf[Heartbeat]
-          HeartbeatResponse(false)
-        })
-      val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef")
-      receiverRef.setAccessible(true)
-      receiverRef.set(executor, mockReceiver)
+      withMockHeartbeatReceiverRef(executor) { mockReceiverRef =>
+        // Save all heartbeats sent into an ArrayBuffer for verification
+        val heartbeats = ArrayBuffer[Heartbeat]()
+        when(mockReceiverRef.askSync(any[Heartbeat], any[RpcTimeout])(any))
+          .thenAnswer((invocation: InvocationOnMock) => {
+            val args = invocation.getArguments()
+            heartbeats += args(0).asInstanceOf[Heartbeat]
+            HeartbeatResponse(false)
+          })
 
-      f(executor, heartbeats)
+        f(executor, heartbeats)
+      }
     }
   }
 
@@ -416,6 +423,35 @@ class ExecutorSuite extends SparkFunSuite
     assert(taskMetrics.getMetricValue("JVMHeapMemory") > 0)
   }
 
+  test("SPARK-34949: do not re-register BlockManager when executor is shutting down") {
+    val reregisterInvoked = new AtomicBoolean(false)
+    val mockBlockManager = mock[BlockManager]
+    when(mockBlockManager.reregister()).thenAnswer { (_: InvocationOnMock) =>
+      reregisterInvoked.getAndSet(true)
+    }
+    val conf = new SparkConf(false).setAppName("test").setMaster("local[2]")
+    val mockEnv = createMockEnv(conf, new JavaSerializer(conf))
+    when(mockEnv.blockManager).thenReturn(mockBlockManager)
+
+    withExecutor("id", "localhost", mockEnv) { executor =>
+      withMockHeartbeatReceiverRef(executor) { mockReceiverRef =>
+        when(mockReceiverRef.askSync(any[Heartbeat], any[RpcTimeout])(any)).thenAnswer {
+          (_: InvocationOnMock) => HeartbeatResponse(reregisterBlockManager = true)
+        }
+        val reportHeartbeat = PrivateMethod[Unit](Symbol("reportHeartBeat"))
+        executor.invokePrivate(reportHeartbeat())
+        assert(reregisterInvoked.get(), "BlockManager.reregister should be invoked " +
+          "on HeartbeatResponse(reregisterBlockManager = true) when executor is not shutting down")
+
+        reregisterInvoked.getAndSet(false)
+        executor.stop()
+        executor.invokePrivate(reportHeartbeat())
+        assert(!reregisterInvoked.get(),
+          "BlockManager.reregister should not be invoked when executor is shutting down")
+      }
+    }
+  }
+
   test("SPARK-33587: isFatalError") {
     def errorInThreadPool(e: => Throwable): Throwable = {
       intercept[Throwable] {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org