You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2019/10/08 05:42:19 UTC

[jira] [Resolved] (SPARK-24293) Serialized shuffle supports mapSideCombine

     [ https://issues.apache.org/jira/browse/SPARK-24293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-24293.
----------------------------------
    Resolution: Incomplete

> Serialized shuffle supports mapSideCombine
> ------------------------------------------
>
>                 Key: SPARK-24293
>                 URL: https://issues.apache.org/jira/browse/SPARK-24293
>             Project: Spark
>          Issue Type: Brainstorming
>          Components: Shuffle
>    Affects Versions: 2.3.0
>            Reporter: Xianjin YE
>            Priority: Major
>              Labels: bulk-closed
>
> While doing research on integrating my company's internal Shuffle Service with Spark, I found it is possible to support mapSideCombine with serialized shuffle.
> The simple idea is that the `UnsafeShuffleWriter` uses a `Combiner` to accumulate records when mapSideCombine is required before inserting into `ShuffleExternalSorter`. The `Combiner` will tracking it's memory usage or elements accumulated and is never spilled. When the `Combiner` accumulates enough records(varied by different strategies), the accumulated (K, C) pairs are then inserted into the `ShuffleExternalSorter`.  After that, the `Combiner` is reset to empty state.
> After this change, combinedValues are sent to sorter segment by segment, and the `BlockStoreShuffleReader` already handles this case.
> I did a local POC, and looks like that I can get the same result with normal SortShuffle. The performance is not optimized yet. The most significant part of code is shown as below: 
> {code:java}
> // code placeholder
> while (records.hasNext()) {
>   Product2<K, V> record = records.next();
>   if (this.mapSideCombine) {
>     this.aggregator.accumulateRecord(record);
>     if (this.aggregator.accumulatedKeyNum() >= 160_000) { // for poc
>       scala.collection.Iterator<Tuple2<K, C>> combinedIterator = this.aggregator.accumulatedIterator();
>       while (combinedIterator.hasNext()) {
>         insertRecordIntoSorter(combinedIterator.next());
>       }
>       this.aggregator.resetAccumulation();
>     }
>   } else {
>     insertRecordIntoSorter(record);
>   }
> }
> if (this.mapSideCombine && this.aggregator.accumulatedKeyNum() > 0) {
>   scala.collection.Iterator<Tuple2<K, C>> combinedIterator = this.aggregator.accumulatedIterator();
>   while (combinedIterator.hasNext()) {
>     insertRecordIntoSorter(combinedIterator.next());
>   }
>   this.aggregator.resetAccumulation(1);
> }
> {code}
>  
>  Is there something I am missing? cc [~joshrosen] [~cloud_fan] [~XuanYuan]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org