Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2019/10/08 05:42:19 UTC
[jira] [Resolved] (SPARK-24293) Serialized shuffle supports mapSideCombine
[ https://issues.apache.org/jira/browse/SPARK-24293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-24293.
----------------------------------
Resolution: Incomplete
> Serialized shuffle supports mapSideCombine
> ------------------------------------------
>
> Key: SPARK-24293
> URL: https://issues.apache.org/jira/browse/SPARK-24293
> Project: Spark
> Issue Type: Brainstorming
> Components: Shuffle
> Affects Versions: 2.3.0
> Reporter: Xianjin YE
> Priority: Major
> Labels: bulk-closed
>
> While doing research on integrating my company's internal Shuffle Service with Spark, I found it is possible to support mapSideCombine with serialized shuffle.
> The simple idea is that, when mapSideCombine is required, the `UnsafeShuffleWriter` uses a `Combiner` to accumulate records before inserting them into the `ShuffleExternalSorter`. The `Combiner` tracks its memory usage or the number of elements accumulated and is never spilled. When the `Combiner` has accumulated enough records (the threshold can vary by strategy), the accumulated (K, C) pairs are inserted into the `ShuffleExternalSorter`, and the `Combiner` is then reset to an empty state.
> After this change, combinedValues are sent to sorter segment by segment, and the `BlockStoreShuffleReader` already handles this case.
> I did a local POC, and it looks like I get the same result as the normal SortShuffle. The performance is not optimized yet. The most significant part of the code is shown below:
> {code:java}
> while (records.hasNext()) {
>   Product2<K, V> record = records.next();
>   if (this.mapSideCombine) {
>     this.aggregator.accumulateRecord(record);
>     if (this.aggregator.accumulatedKeyNum() >= 160_000) { // threshold chosen for the POC
>       // Flush the accumulated (K, C) pairs into the sorter, then reset.
>       scala.collection.Iterator<Tuple2<K, C>> combinedIterator = this.aggregator.accumulatedIterator();
>       while (combinedIterator.hasNext()) {
>         insertRecordIntoSorter(combinedIterator.next());
>       }
>       this.aggregator.resetAccumulation();
>     }
>   } else {
>     insertRecordIntoSorter(record);
>   }
> }
> // Flush any remaining accumulated pairs after the input is exhausted.
> if (this.mapSideCombine && this.aggregator.accumulatedKeyNum() > 0) {
>   scala.collection.Iterator<Tuple2<K, C>> combinedIterator = this.aggregator.accumulatedIterator();
>   while (combinedIterator.hasNext()) {
>     insertRecordIntoSorter(combinedIterator.next());
>   }
>   this.aggregator.resetAccumulation();
> }
> {code}
>
> Is there something I am missing? cc [~joshrosen] [~cloud_fan] [~XuanYuan]
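The accumulate-and-flush `Combiner` the issue describes could be sketched as the following standalone Java class. This is a minimal illustration, not the POC's actual code: the class name, the `createCombiner`/`mergeValue` functions, and the method names are assumptions modeled on the snippet above, and it uses a plain `java.util.HashMap` rather than Spark's memory-tracked data structures.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Illustrative map-side combiner: accumulates (K, V) records into (K, C)
// pairs in memory. It is never spilled; the caller flushes it into the
// sorter once accumulatedKeyNum() crosses a threshold, then resets it.
final class AccumulatingCombiner<K, V, C> {
    private final Function<V, C> createCombiner;   // builds C from the first V for a key
    private final BiFunction<C, V, C> mergeValue;  // folds subsequent Vs into the existing C
    private final Map<K, C> accumulated = new HashMap<>();

    AccumulatingCombiner(Function<V, C> createCombiner, BiFunction<C, V, C> mergeValue) {
        this.createCombiner = createCombiner;
        this.mergeValue = mergeValue;
    }

    void accumulateRecord(K key, V value) {
        // If the key is new, install createCombiner(value); otherwise merge
        // the raw value into the existing combined value.
        accumulated.merge(key, createCombiner.apply(value),
            (existing, ignored) -> mergeValue.apply(existing, value));
    }

    int accumulatedKeyNum() {
        return accumulated.size();
    }

    Iterator<Map.Entry<K, C>> accumulatedIterator() {
        return accumulated.entrySet().iterator();
    }

    void resetAccumulation() {
        accumulated.clear();
    }
}
```

For a word-count-style aggregation, `createCombiner` would be the identity and `mergeValue` a sum, so three records ("a", 1), ("a", 2), ("b", 5) accumulate to two keys before the flush.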
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org