Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2014/09/16 18:21:33 UTC

[jira] [Commented] (SPARK-3551) Remove redundant putting FetchResult which means Fetch Fail when Remote fetching

    [ https://issues.apache.org/jira/browse/SPARK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14135666#comment-14135666 ] 

Apache Spark commented on SPARK-3551:
-------------------------------------

User 'sarutak' has created a pull request for this issue:
https://github.com/apache/spark/pull/2413

> Remove redundant putting FetchResult which means Fetch Fail when Remote fetching
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-3551
>                 URL: https://issues.apache.org/jira/browse/SPARK-3551
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>    Affects Versions: 1.2.0
>            Reporter: Kousuke Saruta
>            Priority: Minor
>
> In ShuffleBlockFetcherIterator#fetchLocalBlocks, when a local fetch fails, only the first failure is put into results (a LinkedBlockingQueue).
> {code}
>   private[this] def fetchLocalBlocks() {
>     // Get the local blocks while remote blocks are being fetched. Note that it's okay to do
>     // these all at once because they will just memory-map some files, so they won't consume
>     // any memory that might exceed our maxBytesInFlight
>     for (id <- localBlocks) {
>       try {
>         shuffleMetrics.localBlocksFetched += 1
>         results.put(new FetchResult(
>           id, 0, () => blockManager.getLocalShuffleFromDisk(id, serializer).get))
>         logDebug("Got local block " + id)
>       } catch {
>         case e: Exception =>
>           logError(s"Error occurred while fetching local blocks", e)
>           results.put(new FetchResult(id, -1, null))
>           return // Fail fast: only the first failure is reported.
>       }
>     }
>   }
> {code}
> But in ShuffleBlockFetcherIterator#sendRequest, every failed result is put into results.
> {code}
>         override def onBlockFetchFailure(e: Throwable): Unit = {
>           logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
>           // Note that there is a chance that some blocks have been fetched successfully, but we
>           // still add them to the failed queue. This is fine because when the caller sees a
>           // FetchFailedException, it is going to fail the entire task anyway.
>           for ((blockId, size) <- req.blocks) {
>             results.put(new FetchResult(blockId, -1, null))
>           }
>         }
> {code}
> I think putting all of the failed results is redundant, because BlockStoreShuffleFetcher#unpackBlock throws FetchFailedException as soon as it meets the first failed result.
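
The argument above can be illustrated with a small standalone sketch. It is not the Spark code and not the change made in the pull request: SketchFetchResult, SketchFetchFailed, and FailFastSketch are hypothetical stand-ins for FetchResult, FetchFailedException, and the fetcher, and the only assumption carried over from the description is that the consumer throws at the first result whose size is -1.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-in for FetchResult: size == -1 marks a failed fetch.
final case class SketchFetchResult(blockId: String, size: Long) {
  def failed: Boolean = size == -1
}

// Hypothetical stand-in for FetchFailedException.
final class SketchFetchFailed(blockId: String)
  extends RuntimeException(s"Fetch failed for $blockId")

object FailFastSketch {
  // Producer side: for a failed request, enqueue a single failure marker
  // (for the first block only) instead of one marker per block.
  def reportFailure(
      results: LinkedBlockingQueue[SketchFetchResult],
      blockIds: Seq[String]): Unit =
    blockIds.headOption.foreach(id => results.put(SketchFetchResult(id, -1)))

  // Consumer side: take up to `expected` results in order and throw at the
  // first failure marker. Returns the number of results consumed on success.
  def consume(
      results: LinkedBlockingQueue[SketchFetchResult],
      expected: Int): Int = {
    var taken = 0
    while (taken < expected) {
      val r = results.take()
      taken += 1
      if (r.failed) throw new SketchFetchFailed(r.blockId)
    }
    taken
  }
}
```

If three failure markers are queued and consume is called, it throws after taking the first one and the other two stay in the queue unread. Under the assumption stated above, queuing one marker through reportFailure gives the consumer the same outcome.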
