You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/02/05 08:28:31 UTC
[spark] branch branch-3.0 updated: [SPARK-30594][CORE] Do not post
SparkListenerBlockUpdated when updateBlockInfo returns false
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 3854ad8 [SPARK-30594][CORE] Do not post SparkListenerBlockUpdated when updateBlockInfo returns false
3854ad8 is described below
commit 3854ad87c78f2a331f9c9c1a34f9ec281900f8fe
Author: yi.wu <yi...@databricks.com>
AuthorDate: Wed Feb 5 16:15:44 2020 +0800
[SPARK-30594][CORE] Do not post SparkListenerBlockUpdated when updateBlockInfo returns false
### What changes were proposed in this pull request?
If `updateBlockInfo` returns false, it means the `BlockManager` will re-register and report all blocks later. So, we may report the same block twice, which causes `AppStatusListener` to count its used memory twice as well. As a result, the used memory can exceed the total memory.
So, this PR changes it to not post `SparkListenerBlockUpdated` when `updateBlockInfo` returns false. And, always clean up used memory whenever `AppStatusListener` receives `SparkListenerBlockManagerAdded`.
### Why are the changes needed?
This PR tries to fix negative memory usage in the UI (https://user-images.githubusercontent.com/3488126/72131225-95e37e00-33b6-11ea-8708-6e5ed328d1ca.png, see #27144 ). That said, I'm not entirely sure this is the root cause of #27144, since the available information there is limited.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Added new tests by xuanyuanking
Closes #27306 from Ngone51/fix-possible-negative-memory.
Lead-authored-by: yi.wu <yi...@databricks.com>
Co-authored-by: Yuanjian Li <xy...@gmail.com>
Co-authored-by: wuyi <yi...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 30e418a6fe971b4a84c37ca0ae20f1a664b117d3)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../apache/spark/status/AppStatusListener.scala | 9 ++++++--
.../scala/org/apache/spark/status/LiveEntity.scala | 2 +-
.../spark/storage/BlockManagerMasterEndpoint.scala | 9 ++++++--
.../spark/status/AppStatusListenerSuite.scala | 24 ++++++++++++++++++++++
.../apache/spark/storage/BlockManagerSuite.scala | 17 +++++++++++++--
5 files changed, 54 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index a5850fc..c3f22f3 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -71,7 +71,7 @@ private[spark] class AppStatusListener(
// causing too many writes to the underlying store, and other expensive operations).
private val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]()
private val liveJobs = new HashMap[Int, LiveJob]()
- private val liveExecutors = new HashMap[String, LiveExecutor]()
+ private[spark] val liveExecutors = new HashMap[String, LiveExecutor]()
private val deadExecutors = new HashMap[String, LiveExecutor]()
private val liveTasks = new HashMap[Long, LiveTask]()
private val liveRDDs = new HashMap[Int, LiveRDD]()
@@ -772,6 +772,11 @@ private[spark] class AppStatusListener(
event.maxOnHeapMem.foreach { _ =>
exec.totalOnHeap = event.maxOnHeapMem.get
exec.totalOffHeap = event.maxOffHeapMem.get
+ // SPARK-30594: whenever(first time or re-register) a BlockManager added, all blocks
+ // from this BlockManager will be reported to driver later. So, we should clean up
+ // used memory to avoid overlapped count.
+ exec.usedOnHeap = 0
+ exec.usedOffHeap = 0
}
exec.isActive = true
exec.maxMemory = event.maxMem
@@ -1042,7 +1047,7 @@ private[spark] class AppStatusListener(
}
}
- private def updateExecutorMemoryDiskInfo(
+ private[spark] def updateExecutorMemoryDiskInfo(
exec: LiveExecutor,
storageLevel: StorageLevel,
memoryDelta: Long,
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index e3046dc..2714f30 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -245,7 +245,7 @@ private class LiveTask(
}
-private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity {
+private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity {
var hostPort: String = null
var host: String = null
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 41ef190..d7f7eed 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -98,8 +98,13 @@ class BlockManagerMasterEndpoint(
case _updateBlockInfo @
UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
- context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
- listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
+ val isSuccess = updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
+ context.reply(isSuccess)
+ // SPARK-30594: we should not post `SparkListenerBlockUpdated` when updateBlockInfo
+ // returns false since the block info would be updated again later.
+ if (isSuccess) {
+ listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
+ }
case GetLocations(blockId) =>
context.reply(getLocations(blockId))
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index e7eed7b..255f918 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1657,6 +1657,30 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
}
}
+ test("clean up used memory when BlockManager added") {
+ val listener = new AppStatusListener(store, conf, true)
+ // Add block manager at the first time
+ val driver = BlockManagerId(SparkContext.DRIVER_IDENTIFIER, "localhost", 42)
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(
+ time, driver, 42L, Some(43L), Some(44L)))
+ // Update the memory metrics
+ listener.updateExecutorMemoryDiskInfo(
+ listener.liveExecutors(SparkContext.DRIVER_IDENTIFIER),
+ StorageLevel.MEMORY_AND_DISK,
+ 10L,
+ 10L
+ )
+ // Re-add the same block manager again
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(
+ time, driver, 42L, Some(43L), Some(44L)))
+
+ check[ExecutorSummaryWrapper](SparkContext.DRIVER_IDENTIFIER) { d =>
+ val memoryMetrics = d.info.memoryMetrics.get
+ assert(memoryMetrics.usedOffHeapStorageMemory == 0)
+ assert(memoryMetrics.usedOnHeapStorageMemory == 0)
+ }
+ }
+
private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptNumber)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 89f00b5..8d06768 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -50,7 +50,7 @@ import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, Transpo
import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ExecutorDiskUtils, ExternalBlockStoreClient}
import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor}
import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerBlockUpdated}
import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite}
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager}
import org.apache.spark.shuffle.sort.SortShuffleManager
@@ -71,6 +71,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val allStores = ArrayBuffer[BlockManager]()
var rpcEnv: RpcEnv = null
var master: BlockManagerMaster = null
+ var liveListenerBus: LiveListenerBus = null
val securityMgr = new SecurityManager(new SparkConf(false))
val bcastManager = new BroadcastManager(true, new SparkConf(false), securityMgr)
val mapOutputTracker = new MapOutputTrackerMaster(new SparkConf(false), bcastManager, true)
@@ -145,9 +146,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
when(sc.conf).thenReturn(conf)
val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]()
+ liveListenerBus = spy(new LiveListenerBus(conf))
master = spy(new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
new BlockManagerMasterEndpoint(rpcEnv, true, conf,
- new LiveListenerBus(conf), None, blockManagerInfo)),
+ liveListenerBus, None, blockManagerInfo)),
rpcEnv.setupEndpoint("blockmanagerHeartbeat",
new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true))
@@ -164,6 +166,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
rpcEnv.awaitTermination()
rpcEnv = null
master = null
+ liveListenerBus = null
} finally {
super.afterEach()
}
@@ -1693,6 +1696,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(locs(blockIds(0)) == expectedLocs)
}
+ test("SPARK-30594: Do not post SparkListenerBlockUpdated when updateBlockInfo returns false") {
+ // update block info for non-existent block manager
+ val updateInfo = UpdateBlockInfo(BlockManagerId("1", "host1", 100),
+ BlockId("test_1"), StorageLevel.MEMORY_ONLY, 1, 1)
+ val result = master.driverEndpoint.askSync[Boolean](updateInfo)
+
+ assert(!result)
+ verify(liveListenerBus, never()).post(SparkListenerBlockUpdated(BlockUpdatedInfo(updateInfo)))
+ }
+
class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
var numCalls = 0
var tempFileManager: DownloadFileManager = null
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org