You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/07/26 09:58:10 UTC

[spark] branch branch-3.2 updated: [SPARK-36105][SQL] OptimizeLocalShuffleReader support reading data of multiple mappers in one task

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new ec91818  [SPARK-36105][SQL] OptimizeLocalShuffleReader support reading data of multiple mappers in one task
ec91818 is described below

commit ec91818e14518317667374b8d28fb73fb918fd0a
Author: michaelzhang-db <mi...@databricks.com>
AuthorDate: Mon Jul 26 17:56:58 2021 +0800

    [SPARK-36105][SQL] OptimizeLocalShuffleReader support reading data of multiple mappers in one task
    
    ### What changes were proposed in this pull request?
    Added a new partition spec, `CoalescedMapperPartitionSpec`, so that the OptimizeLocalShuffleReader rule can read the data of multiple mappers in one task when the expected (advisory) parallelism is less than the number of mappers.
    
    ### Why are the changes needed?
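    Previously the OptimizeLocalShuffleReader rule always created at least one partition per mapper (`math.max(1, expectedParallelism / numMappers)` splits for each mapper), so the local reader could end up with more partitions than the expected parallelism. Coalescing mappers lets the rule honor the expected parallelism when it is smaller than the number of mappers.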
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit tests
    
    Closes #33310 from michaelzhang-db/supportDataFromMultipleMappers.
    
    Authored-by: michaelzhang-db <mi...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 094ae3708f54f5cb208141b345077c0e10cb08f3)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/sql/execution/ShuffledRowRDD.scala  | 19 +++++++++++++++++++
 .../execution/adaptive/CustomShuffleReaderExec.scala |  3 ++-
 .../adaptive/OptimizeLocalShuffleReader.scala        | 20 ++++++++++++++++----
 .../execution/adaptive/AdaptiveQueryExecSuite.scala  |  8 ++++----
 4 files changed, 41 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
index 88c4bf6..47d6119 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
@@ -58,6 +58,12 @@ case class PartialMapperPartitionSpec(
     startReducerIndex: Int,
     endReducerIndex: Int) extends ShufflePartitionSpec
 
+// TODO(SPARK-36234): Consider mapper location and shuffle block size when coalescing mappers
+case class CoalescedMapperPartitionSpec(
+    startMapIndex: Int,
+    endMapIndex: Int,
+    numReducers: Int) extends ShufflePartitionSpec
+
 /**
  * The [[Partition]] used by [[ShuffledRowRDD]].
  */
@@ -181,6 +187,9 @@ class ShuffledRowRDD(
 
       case PartialMapperPartitionSpec(mapIndex, _, _) =>
         tracker.getMapLocation(dependency, mapIndex, mapIndex + 1)
+
+      case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, numReducers) =>
+        tracker.getMapLocation(dependency, startMapIndex, endMapIndex)
     }
   }
 
@@ -217,6 +226,16 @@ class ShuffledRowRDD(
           endReducerIndex,
           context,
           sqlMetricsReporter)
+
+      case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, numReducers) =>
+        SparkEnv.get.shuffleManager.getReader(
+          dependency.shuffleHandle,
+          startMapIndex,
+          endMapIndex,
+          0,
+          numReducers,
+          context,
+          sqlMetricsReporter)
     }
     reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
index 8975054..b8aef14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
@@ -105,7 +105,8 @@ case class CustomShuffleReaderExec private(
     partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
 
   def isLocalReader: Boolean =
-    partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])
+    partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec]) ||
+      partitionSpecs.exists(_.isInstanceOf[CoalescedMapperPartitionSpec])
 
   private def shuffleStage = child match {
     case stage: ShuffleQueryStageExec => Some(stage)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
index 2103145..3e3d6d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
@@ -72,10 +72,22 @@ object OptimizeLocalShuffleReader extends CustomShuffleReaderRule {
     assert(numMappers > 0)
     val numReducers = shuffleStage.shuffle.numPartitions
     val expectedParallelism = advisoryParallelism.getOrElse(numReducers)
-    val splitPoints = equallyDivide(numReducers, math.max(1, expectedParallelism / numMappers))
-    (0 until numMappers).flatMap { mapIndex =>
-      (splitPoints :+ numReducers).sliding(2).map {
-        case Seq(start, end) => PartialMapperPartitionSpec(mapIndex, start, end)
+    val splitPoints = if (expectedParallelism >= numMappers) {
+      equallyDivide(numReducers, expectedParallelism / numMappers)
+    } else {
+      equallyDivide(numMappers, expectedParallelism)
+    }
+    if (expectedParallelism >= numMappers) {
+      (0 until numMappers).flatMap { mapIndex =>
+        (splitPoints :+ numReducers).sliding(2).map {
+          case Seq(start, end) => PartialMapperPartitionSpec(mapIndex, start, end)
+        }
+      }
+    } else {
+      (0 until 1).flatMap { _ =>
+        (splitPoints :+ numMappers).sliding(2).map {
+          case Seq(start, end) => CoalescedMapperPartitionSpec(start, end, numReducers)
+        }
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index b56e673..8a74981 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -201,13 +201,13 @@ class AdaptiveQueryExecSuite
       // The pre-shuffle partition size is [0, 0, 0, 72, 0]
       // We exclude the 0-size partitions, so only one partition, advisoryParallelism = 1
       // the final parallelism is
-      // math.max(1, advisoryParallelism / numMappers): math.max(1, 1/2) = 1
-      // and the partitions length is 1 * numMappers = 2
-      assert(localShuffleRDD0.getPartitions.length == 2)
+      // advisoryParallelism = 1, because advisoryParallelism (1) < numMappers (2),
+      // so the mappers are coalesced and the partitions length is 1
+      assert(localShuffleRDD0.getPartitions.length == 1)
       // The pre-shuffle partition size is [0, 72, 0, 72, 126]
       // We exclude the 0-size partitions, so only 3 partition, advisoryParallelism = 3
       // the final parallelism is
-      // math.max(1, advisoryParallelism / numMappers): math.max(1, 3/2) = 1
+      // advisoryParallelism / numMappers: 3/2 = 1 since advisoryParallelism >= numMappers
       // and the partitions length is 1 * numMappers = 2
       assert(localShuffleRDD1.getPartitions.length == 2)
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org