Posted to issues@spark.apache.org by "liupengcheng (JIRA)" <ji...@apache.org> on 2019/01/29 11:28:00 UTC
[jira] [Updated] (SPARK-26768) Remove useless code in BlockManager
[ https://issues.apache.org/jira/browse/SPARK-26768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
liupengcheng updated SPARK-26768:
---------------------------------
Description:
Recently, while reading `BlockManager.getBlockData`, I found code that can never be reached. The relevant code is below:
{code:java}
override def getBlockData(blockId: BlockId): ManagedBuffer = {
  if (blockId.isShuffle) {
    shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
  } else {
    getLocalBytes(blockId) match {
      case Some(blockData) =>
        new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, true)
      case None =>
        // If this block manager receives a request for a block that it doesn't have then it's
        // likely that the master has outdated block statuses for this block. Therefore, we send
        // an RPC so that this block is marked as being unavailable from this block manager.
        reportBlockStatus(blockId, BlockStatus.empty)
        throw new BlockNotFoundException(blockId.toString)
    }
  }
}
{code}
{code:java}
def getLocalBytes(blockId: BlockId): Option[BlockData] = {
  logDebug(s"Getting local block $blockId as bytes")
  // As an optimization for map output fetches, if the block is for a shuffle, return it
  // without acquiring a lock; the disk store never deletes (recent) items so this should work
  if (blockId.isShuffle) {
    val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
    // TODO: This should gracefully handle case where local block is not available. Currently
    // downstream code will throw an exception.
    val buf = new ChunkedByteBuffer(
      shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
    Some(new ByteBufferBlockData(buf, true))
  } else {
    blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
  }
}
{code}
`blockId.isShuffle` is checked twice: `getBlockData` already handles shuffle blocks before it calls `getLocalBytes`, so the shuffle branch inside `getLocalBytes` is never reached from that path. In the call hierarchy of `BlockManager.getLocalBytes`, the only other call site appears to be `TorrentBroadcast.readBlocks`, where the blockId can never be a `ShuffleBlockId`.
!Selection_037.jpg!
So I think we should remove this unreachable code to make the method easier to read.
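To make the proposal concrete, here is a minimal, self-contained sketch of the cleanup. It is not the actual Spark code: `MiniBlockManager`, `BroadcastBlockId` as defined here, and the two in-memory maps are hypothetical stand-ins for `BlockManager`, the shuffle block resolver, and the locked local block store. The `require` precondition is only one possible way to document the invariant and is an assumption, not something taken from an existing patch.

```scala
// Simplified stand-ins for Spark's block id types (hypothetical, not the real classes).
sealed trait BlockId { def isShuffle: Boolean }
final case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  val isShuffle = true
}
final case class BroadcastBlockId(broadcastId: Long) extends BlockId {
  val isShuffle = false
}

final class BlockNotFoundException(id: String) extends Exception(s"Block $id not found")

// Hypothetical miniature of BlockManager that keeps block bytes in plain maps.
final class MiniBlockManager(
    shuffleStore: Map[ShuffleBlockId, Array[Byte]],
    localStore: Map[BlockId, Array[Byte]]) {

  // Shuffle blocks are routed to the shuffle store first, so the second case
  // only ever passes non-shuffle ids on to getLocalBytes.
  def getBlockData(blockId: BlockId): Array[Byte] = blockId match {
    case s: ShuffleBlockId =>
      shuffleStore.getOrElse(s, throw new BlockNotFoundException(s.toString))
    case other =>
      getLocalBytes(other).getOrElse(throw new BlockNotFoundException(other.toString))
  }

  // Simplified getLocalBytes: the shuffle branch is gone, and the precondition
  // makes the "never a shuffle block" invariant explicit.
  def getLocalBytes(blockId: BlockId): Option[Array[Byte]] = {
    require(!blockId.isShuffle, s"Unexpected shuffle block $blockId")
    localStore.get(blockId)
  }
}
```

With this shape, a shuffle id passed to `getBlockData` is still served from the shuffle store, while a shuffle id passed directly to `getLocalBytes` fails fast instead of silently taking a path that no caller uses.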
> Remove useless code in BlockManager
> -----------------------------------
>
> Key: SPARK-26768
> URL: https://issues.apache.org/jira/browse/SPARK-26768
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.4.0
> Reporter: liupengcheng
> Priority: Major
> Attachments: Selection_037.jpg
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)