Posted to issues@spark.apache.org by "Mridul Muralidharan (Jira)" <ji...@apache.org> on 2022/01/29 03:17:00 UTC

[jira] [Resolved] (SPARK-37675) Push-based merge finalization bugs in the RemoteBlockPushResolver

     [ https://issues.apache.org/jira/browse/SPARK-37675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mridul Muralidharan resolved SPARK-37675.
-----------------------------------------
    Fix Version/s: 3.3.0
                   3.2.2
       Resolution: Fixed

Issue resolved by pull request 35325
[https://github.com/apache/spark/pull/35325]

> Push-based merge finalization bugs in the RemoteBlockPushResolver
> -----------------------------------------------------------------
>
>                 Key: SPARK-37675
>                 URL: https://issues.apache.org/jira/browse/SPARK-37675
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Shuffle
>    Affects Versions: 3.2.0
>            Reporter: Cheng Pan
>            Assignee: Chandni Singh
>            Priority: Major
>             Fix For: 3.3.0, 3.2.2
>
>
> We identified 3 issues in the handling of merge finalization requests in the RemoteBlockPushResolver:
> 1. Empty merge data
> If the shuffle is finalized while a reducer partition is still receiving its first block, the merger truncates that partition's files to the last good position when it finalizes the partition. That position is 0 in this case, so the files end up with no data.
> Even though no data exists for the reducer, we still add it to the result (merged reducerIds).
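A minimal sketch of the check described in issue 1, assuming a per-reducer "last good position" is tracked as stated above. The class and method names (EmptyMergeSketch, PartitionState, finalizedReducerIds) are hypothetical and are not Spark's API; this only illustrates skipping reducers whose merged file is empty, not how the pull request implements it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: names are illustrative, not Spark's actual classes.
final class EmptyMergeSketch {

  /** Per-reducer merge state: the offset up to which complete blocks were written. */
  static final class PartitionState {
    final long lastGoodPosition;

    PartitionState(long lastGoodPosition) {
      this.lastGoodPosition = lastGoodPosition;
    }
  }

  /**
   * Returns the reducer ids to report as merged. A partition whose file was
   * truncated back to position 0 holds no data, so it is left out of the result.
   */
  static List<Integer> finalizedReducerIds(Map<Integer, PartitionState> partitions) {
    List<Integer> merged = new ArrayList<>();
    for (Map.Entry<Integer, PartitionState> entry : partitions.entrySet()) {
      if (entry.getValue().lastGoodPosition > 0) {
        merged.add(entry.getKey());
      }
    }
    return merged;
  }
}
```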
> 2. Overwriting of the merged data file of a reduce partition after it is finalized
> This is a more involved issue that requires a specific sequence of events. It starts with how our check for a {{too late block}} is done [here|https://github.com/apache/spark/blob/50758ab1a3d6a5f73a2419149a1420d103930f77/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java#L180].
> The example below gives more details, but in a nutshell we have the following for a DETERMINATE shuffle:
>  # Merge starts, blocks are accepted.
>  # Merge is finalized.
>  ** Files closed, status reported to driver, appShuffleInfo.shuffles cleaned up.
>  # Late block push from an executor received.
>  ** The request is for a reducer for which the merger had never received any data until then, so there are no on-disk files.
>  ** Our check does not catch this case - we end up (re-) starting merge.
>  # Executor could now push blocks for reducers which were finalized earlier.
>  ** Files are truncated.
>  # Reads will see inconsistent state due to the ongoing writes.
> To illustrate with an example for a DETERMINATE shuffle with shuffleId 1, shuffleMergeId 0, and reduce partitions 100 and 200:
>  # shufflePush_1_0_0_100 is received by the RemoteBlockPushResolver.
>  ## No meta information existed for shuffle 1 so shuffle service creates AppShuffleMergePartitionsInfo for shuffle 1 and shuffleMerge 0 to start merge.
>  ## Merge starts with RemoteBlockPushResolver and it creates the data file for the merger request shuffleMerged_$APP_ID_1_0_100.data (along with index/meta files)
>  # FinalizeShuffleMerge message for shuffleId 1 and shuffleMergeId 0 is received by RemoteBlockPushResolver. In a thread-safe manner:
>  ## AppShuffleMergePartitionsInfo for shuffle 1 is removed from the map in memory.
>  ## shuffleMerged_$APP_ID_1_0_100.data/index/meta files are closed.
>  ## Driver is informed that partition 100 of shuffleId 1/mergeId 0 was merged.
>  # shufflePush_1_0_0_200 is received by the RemoteBlockPushResolver.
>  ## A new AppShuffleMergePartitionsInfo is added since:
>  ### There is no AppShuffleMergePartitionsInfo for shuffle 1/mergeId 0 - as it was removed during finalization, and
>  ### The merger had never received data for partition 200 until then.
>  ## With this, shuffleMerged…200.data is created, and on that merger, merge for shuffleId 1/mergeId 0 starts again.
>  # shufflePush_1_0_5_100 is received by the RemoteBlockPushResolver. We randomize the order of pushes, so late pushes from an executor can end up pushing reducer 200 followed by data for reducer 100.
>  ## The AppShuffleMergePartitionsInfo created for shuffle 1 and shuffleMergeId 0 in step 3.1 doesn't have reduce id 100, so the data/index/meta files for this partition will be recreated. Reference code.
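The sequence above hinges on finalization erasing all memory of the merge. A minimal sketch of one way to close that gap, assuming the service keeps a record of the finalized shuffleMergeId per shuffle instead of dropping the state entirely. LateBlockSketch and its methods are hypothetical names, not Spark's API, and this is not necessarily how pull request 35325 does it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: names are illustrative, not Spark's actual classes.
final class LateBlockSketch {
  // shuffleId -> highest shuffleMergeId finalized so far; kept after finalization.
  private final Map<Integer, Integer> finalizedMergeIds = new HashMap<>();
  // shuffleId -> reducer ids that currently have open merge files.
  private final Map<Integer, Set<Integer>> activePartitions = new HashMap<>();

  /** Returns false for a push that arrives after its merge was finalized. */
  synchronized boolean acceptPush(int shuffleId, int shuffleMergeId, int reduceId) {
    Integer finalized = finalizedMergeIds.get(shuffleId);
    if (finalized != null && finalized >= shuffleMergeId) {
      // Too late: reject even if this reducer never had on-disk files.
      return false;
    }
    activePartitions.computeIfAbsent(shuffleId, k -> new HashSet<>()).add(reduceId);
    return true;
  }

  /** Records the finalization and returns the reducer ids that were merged. */
  synchronized Set<Integer> finalizeMerge(int shuffleId, int shuffleMergeId) {
    finalizedMergeIds.merge(shuffleId, shuffleMergeId, Math::max);
    Set<Integer> merged = activePartitions.remove(shuffleId);
    return merged == null ? new HashSet<>() : merged;
  }
}
```

With this record in place, the late shufflePush_1_0_0_200 from the example would be rejected rather than restarting the merge, so the later push for reducer 100 could not recreate its files.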
> 3. Throwing exception in the finalization of a shuffle for which the shuffle server didn't receive any blocks.
> For very small stages and with low minCompletedPushRatio/minShuffleSizeToWait, the driver can initiate the finalization of a shuffle right away. The shuffle server may not have received any pushed blocks, so there will not be an {{AppShuffleMergePartitionsInfo}} instance corresponding to the shuffle in the state. In this case, we should mark the shuffle as finalized and return empty results.
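A minimal sketch of the behaviour proposed for issue 3: finalizing a shuffle with no recorded merge state marks it finalized and returns an empty result instead of throwing. EmptyFinalizeSketch and its methods are hypothetical names used only for illustration, not Spark's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: names are illustrative, not Spark's actual classes.
final class EmptyFinalizeSketch {
  // shuffleId -> reducer ids for which blocks were received.
  private final Map<Integer, List<Integer>> mergeState = new HashMap<>();
  private final Set<Integer> finalizedShuffles = new HashSet<>();

  synchronized void recordPush(int shuffleId, int reduceId) {
    mergeState.computeIfAbsent(shuffleId, k -> new ArrayList<>()).add(reduceId);
  }

  /** Finalizes the shuffle; absent state yields an empty result, not an exception. */
  synchronized List<Integer> finalizeShuffle(int shuffleId) {
    finalizedShuffles.add(shuffleId);
    List<Integer> merged = mergeState.remove(shuffleId);
    return merged == null ? Collections.<Integer>emptyList() : merged;
  }

  synchronized boolean isFinalized(int shuffleId) {
    return finalizedShuffles.contains(shuffleId);
  }
}
```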



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
