Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2019/04/25 11:48:00 UTC

[jira] [Assigned] (SPARK-27562) Complete the verification mechanism for shuffle transmitted data

     [ https://issues.apache.org/jira/browse/SPARK-27562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-27562:
------------------------------------

    Assignee: Apache Spark

> Complete the verification mechanism for shuffle transmitted data
> ----------------------------------------------------------------
>
>                 Key: SPARK-27562
>                 URL: https://issues.apache.org/jira/browse/SPARK-27562
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle
>    Affects Versions: 2.4.0
>            Reporter: feiwang
>            Assignee: Apache Spark
>            Priority: Major
>
> We've seen some shuffle data corruption during the shuffle read phase.
> As described in SPARK-26089, Spark only checked small shuffle blocks before PR #23453, which was proposed by ankuriitg.
> PR #23453 made two changes/improvements:
> 1. Large blocks are checked up to maxBytesInFlight/3 bytes in a similar way to smaller blocks, so if a
> large block is corrupt at the beginning, that block will be re-fetched, and if that also fails, a
> FetchFailedException will be thrown.
> 2. If a large block is corrupt after the first maxBytesInFlight/3 bytes, then any IOException thrown while
> reading the stream will be converted to a FetchFailedException. This is slightly more aggressive
> than originally intended, but since the consumer of the stream may have already read some records and processed them, we can't just re-fetch the block; we need to fail the whole task. Additionally, we also considered adding a new type of TaskEndReason, which would retry the task a couple of times before failing the previous stage, but given the complexity involved in that solution we decided not to proceed in that direction.
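> As a rough illustration of the prefix check in change 1 (a hypothetical sketch, not the actual ShuffleBlockFetcherIterator code; the method name checkUpTo and the 8 KB chunk size are illustrative):
> {code:scala}
> import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, SequenceInputStream}
>
> // Eagerly read up to `limit` bytes of the (possibly decompressing) block
> // stream. Corruption in this prefix makes in.read throw an IOException
> // here, while the block can still be re-fetched because no records have
> // been handed to the consumer yet.
> def checkUpTo(in: InputStream, limit: Long): InputStream = {
>   val prefix = new ByteArrayOutputStream()
>   val chunk = new Array[Byte](8192)
>   var total = 0L
>   var done = false
>   while (!done && total < limit) {
>     val n = in.read(chunk)  // decompression errors in the prefix surface here
>     if (n == -1) done = true
>     else {
>       prefix.write(chunk, 0, n)
>       total += n
>     }
>   }
>   // Re-attach the verified prefix in front of the remainder of the stream.
>   new SequenceInputStream(new ByteArrayInputStream(prefix.toByteArray), in)
> }
> {code}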
> However, I think there still exist some problems with the current verification mechanism for shuffle transmitted data:
> - A large block is only checked up to maxBytesInFlight/3 bytes when fetching shuffle data, so if it is corrupt after the first maxBytesInFlight/3 bytes, the corruption cannot be detected in the data fetch phase. This has been described in the previous section.
> - Only the compressed or wrapped blocks are checked; I think we should also check the blocks which are not wrapped.
> We complete the verification mechanism for shuffle transmitted data as follows.
> Firstly, we choose CRC32 for the checksum verification of shuffle data.
> CRC is also used for checksum verification in Hadoop; it is simple and fast.
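> For illustration, a minimal sketch of the per-partition checksum using the JDK's java.util.zip.CRC32 (the method name crc32Of is illustrative):
> {code:scala}
> import java.util.zip.CRC32
>
> // CRC32 over one partition's bytes, via the JDK implementation
> // (the same CRC family Hadoop uses for its block checksums).
> def crc32Of(bytes: Array[Byte]): Long = {
>   val crc = new CRC32()
>   crc.update(bytes, 0, bytes.length)
>   crc.getValue  // the 32-bit checksum, returned as an unsigned Long
> }
> {code}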
> In the shuffle write phase, after completing the partitioned file, we compute
> the CRC32 value for each partition and then write these digests, together with the offsets, into the shuffle index file.
> For SortShuffleWriter and UnsafeShuffleWriter, there is only one partitioned file per ShuffleMapTask, so the computation of digests (one per partition, based on the offsets of this partitioned file) is cheap.
> For BypassMergeSortShuffleWriter, the number of reduce partitions is smaller than the bypass-merge threshold, so the cost of digest computation is acceptable.
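> A hypothetical sketch of the extended index file layout (the existing offset section mirrors IndexShuffleBlockResolver; the appended digest section and the byte-array input are illustrative simplifications):
> {code:scala}
> import java.io.{DataOutputStream, FileOutputStream}
> import java.util.zip.CRC32
>
> // Keep the existing (numPartitions + 1) offset longs, then append one
> // CRC32 long per partition. For brevity the partitioned data file is
> // passed in as a byte array.
> def writeIndexWithDigests(
>     indexPath: String,
>     partitionLengths: Array[Long],
>     data: Array[Byte]): Unit = {
>   val out = new DataOutputStream(new FileOutputStream(indexPath))
>   try {
>     var offset = 0L
>     out.writeLong(offset)
>     for (len <- partitionLengths) {  // existing offset section
>       offset += len
>       out.writeLong(offset)
>     }
>     var pos = 0
>     for (len <- partitionLengths) {  // proposed digest section
>       val crc = new CRC32()
>       crc.update(data, pos, len.toInt)
>       out.writeLong(crc.getValue)
>       pos += len.toInt
>     }
>   } finally {
>     out.close()
>   }
> }
> {code}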
> In the shuffle read phase, the digest value will be passed along with the block data,
> and we will recompute the digest of the fetched data to compare it with the original digest value.
> Recomputing the digest of the fetched data only needs an additional buffer (2048 bytes) for computing the CRC32 value.
> After recomputing, we will reset the fetched data's input stream: if the stream is markSupported we only need to reset it; otherwise (the FileSegmentManagedBuffer case) we need to recreate it.
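> A hypothetical sketch of the read-side verification (the method name verifyDigest is illustrative; the caller is expected to have mark()ed the stream when mark is supported):
> {code:scala}
> import java.io.InputStream
> import java.util.zip.CRC32
>
> // Stream the fetched block through a small scratch buffer and compare
> // against the digest shipped alongside the block.
> def verifyDigest(in: InputStream, expected: Long): Boolean = {
>   val crc = new CRC32()
>   val buf = new Array[Byte](2048)  // the only extra memory this check needs
>   var n = in.read(buf)
>   while (n != -1) {
>     crc.update(buf, 0, n)
>     n = in.read(buf)
>   }
>   val matches = crc.getValue == expected
>   if (in.markSupported()) in.reset()  // rewind for the real consumer;
>   // a FileSegmentManagedBuffer-backed stream is recreated instead
>   matches
> }
> {code}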
> So this verification mechanism proposed for shuffle transmitted data is cheap and complete.


