You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yu...@apache.org on 2022/06/11 10:17:21 UTC
[spark] branch master updated: [SPARK-39325][CORE] Improve MapOutputTracker convertMapStatuses performance
This is an automated email from the ASF dual-hosted git repository.
yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b6aea1a8d99 [SPARK-39325][CORE] Improve MapOutputTracker convertMapStatuses performance
b6aea1a8d99 is described below
commit b6aea1a8d99b3d99e91f7f195b23169d3d61b6a7
Author: Kun Wan <wa...@apache.org>
AuthorDate: Sat Jun 11 18:16:43 2022 +0800
[SPARK-39325][CORE] Improve MapOutputTracker convertMapStatuses performance
### What changes were proposed in this pull request?
Optimize `MapOutputTracker.convertMapStatuses()` method.
### Why are the changes needed?
`MapOutputTracker.convertMapStatuses()` is very slow when there are tens of thousands of MapStatuses and MergeStatuses.
Benchmark code:
```scala
val benchmark = new Benchmark("MapStatuses Convert", 1, output = output)
val blockManagerNumber = 1000
val mapNumber = 50000
val shufflePartitions = 10000
val shuffleId: Int = 0
// First reduce task will fetch map data from startPartition to endPartition
val startPartition = 0
val startMapIndex = 0
val endMapIndex = mapNumber
val blockManagers = Array.tabulate(blockManagerNumber) { i =>
BlockManagerId("a", "host" + i, 7337)
}
val mapStatuses: Array[MapStatus] = Array.tabulate(mapNumber) { mapTaskId =>
HighlyCompressedMapStatus(
blockManagers(mapTaskId % blockManagerNumber),
Array.tabulate(shufflePartitions)(i => if (i % 50 == 0) 1 else 0),
mapTaskId)
}
val bitmap = new RoaringBitmap()
Range(0, 4000).foreach(bitmap.add(_))
val mergeStatuses = Array.tabulate(shufflePartitions) { part =>
MergeStatus(blockManagers(part % blockManagerNumber), shuffleId, bitmap, 100)
}
Array(499, 999, 1499).foreach { endPartition =>
benchmark.addCase(
s"Num Maps: $mapNumber Fetch partitions:${endPartition - startPartition + 1}",
numIters) { _ =>
MapOutputTracker.convertMapStatuses(
shuffleId,
startPartition,
endPartition,
mapStatuses,
startMapIndex,
endMapIndex,
Some(mergeStatuses))
}
}
```
Before this PR
```
================================================================================================
MapStatuses Convert Benchmark
================================================================================================
Java HotSpot(TM) 64-Bit Server VM 1.8.0_281-b09 on Mac OS X 10.15.7
Intel(R) Core(TM) i9-9980HK CPU 2.40GHz
MapStatuses Convert: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Num Maps: 50000 Fetch partitions:500 3393 3483 96 0.0 3393439257.0 1.0X
Num Maps: 50000 Fetch partitions:1000 6640 6772 121 0.0 6639654832.0 0.5X
Num Maps: 50000 Fetch partitions:1500 10035 10143 108 0.0 10035100069.0 0.3X
```
After this PR
```
================================================================================================
MapStatuses Convert Benchmark
================================================================================================
Java HotSpot(TM) 64-Bit Server VM 1.8.0_281-b09 on Mac OS X 10.15.7
Intel(R) Core(TM) i9-9980HK CPU 2.40GHz
MapStatuses Convert: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Num Maps: 50000 Fetch partitions:500 667 679 15 0.0 666562302.0 1.0X
Num Maps: 50000 Fetch partitions:1000 1285 1397 115 0.0 1284808865.0 0.5X
Num Maps: 50000 Fetch partitions:1500 2045 2068 32 0.0 2044951906.0 0.3X
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing UTs.
Closes #36709 from wankunde/convert_mapstatus.
Lead-authored-by: Kun Wan <wa...@apache.org>
Co-authored-by: wankun <wa...@163.com>
Signed-off-by: Yuming Wang <yu...@ebay.com>
---
.../MapStatusesConvertBenchmark-results.txt | 13 +++
.../scala/org/apache/spark/MapOutputTracker.scala | 50 ++++++------
.../org/apache/spark/scheduler/MergeStatus.scala | 9 ---
.../apache/spark/MapStatusesConvertBenchmark.scala | 92 ++++++++++++++++++++++
4 files changed, 130 insertions(+), 34 deletions(-)
diff --git a/core/benchmarks/MapStatusesConvertBenchmark-results.txt b/core/benchmarks/MapStatusesConvertBenchmark-results.txt
new file mode 100644
index 00000000000..f41401bbe2e
--- /dev/null
+++ b/core/benchmarks/MapStatusesConvertBenchmark-results.txt
@@ -0,0 +1,13 @@
+================================================================================================
+MapStatuses Convert Benchmark
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1025-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+MapStatuses Convert: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+Num Maps: 50000 Fetch partitions:500 1330 1359 26 0.0 1329827185.0 1.0X
+Num Maps: 50000 Fetch partitions:1000 2648 2666 20 0.0 2647944453.0 0.5X
+Num Maps: 50000 Fetch partitions:1500 4155 4436 383 0.0 4154563448.0 0.3X
+
+
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index e6ed469250b..79cae483b22 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -1596,7 +1596,7 @@ private[spark] object MapOutputTracker extends Logging {
mapStatuses: Array[MapStatus],
startMapIndex : Int,
endMapIndex: Int,
- mergeStatuses: Option[Array[MergeStatus]] = None): MapSizesByExecutorId = {
+ mergeStatusesOpt: Option[Array[MergeStatus]] = None): MapSizesByExecutorId = {
assert (mapStatuses != null)
val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]]
var enableBatchFetch = true
@@ -1608,39 +1608,39 @@ private[spark] object MapOutputTracker extends Logging {
// TODO: SPARK-35036: Instead of reading map blocks in case of AQE with Push based shuffle,
// TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
// TODO: map indexes
- if (mergeStatuses.exists(_.exists(_ != null)) && startMapIndex == 0
+ if (mergeStatusesOpt.exists(_.exists(_ != null)) && startMapIndex == 0
&& endMapIndex == mapStatuses.length) {
enableBatchFetch = false
logDebug(s"Disable shuffle batch fetch as Push based shuffle is enabled for $shuffleId.")
- // We have MergeStatus and full range of mapIds are requested so return a merged block.
- val numMaps = mapStatuses.length
- mergeStatuses.get.zipWithIndex.slice(startPartition, endPartition).foreach {
- case (mergeStatus, partId) =>
- val remainingMapStatuses = if (mergeStatus != null && mergeStatus.totalSize > 0) {
- // If MergeStatus is available for the given partition, add location of the
- // pre-merged shuffle partition for this partition ID. Here we create a
- // ShuffleMergedBlockId to indicate this is a merged shuffle block.
- splitsByAddress.getOrElseUpdate(mergeStatus.location, ListBuffer()) +=
- ((ShuffleMergedBlockId(shuffleId, mergeStatus.shuffleMergeId, partId),
- mergeStatus.totalSize, SHUFFLE_PUSH_MAP_ID))
- // For the "holes" in this pre-merged shuffle partition, i.e., unmerged mapper
- // shuffle partition blocks, fetch the original map produced shuffle partition blocks
- val mapStatusesWithIndex = mapStatuses.zipWithIndex
- mergeStatus.getMissingMaps(numMaps).map(mapStatusesWithIndex)
- } else {
- // If MergeStatus is not available for the given partition, fall back to
- // fetching all the original mapper shuffle partition blocks
- mapStatuses.zipWithIndex.toSeq
- }
- // Add location for the mapper shuffle partition blocks
- for ((mapStatus, mapIndex) <- remainingMapStatuses) {
- validateStatus(mapStatus, shuffleId, partId)
+ val mergeStatuses = mergeStatusesOpt.get
+ for (partId <- startPartition until endPartition) {
+ val mergeStatus = mergeStatuses(partId)
+ if (mergeStatus != null && mergeStatus.totalSize > 0) {
+ // If MergeStatus is available for the given partition, add location of the
+ // pre-merged shuffle partition for this partition ID. Here we create a
+ // ShuffleMergedBlockId to indicate this is a merged shuffle block.
+ splitsByAddress.getOrElseUpdate(mergeStatus.location, ListBuffer()) +=
+ ((ShuffleMergedBlockId(shuffleId, mergeStatus.shuffleMergeId, partId),
+ mergeStatus.totalSize, SHUFFLE_PUSH_MAP_ID))
+ }
+ }
+
+ // Add location for the mapper shuffle partition blocks
+ for ((mapStatus, mapIndex) <- mapStatuses.iterator.zipWithIndex) {
+ validateStatus(mapStatus, shuffleId, startPartition)
+ for (partId <- startPartition until endPartition) {
+ // For the "holes" in this pre-merged shuffle partition, i.e., unmerged mapper
+ // shuffle partition blocks, fetch the original map produced shuffle partition blocks
+ val mergeStatus = mergeStatuses(partId)
+ if (mergeStatus == null || mergeStatus.totalSize == 0 ||
+ !mergeStatus.tracker.contains(mapIndex)) {
val size = mapStatus.getSizeForBlock(partId)
if (size != 0) {
splitsByAddress.getOrElseUpdate(mapStatus.location, ListBuffer()) +=
((ShuffleBlockId(shuffleId, mapStatus.mapId, partId), size, mapIndex))
}
}
+ }
}
} else {
val iter = mapStatuses.iterator.zipWithIndex
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MergeStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MergeStatus.scala
index 6d160264538..850756b50a3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MergeStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MergeStatus.scala
@@ -58,15 +58,6 @@ private[spark] class MergeStatus(
def tracker: RoaringBitmap = mapTracker
- /**
- * Get the list of mapper IDs for missing mapper partition blocks that are not merged.
- * The reducer will use this information to decide which shuffle partition blocks to
- * fetch in the original way.
- */
- def getMissingMaps(numMaps: Int): Seq[Int] = {
- (0 until numMaps).filter(i => !mapTracker.contains(i))
- }
-
/**
* Get the number of missing map outputs for missing mapper partition blocks that are not merged.
*/
diff --git a/core/src/test/scala/org/apache/spark/MapStatusesConvertBenchmark.scala b/core/src/test/scala/org/apache/spark/MapStatusesConvertBenchmark.scala
new file mode 100644
index 00000000000..7f25c86497f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/MapStatusesConvertBenchmark.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.roaringbitmap.RoaringBitmap
+
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.scheduler.{HighlyCompressedMapStatus, MapStatus, MergeStatus}
+import org.apache.spark.storage.BlockManagerId
+
+/**
+ * Benchmark to measure performance for converting mapStatuses and mergeStatuses.
+ * To run this benchmark:
+ * {{{
+ * 1. without sbt:
+ * bin/spark-submit --class <this class> --jars <spark core test jar>
+ * 2. build/sbt "core/test:runMain <this class>"
+ * 3. generate result:
+ * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>"
+ * Results will be written to "benchmarks/MapStatusesConvertBenchmark-results.txt".
+ * }}}
+ * */
+object MapStatusesConvertBenchmark extends BenchmarkBase {
+
+ private def convertMapStatus(numIters: Int): Unit = {
+
+ val benchmark = new Benchmark("MapStatuses Convert", 1, output = output)
+
+ val blockManagerNumber = 1000
+ val mapNumber = 50000
+ val shufflePartitions = 10000
+
+ val shuffleId: Int = 0
+ // First reduce task will fetch map data from startPartition to endPartition
+ val startPartition = 0
+ val startMapIndex = 0
+ val endMapIndex = mapNumber
+ val blockManagers = Array.tabulate(blockManagerNumber) { i =>
+ BlockManagerId("a", "host" + i, 7337)
+ }
+ val mapStatuses: Array[MapStatus] = Array.tabulate(mapNumber) { mapTaskId =>
+ HighlyCompressedMapStatus(
+ blockManagers(mapTaskId % blockManagerNumber),
+ Array.tabulate(shufflePartitions)(i => if (i % 50 == 0) 1 else 0),
+ mapTaskId)
+ }
+ val bitmap = new RoaringBitmap()
+ Range(0, 4000).foreach(bitmap.add(_))
+ val mergeStatuses = Array.tabulate(shufflePartitions) { part =>
+ MergeStatus(blockManagers(part % blockManagerNumber), shuffleId, bitmap, 100)
+ }
+
+ Array(499, 999, 1499).foreach { endPartition =>
+ benchmark.addCase(
+ s"Num Maps: $mapNumber Fetch partitions:${endPartition - startPartition + 1}",
+ numIters) { _ =>
+ MapOutputTracker.convertMapStatuses(
+ shuffleId,
+ startPartition,
+ endPartition,
+ mapStatuses,
+ startMapIndex,
+ endMapIndex,
+ Some(mergeStatuses))
+ }
+ }
+
+ benchmark.run()
+ }
+
+ override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+ val numIters = 3
+ runBenchmark("MapStatuses Convert Benchmark") {
+ convertMapStatus(numIters)
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org