You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2016/09/12 20:10:47 UTC
spark git commit: [SPARK-17483] Refactoring in BlockManager status
reporting and block removal
Repository: spark
Updated Branches:
refs/heads/master 1742c3ab8 -> 3d40896f4
[SPARK-17483] Refactoring in BlockManager status reporting and block removal
This patch makes three minor refactorings to the BlockManager:
- Move the `if (info.tellMaster)` check out of `reportBlockStatus`; this fixes an issue where a debug logging message would incorrectly claim to have reported a block status to the master even though no message had been sent (in case `info.tellMaster == false`). This also makes it easier to write code which unconditionally sends block statuses to the master (which is necessary in another patch of mine).
- Split `removeBlock()` into two methods, the existing method and an internal `removeBlockInternal()` method which is designed to be called by internal code that already holds a write lock on the block. This is also needed by a followup patch.
- Instead of calling `getCurrentBlockStatus()` in `removeBlock()`, just pass `BlockStatus.empty`; the block status should always be empty following complete removal of a block.
These changes were originally authored as part of a bug fix patch which is targeted at branch-2.0 and master; I've split them out here into their own separate PR in order to make them easier to review and so that the behavior-changing parts of my other patch can be isolated to their own PR.
Author: Josh Rosen <jo...@databricks.com>
Closes #15036 from JoshRosen/cache-failure-race-conditions-refactorings-only.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d40896f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d40896f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d40896f
Branch: refs/heads/master
Commit: 3d40896f410590c0be044b3fa7e5d32115fac05e
Parents: 1742c3a
Author: Josh Rosen <jo...@databricks.com>
Authored: Mon Sep 12 13:09:33 2016 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Mon Sep 12 13:09:33 2016 -0700
----------------------------------------------------------------------
.../org/apache/spark/storage/BlockManager.scala | 87 ++++++++++----------
1 file changed, 42 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3d40896f/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0614646..9e63777 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -217,7 +217,7 @@ private[spark] class BlockManager(
logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.")
for ((blockId, info) <- blockInfoManager.entries) {
val status = getCurrentBlockStatus(blockId, info)
- if (!tryToReportBlockStatus(blockId, info, status)) {
+ if (info.tellMaster && !tryToReportBlockStatus(blockId, status)) {
logError(s"Failed to report $blockId to master; giving up.")
return
}
@@ -298,7 +298,7 @@ private[spark] class BlockManager(
/**
* Get the BlockStatus for the block identified by the given ID, if it exists.
- * NOTE: This is mainly for testing, and it doesn't fetch information from external block store.
+ * NOTE: This is mainly for testing.
*/
def getStatus(blockId: BlockId): Option[BlockStatus] = {
blockInfoManager.get(blockId).map { info =>
@@ -333,10 +333,9 @@ private[spark] class BlockManager(
*/
private def reportBlockStatus(
blockId: BlockId,
- info: BlockInfo,
status: BlockStatus,
droppedMemorySize: Long = 0L): Unit = {
- val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize)
+ val needReregister = !tryToReportBlockStatus(blockId, status, droppedMemorySize)
if (needReregister) {
logInfo(s"Got told to re-register updating block $blockId")
// Re-registering will report our new block for free.
@@ -352,17 +351,12 @@ private[spark] class BlockManager(
*/
private def tryToReportBlockStatus(
blockId: BlockId,
- info: BlockInfo,
status: BlockStatus,
droppedMemorySize: Long = 0L): Boolean = {
- if (info.tellMaster) {
- val storageLevel = status.storageLevel
- val inMemSize = Math.max(status.memSize, droppedMemorySize)
- val onDiskSize = status.diskSize
- master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
- } else {
- true
- }
+ val storageLevel = status.storageLevel
+ val inMemSize = Math.max(status.memSize, droppedMemorySize)
+ val onDiskSize = status.diskSize
+ master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
}
/**
@@ -374,7 +368,7 @@ private[spark] class BlockManager(
info.synchronized {
info.level match {
case null =>
- BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
+ BlockStatus.empty
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
@@ -807,12 +801,10 @@ private[spark] class BlockManager(
// Now that the block is in either the memory or disk store,
// tell the master about it.
info.size = size
- if (tellMaster) {
- reportBlockStatus(blockId, info, putBlockStatus)
- }
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
+ if (tellMaster && info.tellMaster) {
+ reportBlockStatus(blockId, putBlockStatus)
}
+ addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
}
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
@@ -961,15 +953,12 @@ private[spark] class BlockManager(
val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
if (blockWasSuccessfullyStored) {
- // Now that the block is in either the memory, externalBlockStore, or disk store,
- // tell the master about it.
+ // Now that the block is in either the memory or disk store, tell the master about it.
info.size = size
- if (tellMaster) {
- reportBlockStatus(blockId, info, putBlockStatus)
- }
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
+ if (tellMaster && info.tellMaster) {
+ reportBlockStatus(blockId, putBlockStatus)
}
+ addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
val remoteStartTime = System.currentTimeMillis
@@ -1271,12 +1260,10 @@ private[spark] class BlockManager(
val status = getCurrentBlockStatus(blockId, info)
if (info.tellMaster) {
- reportBlockStatus(blockId, info, status, droppedMemorySize)
+ reportBlockStatus(blockId, status, droppedMemorySize)
}
if (blockIsUpdated) {
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
- }
+ addUpdatedBlockStatusToTaskMetrics(blockId, status)
}
status.storageLevel
}
@@ -1316,21 +1303,31 @@ private[spark] class BlockManager(
// The block has already been removed; do nothing.
logWarning(s"Asked to remove block $blockId, which does not exist")
case Some(info) =>
- // Removals are idempotent in disk store and memory store. At worst, we get a warning.
- val removedFromMemory = memoryStore.remove(blockId)
- val removedFromDisk = diskStore.remove(blockId)
- if (!removedFromMemory && !removedFromDisk) {
- logWarning(s"Block $blockId could not be removed as it was not found in either " +
- "the disk, memory, or external block store")
- }
- blockInfoManager.removeBlock(blockId)
- val removeBlockStatus = getCurrentBlockStatus(blockId, info)
- if (tellMaster && info.tellMaster) {
- reportBlockStatus(blockId, info, removeBlockStatus)
- }
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(blockId -> removeBlockStatus)
- }
+ removeBlockInternal(blockId, tellMaster = tellMaster && info.tellMaster)
+ addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
+ }
+ }
+
+ /**
+ * Internal version of [[removeBlock()]] which assumes that the caller already holds a write
+ * lock on the block.
+ */
+ private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = {
+ // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+ val removedFromMemory = memoryStore.remove(blockId)
+ val removedFromDisk = diskStore.remove(blockId)
+ if (!removedFromMemory && !removedFromDisk) {
+ logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
+ }
+ blockInfoManager.removeBlock(blockId)
+ if (tellMaster) {
+ reportBlockStatus(blockId, BlockStatus.empty)
+ }
+ }
+
+ private def addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status: BlockStatus): Unit = {
+ Option(TaskContext.get()).foreach { c =>
+ c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org