Posted to commits@spark.apache.org by jo...@apache.org on 2016/09/12 20:10:47 UTC

spark git commit: [SPARK-17483] Refactoring in BlockManager status reporting and block removal

Repository: spark
Updated Branches:
  refs/heads/master 1742c3ab8 -> 3d40896f4


[SPARK-17483] Refactoring in BlockManager status reporting and block removal

This patch makes three minor refactorings to the BlockManager:

- Move the `if (info.tellMaster)` check out of `reportBlockStatus`; this fixes an issue where a debug logging message would incorrectly claim to have reported a block status to the master even though no message had been sent (in the case where `info.tellMaster == false`). It also makes it easier to write code that unconditionally sends block statuses to the master (which is needed by another patch of mine).
- Split `removeBlock()` into two methods: the existing public method and a new `removeBlockInternal()` method intended for internal code that already holds a write lock on the block (see the sketch after this list). This is also needed by a follow-up patch.
- Instead of calling `getCurrentBlockStatus()` in `removeBlock()`, just pass `BlockStatus.empty`; the block status should always be empty following complete removal of a block.
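
To summarize the resulting control flow, here is a condensed, self-contained sketch. It is not part of the commit: `BlockManagerRefactorSketch`, `SketchBlockManager`, `Store`, `putForTest`, and the `println` logging are simplified stand-ins introduced only for illustration, while the method names `removeBlock`, `removeBlockInternal`, `reportBlockStatus`, `tryToReportBlockStatus`, and `addUpdatedBlockStatusToTaskMetrics` mirror the diff below. The point it shows: callers now check `info.tellMaster` themselves before reporting, and `removeBlock()` delegates the actual removal to `removeBlockInternal()`, which reports `BlockStatus.empty` to the master.

  // Condensed sketch of the refactored control flow; stand-in types only,
  // not Spark's real BlockManager / BlockInfoManager / TaskMetrics classes.
  object BlockManagerRefactorSketch {

    type BlockId = String
    final case class BlockStatus(memSize: Long, diskSize: Long)
    object BlockStatus { val empty: BlockStatus = BlockStatus(0L, 0L) }
    final case class BlockInfo(tellMaster: Boolean)

    // Minimal stand-in for the memory / disk stores.
    final class Store {
      private val blocks = scala.collection.mutable.Set.empty[BlockId]
      def put(id: BlockId): Unit = blocks += id
      def remove(id: BlockId): Boolean = blocks.remove(id)
    }

    final class SketchBlockManager {
      private val memoryStore = new Store
      private val diskStore = new Store
      private val infos = scala.collection.mutable.Map.empty[BlockId, BlockInfo]

      // Sketch-only helper so the demo below has something to remove.
      def putForTest(id: BlockId, tellMaster: Boolean): Unit = {
        memoryStore.put(id)
        infos(id) = BlockInfo(tellMaster)
      }

      // Stand-in for master.updateBlockInfo(...); always "succeeds" here.
      private def tryToReportBlockStatus(id: BlockId, status: BlockStatus): Boolean = {
        println(s"reporting $id -> $status to master")
        true
      }

      // After the refactoring, the tellMaster check lives in the callers, not here.
      private def reportBlockStatus(id: BlockId, status: BlockStatus): Unit = {
        if (!tryToReportBlockStatus(id, status)) println(s"re-registering for $id")
      }

      def removeBlock(id: BlockId, tellMaster: Boolean = true): Unit = {
        infos.get(id) match {
          case None =>
            println(s"Asked to remove block $id, which does not exist")
          case Some(info) =>
            removeBlockInternal(id, tellMaster = tellMaster && info.tellMaster)
            // Following complete removal, the status is always empty.
            addUpdatedBlockStatusToTaskMetrics(id, BlockStatus.empty)
        }
      }

      // Assumes the caller already holds the write lock on the block.
      private def removeBlockInternal(id: BlockId, tellMaster: Boolean): Unit = {
        val removedFromMemory = memoryStore.remove(id)
        val removedFromDisk = diskStore.remove(id)
        if (!removedFromMemory && !removedFromDisk) {
          println(s"Block $id could not be removed as it was not found on disk or in memory")
        }
        infos.remove(id)
        if (tellMaster) reportBlockStatus(id, BlockStatus.empty)
      }

      private def addUpdatedBlockStatusToTaskMetrics(id: BlockId, status: BlockStatus): Unit = {
        // The real code routes this through TaskContext.get().taskMetrics(); just log it here.
        println(s"task metrics updated: $id -> $status")
      }
    }

    def main(args: Array[String]): Unit = {
      val bm = new SketchBlockManager
      bm.putForTest("rdd_0_0", tellMaster = true)
      bm.removeBlock("rdd_0_0")  // removes the block, reports empty status, updates metrics
    }
  }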

These changes were originally authored as part of a bug-fix patch targeted at branch-2.0 and master; I've split them out into this separate PR to make them easier to review and to keep the behavior-changing parts of my other patch isolated in their own PR.

Author: Josh Rosen <jo...@databricks.com>

Closes #15036 from JoshRosen/cache-failure-race-conditions-refactorings-only.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d40896f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d40896f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d40896f

Branch: refs/heads/master
Commit: 3d40896f410590c0be044b3fa7e5d32115fac05e
Parents: 1742c3a
Author: Josh Rosen <jo...@databricks.com>
Authored: Mon Sep 12 13:09:33 2016 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Mon Sep 12 13:09:33 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 87 ++++++++++----------
 1 file changed, 42 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3d40896f/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0614646..9e63777 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -217,7 +217,7 @@ private[spark] class BlockManager(
     logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.")
     for ((blockId, info) <- blockInfoManager.entries) {
       val status = getCurrentBlockStatus(blockId, info)
-      if (!tryToReportBlockStatus(blockId, info, status)) {
+      if (info.tellMaster && !tryToReportBlockStatus(blockId, status)) {
         logError(s"Failed to report $blockId to master; giving up.")
         return
       }
@@ -298,7 +298,7 @@ private[spark] class BlockManager(
 
   /**
    * Get the BlockStatus for the block identified by the given ID, if it exists.
-   * NOTE: This is mainly for testing, and it doesn't fetch information from external block store.
+   * NOTE: This is mainly for testing.
    */
   def getStatus(blockId: BlockId): Option[BlockStatus] = {
     blockInfoManager.get(blockId).map { info =>
@@ -333,10 +333,9 @@ private[spark] class BlockManager(
    */
   private def reportBlockStatus(
       blockId: BlockId,
-      info: BlockInfo,
       status: BlockStatus,
       droppedMemorySize: Long = 0L): Unit = {
-    val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize)
+    val needReregister = !tryToReportBlockStatus(blockId, status, droppedMemorySize)
     if (needReregister) {
       logInfo(s"Got told to re-register updating block $blockId")
       // Re-registering will report our new block for free.
@@ -352,17 +351,12 @@ private[spark] class BlockManager(
    */
   private def tryToReportBlockStatus(
       blockId: BlockId,
-      info: BlockInfo,
       status: BlockStatus,
       droppedMemorySize: Long = 0L): Boolean = {
-    if (info.tellMaster) {
-      val storageLevel = status.storageLevel
-      val inMemSize = Math.max(status.memSize, droppedMemorySize)
-      val onDiskSize = status.diskSize
-      master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
-    } else {
-      true
-    }
+    val storageLevel = status.storageLevel
+    val inMemSize = Math.max(status.memSize, droppedMemorySize)
+    val onDiskSize = status.diskSize
+    master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
   }
 
   /**
@@ -374,7 +368,7 @@ private[spark] class BlockManager(
     info.synchronized {
       info.level match {
         case null =>
-          BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
+          BlockStatus.empty
         case level =>
           val inMem = level.useMemory && memoryStore.contains(blockId)
           val onDisk = level.useDisk && diskStore.contains(blockId)
@@ -807,12 +801,10 @@ private[spark] class BlockManager(
         // Now that the block is in either the memory or disk store,
         // tell the master about it.
         info.size = size
-        if (tellMaster) {
-          reportBlockStatus(blockId, info, putBlockStatus)
-        }
-        Option(TaskContext.get()).foreach { c =>
-          c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
+        if (tellMaster && info.tellMaster) {
+          reportBlockStatus(blockId, putBlockStatus)
         }
+        addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
       }
       logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
       if (level.replication > 1) {
@@ -961,15 +953,12 @@ private[spark] class BlockManager(
       val putBlockStatus = getCurrentBlockStatus(blockId, info)
       val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
       if (blockWasSuccessfullyStored) {
-        // Now that the block is in either the memory, externalBlockStore, or disk store,
-        // tell the master about it.
+        // Now that the block is in either the memory or disk store, tell the master about it.
         info.size = size
-        if (tellMaster) {
-          reportBlockStatus(blockId, info, putBlockStatus)
-        }
-        Option(TaskContext.get()).foreach { c =>
-          c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
+        if (tellMaster && info.tellMaster) {
+          reportBlockStatus(blockId, putBlockStatus)
         }
+        addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
         logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
         if (level.replication > 1) {
           val remoteStartTime = System.currentTimeMillis
@@ -1271,12 +1260,10 @@ private[spark] class BlockManager(
 
     val status = getCurrentBlockStatus(blockId, info)
     if (info.tellMaster) {
-      reportBlockStatus(blockId, info, status, droppedMemorySize)
+      reportBlockStatus(blockId, status, droppedMemorySize)
     }
     if (blockIsUpdated) {
-      Option(TaskContext.get()).foreach { c =>
-        c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
-      }
+      addUpdatedBlockStatusToTaskMetrics(blockId, status)
     }
     status.storageLevel
   }
@@ -1316,21 +1303,31 @@ private[spark] class BlockManager(
         // The block has already been removed; do nothing.
         logWarning(s"Asked to remove block $blockId, which does not exist")
       case Some(info) =>
-        // Removals are idempotent in disk store and memory store. At worst, we get a warning.
-        val removedFromMemory = memoryStore.remove(blockId)
-        val removedFromDisk = diskStore.remove(blockId)
-        if (!removedFromMemory && !removedFromDisk) {
-          logWarning(s"Block $blockId could not be removed as it was not found in either " +
-            "the disk, memory, or external block store")
-        }
-        blockInfoManager.removeBlock(blockId)
-        val removeBlockStatus = getCurrentBlockStatus(blockId, info)
-        if (tellMaster && info.tellMaster) {
-          reportBlockStatus(blockId, info, removeBlockStatus)
-        }
-        Option(TaskContext.get()).foreach { c =>
-          c.taskMetrics().incUpdatedBlockStatuses(blockId -> removeBlockStatus)
-        }
+        removeBlockInternal(blockId, tellMaster = tellMaster && info.tellMaster)
+        addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
+    }
+  }
+
+  /**
+   * Internal version of [[removeBlock()]] which assumes that the caller already holds a write
+   * lock on the block.
+   */
+  private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = {
+    // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+    val removedFromMemory = memoryStore.remove(blockId)
+    val removedFromDisk = diskStore.remove(blockId)
+    if (!removedFromMemory && !removedFromDisk) {
+      logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
+    }
+    blockInfoManager.removeBlock(blockId)
+    if (tellMaster) {
+      reportBlockStatus(blockId, BlockStatus.empty)
+    }
+  }
+
+  private def addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status: BlockStatus): Unit = {
+    Option(TaskContext.get()).foreach { c =>
+      c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
     }
   }
 

