Posted to commits@spark.apache.org by yu...@apache.org on 2022/06/11 10:17:21 UTC

[spark] branch master updated: [SPARK-39325][CORE] Improve MapOutputTracker convertMapStatuses performance

This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b6aea1a8d99 [SPARK-39325][CORE] Improve MapOutputTracker convertMapStatuses performance
b6aea1a8d99 is described below

commit b6aea1a8d99b3d99e91f7f195b23169d3d61b6a7
Author: Kun Wan <wa...@apache.org>
AuthorDate: Sat Jun 11 18:16:43 2022 +0800

    [SPARK-39325][CORE] Improve MapOutputTracker convertMapStatuses performance
    
    ### What changes were proposed in this pull request?
    
    Optimize `MapOutputTracker.convertMapStatuses()` method.
    
    ### Why are the changes needed?
    
    `MapOutputTracker.convertMapStatuses()` will be very slow if there are tens of thousands of MapStatuses and MergeStatuses.
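    
    As the diff below shows, the removed hot path rebuilt `mapStatuses.zipWithIndex` and a `getMissingMaps` sequence once per requested partition; the rewrite walks the map statuses once and probes the merge tracker bitmap directly. A minimal, self-contained sketch of that before/after access pattern (hypothetical names; `Set[Int]` stands in for the `RoaringBitmap` tracker, not the actual Spark code):
    
    ```scala
    val numMaps = 50000
    val statuses: Array[String] = Array.tabulate(numMaps)(i => s"mapStatus-$i")
    // Map indexes already covered by the merged block of each reduce partition.
    val mergedMaps: Array[Set[Int]] = Array.fill(500)(Set(0, 1, 2))
    
    // Before: every requested partition re-materialized an index over all map
    // outputs, so the intermediate collections alone cost O(partitions * maps).
    for (partId <- mergedMaps.indices) {
      val withIndex = statuses.zipWithIndex // fresh numMaps-element tuple array, per partition
      val unmerged = withIndex.filterNot { case (_, mapIndex) =>
        mergedMaps(partId).contains(mapIndex)
      }
      // ... enqueue fetches for `unmerged` ...
    }
    
    // After: iterate the map statuses once and test tracker membership per
    // partition, allocating no per-partition intermediate collections.
    for ((_, mapIndex) <- statuses.iterator.zipWithIndex) {
      for (partId <- mergedMaps.indices) {
        if (!mergedMaps(partId).contains(mapIndex)) {
          // fetch the original (unmerged) block for (mapIndex, partId)
        }
      }
    }
    ```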
    
    Benchmark code:
    ```scala
    val benchmark = new Benchmark("MapStatuses Convert", 1, output = output)

    val blockManagerNumber = 1000
    val mapNumber = 50000
    val shufflePartitions = 10000

    val shuffleId: Int = 0
    // First reduce task will fetch map data from startPartition to endPartition
    val startPartition = 0
    val startMapIndex = 0
    val endMapIndex = mapNumber
    val blockManagers = Array.tabulate(blockManagerNumber) { i =>
      BlockManagerId("a", "host" + i, 7337)
    }
    val mapStatuses: Array[MapStatus] = Array.tabulate(mapNumber) { mapTaskId =>
      HighlyCompressedMapStatus(
        blockManagers(mapTaskId % blockManagerNumber),
        Array.tabulate(shufflePartitions)(i => if (i % 50 == 0) 1 else 0),
        mapTaskId)
    }
    val bitmap = new RoaringBitmap()
    Range(0, 4000).foreach(bitmap.add(_))
    val mergeStatuses = Array.tabulate(shufflePartitions) { part =>
      MergeStatus(blockManagers(part % blockManagerNumber), shuffleId, bitmap, 100)
    }

    Array(499, 999, 1499).foreach { endPartition =>
      benchmark.addCase(
        s"Num Maps: $mapNumber Fetch partitions:${endPartition - startPartition + 1}",
        numIters) { _ =>
        MapOutputTracker.convertMapStatuses(
          shuffleId,
          startPartition,
          endPartition,
          mapStatuses,
          startMapIndex,
          endMapIndex,
          Some(mergeStatuses))
      }
    }
    ```
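    
    The benchmark is checked in below as `org.apache.spark.MapStatusesConvertBenchmark`; per its scaladoc it can be run with `build/sbt "core/test:runMain org.apache.spark.MapStatusesConvertBenchmark"`.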
    
    Before this PR
    ```
    ================================================================================================
    MapStatuses Convert Benchmark
    ================================================================================================
    
    Java HotSpot(TM) 64-Bit Server VM 1.8.0_281-b09 on Mac OS X 10.15.7
    Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
    MapStatuses Convert:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------------------------------
    Num Maps: 50000 Fetch partitions:500               3393           3483          96          0.0  3393439257.0       1.0X
    Num Maps: 50000 Fetch partitions:1000              6640           6772         121          0.0  6639654832.0       0.5X
    Num Maps: 50000 Fetch partitions:1500             10035          10143         108          0.0 10035100069.0       0.3X
    ```
    
    After this PR
    ```
    ================================================================================================
    MapStatuses Convert Benchmark
    ================================================================================================
    
    Java HotSpot(TM) 64-Bit Server VM 1.8.0_281-b09 on Mac OS X 10.15.7
    Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
    MapStatuses Convert:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------------------------------
    Num Maps: 50000 Fetch partitions:500                667            679          15          0.0   666562302.0       1.0X
    Num Maps: 50000 Fetch partitions:1000              1285           1397         115          0.0  1284808865.0       0.5X
    Num Maps: 50000 Fetch partitions:1500              2045           2068          32          0.0  2044951906.0       0.3X
    ```
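    
    That is roughly a 5x speedup on each of the three fetch ranges (3393 ms -> 667 ms, 6640 ms -> 1285 ms, 10035 ms -> 2045 ms).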
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing UTs.
    
    Closes #36709 from wankunde/convert_mapstatus.
    
    Lead-authored-by: Kun Wan <wa...@apache.org>
    Co-authored-by: wankun <wa...@163.com>
    Signed-off-by: Yuming Wang <yu...@ebay.com>
---
 .../MapStatusesConvertBenchmark-results.txt        | 13 +++
 .../scala/org/apache/spark/MapOutputTracker.scala  | 50 ++++++------
 .../org/apache/spark/scheduler/MergeStatus.scala   |  9 ---
 .../apache/spark/MapStatusesConvertBenchmark.scala | 92 ++++++++++++++++++++++
 4 files changed, 130 insertions(+), 34 deletions(-)

diff --git a/core/benchmarks/MapStatusesConvertBenchmark-results.txt b/core/benchmarks/MapStatusesConvertBenchmark-results.txt
new file mode 100644
index 00000000000..f41401bbe2e
--- /dev/null
+++ b/core/benchmarks/MapStatusesConvertBenchmark-results.txt
@@ -0,0 +1,13 @@
+================================================================================================
+MapStatuses Convert Benchmark
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1025-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+MapStatuses Convert:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Num Maps: 50000 Fetch partitions:500               1330           1359          26          0.0  1329827185.0       1.0X
+Num Maps: 50000 Fetch partitions:1000              2648           2666          20          0.0  2647944453.0       0.5X
+Num Maps: 50000 Fetch partitions:1500              4155           4436         383          0.0  4154563448.0       0.3X
+
+
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index e6ed469250b..79cae483b22 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -1596,7 +1596,7 @@ private[spark] object MapOutputTracker extends Logging {
       mapStatuses: Array[MapStatus],
       startMapIndex : Int,
       endMapIndex: Int,
-      mergeStatuses: Option[Array[MergeStatus]] = None): MapSizesByExecutorId = {
+      mergeStatusesOpt: Option[Array[MergeStatus]] = None): MapSizesByExecutorId = {
     assert (mapStatuses != null)
     val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]]
     var enableBatchFetch = true
@@ -1608,39 +1608,39 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: SPARK-35036: Instead of reading map blocks in case of AQE with Push based shuffle,
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
-    if (mergeStatuses.exists(_.exists(_ != null)) && startMapIndex == 0
+    if (mergeStatusesOpt.exists(_.exists(_ != null)) && startMapIndex == 0
       && endMapIndex == mapStatuses.length) {
       enableBatchFetch = false
       logDebug(s"Disable shuffle batch fetch as Push based shuffle is enabled for $shuffleId.")
-      // We have MergeStatus and full range of mapIds are requested so return a merged block.
-      val numMaps = mapStatuses.length
-      mergeStatuses.get.zipWithIndex.slice(startPartition, endPartition).foreach {
-        case (mergeStatus, partId) =>
-          val remainingMapStatuses = if (mergeStatus != null && mergeStatus.totalSize > 0) {
-            // If MergeStatus is available for the given partition, add location of the
-            // pre-merged shuffle partition for this partition ID. Here we create a
-            // ShuffleMergedBlockId to indicate this is a merged shuffle block.
-            splitsByAddress.getOrElseUpdate(mergeStatus.location, ListBuffer()) +=
-              ((ShuffleMergedBlockId(shuffleId, mergeStatus.shuffleMergeId, partId),
-                mergeStatus.totalSize, SHUFFLE_PUSH_MAP_ID))
-            // For the "holes" in this pre-merged shuffle partition, i.e., unmerged mapper
-            // shuffle partition blocks, fetch the original map produced shuffle partition blocks
-            val mapStatusesWithIndex = mapStatuses.zipWithIndex
-            mergeStatus.getMissingMaps(numMaps).map(mapStatusesWithIndex)
-          } else {
-            // If MergeStatus is not available for the given partition, fall back to
-            // fetching all the original mapper shuffle partition blocks
-            mapStatuses.zipWithIndex.toSeq
-          }
-          // Add location for the mapper shuffle partition blocks
-          for ((mapStatus, mapIndex) <- remainingMapStatuses) {
-            validateStatus(mapStatus, shuffleId, partId)
+      val mergeStatuses = mergeStatusesOpt.get
+      for (partId <- startPartition until endPartition) {
+        val mergeStatus = mergeStatuses(partId)
+        if (mergeStatus != null && mergeStatus.totalSize > 0) {
+          // If MergeStatus is available for the given partition, add location of the
+          // pre-merged shuffle partition for this partition ID. Here we create a
+          // ShuffleMergedBlockId to indicate this is a merged shuffle block.
+          splitsByAddress.getOrElseUpdate(mergeStatus.location, ListBuffer()) +=
+            ((ShuffleMergedBlockId(shuffleId, mergeStatus.shuffleMergeId, partId),
+              mergeStatus.totalSize, SHUFFLE_PUSH_MAP_ID))
+        }
+      }
+
+      // Add location for the mapper shuffle partition blocks
+      for ((mapStatus, mapIndex) <- mapStatuses.iterator.zipWithIndex) {
+        validateStatus(mapStatus, shuffleId, startPartition)
+        for (partId <- startPartition until endPartition) {
+          // For the "holes" in this pre-merged shuffle partition, i.e., unmerged mapper
+          // shuffle partition blocks, fetch the original map produced shuffle partition blocks
+          val mergeStatus = mergeStatuses(partId)
+          if (mergeStatus == null || mergeStatus.totalSize == 0 ||
+            !mergeStatus.tracker.contains(mapIndex)) {
             val size = mapStatus.getSizeForBlock(partId)
             if (size != 0) {
               splitsByAddress.getOrElseUpdate(mapStatus.location, ListBuffer()) +=
                 ((ShuffleBlockId(shuffleId, mapStatus.mapId, partId), size, mapIndex))
             }
           }
+        }
       }
     } else {
       val iter = mapStatuses.iterator.zipWithIndex
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MergeStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MergeStatus.scala
index 6d160264538..850756b50a3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MergeStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MergeStatus.scala
@@ -58,15 +58,6 @@ private[spark] class MergeStatus(
 
   def tracker: RoaringBitmap = mapTracker
 
-  /**
-   * Get the list of mapper IDs for missing mapper partition blocks that are not merged.
-   * The reducer will use this information to decide which shuffle partition blocks to
-   * fetch in the original way.
-   */
-  def getMissingMaps(numMaps: Int): Seq[Int] = {
-    (0 until numMaps).filter(i => !mapTracker.contains(i))
-  }
-
   /**
    * Get the number of missing map outputs for missing mapper partition blocks that are not merged.
    */
diff --git a/core/src/test/scala/org/apache/spark/MapStatusesConvertBenchmark.scala b/core/src/test/scala/org/apache/spark/MapStatusesConvertBenchmark.scala
new file mode 100644
index 00000000000..7f25c86497f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/MapStatusesConvertBenchmark.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.roaringbitmap.RoaringBitmap
+
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.scheduler.{HighlyCompressedMapStatus, MapStatus, MergeStatus}
+import org.apache.spark.storage.BlockManagerId
+
+/**
+ * Benchmark to measure performance for converting mapStatuses and mergeStatuses.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> --jars <spark core test jar>
+ *   2. build/sbt "core/test:runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>"
+ *      Results will be written to "benchmarks/MapStatusesConvertBenchmark-results.txt".
+ * }}}
+ * */
+object MapStatusesConvertBenchmark extends BenchmarkBase {
+
+  private def convertMapStatus(numIters: Int): Unit = {
+
+    val benchmark = new Benchmark("MapStatuses Convert", 1, output = output)
+
+    val blockManagerNumber = 1000
+    val mapNumber = 50000
+    val shufflePartitions = 10000
+
+    val shuffleId: Int = 0
+    // First reduce task will fetch map data from startPartition to endPartition
+    val startPartition = 0
+    val startMapIndex = 0
+    val endMapIndex = mapNumber
+    val blockManagers = Array.tabulate(blockManagerNumber) { i =>
+      BlockManagerId("a", "host" + i, 7337)
+    }
+    val mapStatuses: Array[MapStatus] = Array.tabulate(mapNumber) { mapTaskId =>
+      HighlyCompressedMapStatus(
+        blockManagers(mapTaskId % blockManagerNumber),
+        Array.tabulate(shufflePartitions)(i => if (i % 50 == 0) 1 else 0),
+        mapTaskId)
+    }
+    val bitmap = new RoaringBitmap()
+    Range(0, 4000).foreach(bitmap.add(_))
+    val mergeStatuses = Array.tabulate(shufflePartitions) { part =>
+      MergeStatus(blockManagers(part % blockManagerNumber), shuffleId, bitmap, 100)
+    }
+
+    Array(499, 999, 1499).foreach { endPartition =>
+      benchmark.addCase(
+        s"Num Maps: $mapNumber Fetch partitions:${endPartition - startPartition + 1}",
+        numIters) { _ =>
+        MapOutputTracker.convertMapStatuses(
+          shuffleId,
+          startPartition,
+          endPartition,
+          mapStatuses,
+          startMapIndex,
+          endMapIndex,
+          Some(mergeStatuses))
+      }
+    }
+
+    benchmark.run()
+  }
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val numIters = 3
+    runBenchmark("MapStatuses Convert Benchmark") {
+      convertMapStatus(numIters)
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org