Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2022/05/28 06:57:00 UTC

[jira] [Commented] (SPARK-39325) Improve MapOutputTracker convertMapStatuses performance

    [ https://issues.apache.org/jira/browse/SPARK-39325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543424#comment-17543424 ] 

Apache Spark commented on SPARK-39325:
--------------------------------------

User 'wankunde' has created a pull request for this issue:
https://github.com/apache/spark/pull/36709

> Improve MapOutputTracker convertMapStatuses performance
> -------------------------------------------------------
>
>                 Key: SPARK-39325
>                 URL: https://issues.apache.org/jira/browse/SPARK-39325
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.4.0
>            Reporter: Wan Kun
>            Priority: Major
>
> How to reproduce this issue:
> {code:scala}
> val benchmark = new Benchmark("MapStatuses Convert", 1, output = output)
> val blockManagerNumber = 1000
> val mapNumber = 50000
> val shufflePartitions = 10000
> val shuffleId: Int = 0
> // First reduce task will fetch map data from startPartition to endPartition
> val startPartition = 0
> val startMapIndex = 0
> val endMapIndex = mapNumber
> val blockManagers = Array.tabulate(blockManagerNumber) { i =>
>   BlockManagerId("a", "host" + i, 7337)
> }
> val mapStatuses: Array[MapStatus] = Array.tabulate(mapNumber) { mapTaskId =>
>   HighlyCompressedMapStatus(
>     blockManagers(mapTaskId % blockManagerNumber),
>     Array.tabulate(shufflePartitions)(i => if (i % 50 == 0) 1 else 0),
>     mapTaskId)
> }
> val bitmap = new RoaringBitmap()
> Range(0, 4000).foreach(bitmap.add(_))
> val mergeStatuses = Array.tabulate(shufflePartitions) { part =>
>   MergeStatus(blockManagers(part % blockManagerNumber), shuffleId, bitmap, 100)
> }
> Array(499, 999, 1499).foreach { endPartition =>
>   benchmark.addCase(
>     s"Num Maps: $mapNumber Fetch partitions:${endPartition - startPartition + 1}",
>     numIters) { _ =>
>     MapOutputTracker.convertMapStatuses(
>       shuffleId,
>       startPartition,
>       endPartition,
>       mapStatuses,
>       startMapIndex,
>       endMapIndex,
>       Some(mergeStatuses))
>   }
> }
> benchmark.run()
> {code}
> Benchmark result
> {code:java}
> ================================================================================================
> MapStatuses Convert Benchmark
> ================================================================================================
> Java HotSpot(TM) 64-Bit Server VM 1.8.0_281-b09 on Mac OS X 10.15.7
> Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
> MapStatuses Convert:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
> ------------------------------------------------------------------------------------------------------------------------
> Num Maps: 50000 Fetch partitions:500               3393           3483          96          0.0  3393439257.0       1.0X
> Num Maps: 50000 Fetch partitions:1000              6640           6772         121          0.0  6639654832.0       0.5X
> Num Maps: 50000 Fetch partitions:1500             10035          10143         108          0.0 10035100069.0       0.3X
> {code}
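
The benchmark result above can be read as a cost per fetched partition. The sketch below is only back-of-the-envelope arithmetic on the reported best times; it does not call Spark. The object and method names (ConvertCostEstimate, msPerPartition, nsPerPair) are hypothetical, and the per-pair figure assumes the conversion visits every (map, partition) pair once, which the issue does not state explicitly.

```scala
// Back-of-the-envelope arithmetic on the reported best times (no Spark dependency).
object ConvertCostEstimate {
  // Number of map tasks used in the reproduction (mapNumber).
  val numMaps = 50000L

  // (fetched partitions, best time in ms), copied from the benchmark result.
  val bestTimesMs: Seq[(Long, Long)] =
    Seq(500L -> 3393L, 1000L -> 6640L, 1500L -> 10035L)

  // Milliseconds spent per fetched partition.
  def msPerPartition(partitions: Long, bestMs: Long): Double =
    bestMs.toDouble / partitions

  // Nanoseconds per (map, partition) pair.
  // ASSUMPTION: every pair is visited exactly once during conversion.
  def nsPerPair(partitions: Long, bestMs: Long): Double =
    bestMs.toDouble * 1e6 / (numMaps * partitions)

  def main(args: Array[String]): Unit =
    bestTimesMs.foreach { case (p, ms) =>
      println(f"partitions=$p%5d  ms/partition=${msPerPartition(p, ms)}%.2f  ns/pair=${nsPerPair(p, ms)}%.1f")
    }
}
```

For all three cases the cost comes out at roughly 6.6 to 6.8 ms per fetched partition (about 133 to 136 ns per pair under the stated assumption), so the reported time grows close to linearly with the number of partitions fetched.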



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
