Posted to commits@spark.apache.org by sr...@apache.org on 2019/02/05 18:47:44 UTC

[spark] branch master updated: [SPARK-26768][CORE] Remove useless code in BlockManager

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 32ec528  [SPARK-26768][CORE] Remove useless code in BlockManager
32ec528 is described below

commit 32ec528e63cb768f85644282978040157c3c2fb7
Author: Liupengcheng <li...@xiaomi.com>
AuthorDate: Tue Feb 5 10:47:21 2019 -0800

    [SPARK-26768][CORE] Remove useless code in BlockManager
    
    ## What changes were proposed in this pull request?
    
    Recently, while reading `BlockManager.getBlockData`, I found some code that can never be reached. The relevant code is shown below:
    
    ```scala
    override def getBlockData(blockId: BlockId): ManagedBuffer = {
      if (blockId.isShuffle) {
        shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
      } else {
        getLocalBytes(blockId) match {
          case Some(blockData) =>
            new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, true)
          case None =>
            // If this block manager receives a request for a block that it doesn't have then it's
            // likely that the master has outdated block statuses for this block. Therefore, we send
            // an RPC so that this block is marked as being unavailable from this block manager.
            reportBlockStatus(blockId, BlockStatus.empty)
            throw new BlockNotFoundException(blockId.toString)
        }
      }
    }
    ```
    ```scala
    def getLocalBytes(blockId: BlockId): Option[BlockData] = {
      logDebug(s"Getting local block $blockId as bytes")
      // As an optimization for map output fetches, if the block is for a shuffle, return it
      // without acquiring a lock; the disk store never deletes (recent) items so this should work
      if (blockId.isShuffle) {
        val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
        // TODO: This should gracefully handle case where local block is not available. Currently
        // downstream code will throw an exception.
        val buf = new ChunkedByteBuffer(
          shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
        Some(new ByteBufferBlockData(buf, true))
      } else {
        blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
      }
    }
    ```
    The `blockId.isShuffle` check is performed twice: `getBlockData` already hands shuffle blocks to `shuffleBlockResolver` before it calls `getLocalBytes`, so the shuffle branch in `getLocalBytes` is unreachable from there. Moreover, in the call hierarchy of `BlockManager.getLocalBytes`, the other call site is `TorrentBroadcast.readBlocks`, where the `blockId` can never be a `ShuffleBlockId`.
    
    ![image](https://user-images.githubusercontent.com/6747355/51963980-1fe55000-24a0-11e9-961a-e10fe67f8119.png)
    
    So I think we should remove this unreachable code to make the method easier to read.
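    
    To make the reachability argument concrete, here is a minimal, self-contained Scala sketch of the call structure. `BlockId`, `ShuffleBlockId`, `BroadcastBlockId`, and `ToyBlockManager` are simplified, hypothetical stand-ins named after (but not identical to) Spark's classes; in-memory maps of byte arrays replace the real block store and shuffle resolver, and the hypothetical `readBroadcastPiece` stands in for the `TorrentBroadcast.readBlocks` call site. `getLocalBytes` is written in its post-patch shape, with the shuffle branch replaced by an assertion.
    
    ```scala
    // Hypothetical stand-ins for Spark's block IDs; not the real org.apache.spark.storage classes.
    sealed trait BlockId {
      def name: String
      // Simplified: this sketch has only one shuffle ID type.
      def isShuffle: Boolean = this.isInstanceOf[ShuffleBlockId]
    }
    
    final case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
      def name: String = s"shuffle_${shuffleId}_${mapId}_$reduceId"
    }
    
    final case class BroadcastBlockId(broadcastId: Long) extends BlockId {
      def name: String = s"broadcast_$broadcastId"
    }
    
    // Hypothetical toy manager: in-memory maps replace the block store and shuffle resolver.
    class ToyBlockManager(
        localBlocks: Map[BlockId, Array[Byte]],
        shuffleFiles: Map[ShuffleBlockId, Array[Byte]]) {
    
      // Mirrors getBlockData: shuffle IDs are handled here and never reach getLocalBytes.
      def getBlockData(blockId: BlockId): Array[Byte] =
        if (blockId.isShuffle) {
          val id = blockId.asInstanceOf[ShuffleBlockId]
          shuffleFiles.getOrElse(id, throw new NoSuchElementException(id.name))
        } else {
          getLocalBytes(blockId).getOrElse(throw new NoSuchElementException(blockId.name))
        }
    
      // Hypothetical stand-in for the TorrentBroadcast.readBlocks caller: it only builds broadcast IDs.
      def readBroadcastPiece(id: BroadcastBlockId): Option[Array[Byte]] = getLocalBytes(id)
    
      // Post-patch shape: no shuffle branch; a caller that breaks the contract fails fast.
      def getLocalBytes(blockId: BlockId): Option[Array[Byte]] = {
        assert(!blockId.isShuffle, s"Unexpected ShuffleBlockId $blockId")
        localBlocks.get(blockId)
      }
    }
    ```
    
    In this sketch, both callers of `getLocalBytes` only ever pass non-shuffle IDs, so the assertion can fire only if a new caller passes a shuffle ID directly; Scala's `assert` then throws a `java.lang.AssertionError` whose message starts with `assertion failed:`.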
    
    ## How was this patch tested?
    
    NA
    
    Closes #23693 from liupc/Remove-useless-code-in-BlockManager.
    
    Authored-by: Liupengcheng <li...@xiaomi.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 .../main/scala/org/apache/spark/storage/BlockManager.scala | 14 ++------------
 1 file changed, 2 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e02870cd..497a5f7 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -632,18 +632,8 @@ private[spark] class BlockManager(
    */
   def getLocalBytes(blockId: BlockId): Option[BlockData] = {
     logDebug(s"Getting local block $blockId as bytes")
-    // As an optimization for map output fetches, if the block is for a shuffle, return it
-    // without acquiring a lock; the disk store never deletes (recent) items so this should work
-    if (blockId.isShuffle) {
-      val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
-      // TODO: This should gracefully handle case where local block is not available. Currently
-      // downstream code will throw an exception.
-      val buf = new ChunkedByteBuffer(
-        shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
-      Some(new ByteBufferBlockData(buf, true))
-    } else {
-      blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
-    }
+    assert(!blockId.isShuffle, s"Unexpected ShuffleBlockId $blockId")
+    blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
   }
 
   /**

