You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/02/05 08:28:31 UTC

[spark] branch branch-3.0 updated: [SPARK-30594][CORE] Do not post SparkListenerBlockUpdated when updateBlockInfo returns false

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 3854ad8  [SPARK-30594][CORE] Do not post SparkListenerBlockUpdated when updateBlockInfo returns false
3854ad8 is described below

commit 3854ad87c78f2a331f9c9c1a34f9ec281900f8fe
Author: yi.wu <yi...@databricks.com>
AuthorDate: Wed Feb 5 16:15:44 2020 +0800

    [SPARK-30594][CORE] Do not post SparkListenerBlockUpdated when updateBlockInfo returns false
    
    ### What changes were proposed in this pull request?
    
    If `updateBlockInfo` returns false, the `BlockManager` will re-register and report all of its blocks again later. The same block may therefore be reported twice, which causes `AppStatusListener` to count its used memory twice as well. As a result, the used memory can exceed the total memory.
    So, this PR stops posting `SparkListenerBlockUpdated` when `updateBlockInfo` returns false, and always resets the used memory whenever `AppStatusListener` receives `SparkListenerBlockManagerAdded`.
    
    ### Why are the changes needed?
    
    This PR tries to fix the negative memory usage shown in the UI (https://user-images.githubusercontent.com/3488126/72131225-95e37e00-33b6-11ea-8708-6e5ed328d1ca.png, see #27144). However, I'm not sure this is the root cause of #27144, since the available information is limited.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added new tests (contributed by xuanyuanking).
    
    Closes #27306 from Ngone51/fix-possible-negative-memory.
    
    Lead-authored-by: yi.wu <yi...@databricks.com>
    Co-authored-by: Yuanjian Li <xy...@gmail.com>
    Co-authored-by: wuyi <yi...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 30e418a6fe971b4a84c37ca0ae20f1a664b117d3)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/status/AppStatusListener.scala    |  9 ++++++--
 .../scala/org/apache/spark/status/LiveEntity.scala |  2 +-
 .../spark/storage/BlockManagerMasterEndpoint.scala |  9 ++++++--
 .../spark/status/AppStatusListenerSuite.scala      | 24 ++++++++++++++++++++++
 .../apache/spark/storage/BlockManagerSuite.scala   | 17 +++++++++++++--
 5 files changed, 54 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index a5850fc..c3f22f3 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -71,7 +71,7 @@ private[spark] class AppStatusListener(
   // causing too many writes to the underlying store, and other expensive operations).
   private val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]()
   private val liveJobs = new HashMap[Int, LiveJob]()
-  private val liveExecutors = new HashMap[String, LiveExecutor]()
+  private[spark] val liveExecutors = new HashMap[String, LiveExecutor]()
   private val deadExecutors = new HashMap[String, LiveExecutor]()
   private val liveTasks = new HashMap[Long, LiveTask]()
   private val liveRDDs = new HashMap[Int, LiveRDD]()
@@ -772,6 +772,11 @@ private[spark] class AppStatusListener(
     event.maxOnHeapMem.foreach { _ =>
       exec.totalOnHeap = event.maxOnHeapMem.get
       exec.totalOffHeap = event.maxOffHeapMem.get
+      // SPARK-30594: whenever a BlockManager is added (first time or re-registration), all blocks
+      // from this BlockManager will be reported to the driver later. So, we should reset the
+      // used memory to avoid double counting.
+      exec.usedOnHeap = 0
+      exec.usedOffHeap = 0
     }
     exec.isActive = true
     exec.maxMemory = event.maxMem
@@ -1042,7 +1047,7 @@ private[spark] class AppStatusListener(
     }
   }
 
-  private def updateExecutorMemoryDiskInfo(
+  private[spark] def updateExecutorMemoryDiskInfo(
       exec: LiveExecutor,
       storageLevel: StorageLevel,
       memoryDelta: Long,
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index e3046dc..2714f30 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -245,7 +245,7 @@ private class LiveTask(
 
 }
 
-private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity {
+private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity {
 
   var hostPort: String = null
   var host: String = null
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 41ef190..d7f7eed 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -98,8 +98,13 @@ class BlockManagerMasterEndpoint(
 
     case _updateBlockInfo @
         UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
-      context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
-      listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
+      val isSuccess = updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
+      context.reply(isSuccess)
+      // SPARK-30594: we should not post `SparkListenerBlockUpdated` when updateBlockInfo
+      // returns false since the block info would be updated again later.
+      if (isSuccess) {
+        listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
+      }
 
     case GetLocations(blockId) =>
       context.reply(getLocations(blockId))
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index e7eed7b..255f918 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1657,6 +1657,30 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     }
   }
 
+  test("clean up used memory when BlockManager added") {
+    val listener = new AppStatusListener(store, conf, true)
+    // Register the driver's block manager for the first time
+    val driver = BlockManagerId(SparkContext.DRIVER_IDENTIFIER, "localhost", 42)
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(
+      time, driver, 42L, Some(43L), Some(44L)))
+    // Apply non-zero usage deltas (10L each) to the driver's live executor entry
+    listener.updateExecutorMemoryDiskInfo(
+      listener.liveExecutors(SparkContext.DRIVER_IDENTIFIER),
+      StorageLevel.MEMORY_AND_DISK,
+      10L,
+      10L
+    )
+    // Re-add the same block manager; the used on-heap/off-heap memory must be reset to zero
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(
+      time, driver, 42L, Some(43L), Some(44L)))
+
+    check[ExecutorSummaryWrapper](SparkContext.DRIVER_IDENTIFIER) { d =>
+      val memoryMetrics = d.info.memoryMetrics.get
+      assert(memoryMetrics.usedOffHeapStorageMemory == 0)
+      assert(memoryMetrics.usedOnHeapStorageMemory == 0)
+    }
+  }
+
 
   private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptNumber)
 
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 89f00b5..8d06768 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -50,7 +50,7 @@ import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, Transpo
 import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ExecutorDiskUtils, ExternalBlockStoreClient}
 import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor}
 import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerBlockUpdated}
 import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite}
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager}
 import org.apache.spark.shuffle.sort.SortShuffleManager
@@ -71,6 +71,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   val allStores = ArrayBuffer[BlockManager]()
   var rpcEnv: RpcEnv = null
   var master: BlockManagerMaster = null
+  var liveListenerBus: LiveListenerBus = null
   val securityMgr = new SecurityManager(new SparkConf(false))
   val bcastManager = new BroadcastManager(true, new SparkConf(false), securityMgr)
   val mapOutputTracker = new MapOutputTrackerMaster(new SparkConf(false), bcastManager, true)
@@ -145,9 +146,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     when(sc.conf).thenReturn(conf)
 
     val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]()
+    liveListenerBus = spy(new LiveListenerBus(conf))
     master = spy(new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-        new LiveListenerBus(conf), None, blockManagerInfo)),
+        liveListenerBus, None, blockManagerInfo)),
       rpcEnv.setupEndpoint("blockmanagerHeartbeat",
       new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true))
 
@@ -164,6 +166,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       rpcEnv.awaitTermination()
       rpcEnv = null
       master = null
+      liveListenerBus = null
     } finally {
       super.afterEach()
     }
@@ -1693,6 +1696,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(locs(blockIds(0)) == expectedLocs)
   }
 
+  test("SPARK-30594: Do not post SparkListenerBlockUpdated when updateBlockInfo returns false") {
+    // Send a block update from a block manager that was never registered with the master
+    val updateInfo = UpdateBlockInfo(BlockManagerId("1", "host1", 100),
+      BlockId("test_1"), StorageLevel.MEMORY_ONLY, 1, 1)
+    val result = master.driverEndpoint.askSync[Boolean](updateInfo)
+
+    assert(!result)
+    verify(liveListenerBus, never()).post(SparkListenerBlockUpdated(BlockUpdatedInfo(updateInfo)))
+  }
+
   class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
     var numCalls = 0
     var tempFileManager: DownloadFileManager = null


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org