Posted to commits@spark.apache.org by we...@apache.org on 2021/07/01 01:43:41 UTC

[spark] branch master updated: [SPARK-35888][SQL][FOLLOWUP] Return partition specs for all the shuffles

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cd6a463  [SPARK-35888][SQL][FOLLOWUP] Return partition specs for all the shuffles
cd6a463 is described below

commit cd6a4638110ef3f0db8b6366be680870dfb0bcad
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Thu Jul 1 01:43:11 2021 +0000

    [SPARK-35888][SQL][FOLLOWUP] Return partition specs for all the shuffles
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up of https://github.com/apache/spark/pull/33079 to fix a bug in a corner case: `ShufflePartitionsUtil.coalescePartitions` should either return partition specs for all the shuffles, or for none of them.
    
    If the input RDD has no partitions, its `mapOutputStatistics` is None, and we should still return partition specs for it, with a data size of 0.
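    
    A minimal Scala sketch of the intended behavior (simplified from the diff below; `Spec` and `MapStats` are stand-ins for Spark's `CoalescedPartitionSpec` and `MapOutputStatistics`, modeling only the fields used here):
    
    ```scala
    // One result sequence per shuffle: real sizes when map output stats exist,
    // data size 0 when the shuffle produced no map output (e.g. a 0-partition RDD).
    case class Spec(startReducerIndex: Int, endReducerIndex: Int,
        dataSize: Option[Long] = None)
    case class MapStats(bytesByPartitionId: Array[Long])
    
    def attachDataSize(
        mapOutputStatistics: Seq[Option[MapStats]],
        partitionSpecs: Seq[Spec]): Seq[Seq[Spec]] = {
      mapOutputStatistics.map {
        case Some(stats) =>
          // Sum the per-reducer bytes covered by each coalesced spec.
          partitionSpecs.map { spec =>
            val size = (spec.startReducerIndex until spec.endReducerIndex)
              .map(i => stats.bytesByPartitionId(i)).sum
            spec.copy(dataSize = Some(size))
          }
        case None =>
          // No map output for this shuffle: still return specs, sized 0.
          partitionSpecs.map(_.copy(dataSize = Some(0L)))
      }
    }
    ```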
    
    ### Why are the changes needed?
    
    bug fix
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    A new test in `AdaptiveQueryExecSuite`.
    
    Closes #33158 from cloud-fan/bug.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../execution/adaptive/ShufflePartitionsUtil.scala | 43 ++++++++++++----------
 .../adaptive/AdaptiveQueryExecSuite.scala          | 24 ++++++++++++
 2 files changed, 48 insertions(+), 19 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index a1f2d91..1353dc9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -96,9 +96,8 @@ object ShufflePartitionsUtil extends Logging {
 
     val numPartitions = validMetrics.head.bytesByPartitionId.length
     val newPartitionSpecs = coalescePartitions(0, numPartitions, validMetrics, targetSize)
-    assert(newPartitionSpecs.length == validMetrics.length)
-    if (newPartitionSpecs.head.length < numPartitions) {
-      newPartitionSpecs
+    if (newPartitionSpecs.length < numPartitions) {
+      attachDataSize(mapOutputStatistics, newPartitionSpecs)
     } else {
       Seq.empty
     }
@@ -148,7 +147,8 @@ object ShufflePartitionsUtil extends Logging {
         if (i - 1 > start) {
           val partitionSpecs = coalescePartitions(
             partitionIndices(start), repeatValue, validMetrics, targetSize, true)
-          newPartitionSpecsSeq.zip(partitionSpecs).foreach(spec => spec._1 ++= spec._2)
+          newPartitionSpecsSeq.zip(attachDataSize(mapOutputStatistics, partitionSpecs))
+            .foreach(spec => spec._1 ++= spec._2)
         }
         // find the end of this skew section, skipping partition(i - 1) and partition(i).
         var repeatIndex = i + 1
@@ -173,7 +173,8 @@ object ShufflePartitionsUtil extends Logging {
     if (numPartitions > start) {
       val partitionSpecs = coalescePartitions(
         partitionIndices(start), partitionIndices.last + 1, validMetrics, targetSize, true)
-      newPartitionSpecsSeq.zip(partitionSpecs).foreach(spec => spec._1 ++= spec._2)
+      newPartitionSpecsSeq.zip(attachDataSize(mapOutputStatistics, partitionSpecs))
+        .foreach(spec => spec._1 ++= spec._2)
     }
     // only return coalesced result if any coalescing has happened.
     if (newPartitionSpecsSeq.head.length < numPartitions) {
@@ -204,19 +205,17 @@ object ShufflePartitionsUtil extends Logging {
    *  - coalesced partition 2: shuffle partition 2 (size 170 MiB)
    *  - coalesced partition 3: shuffle partition 3 and 4 (size 50 MiB)
    *
-   *  @return A sequence of sequence of [[CoalescedPartitionSpec]]s. which each inner sequence as
-   *          the new partition specs for its corresponding shuffle after coalescing. For example,
-   *          if partitions [0, 1, 2, 3, 4] and partition bytes [10, 10, 100, 10, 20] with
-   *          targetSize 100, split at indices [0, 2, 3], the returned partition specs will be:
-   *          CoalescedPartitionSpec(0, 2, 20), CoalescedPartitionSpec(2, 3, 100) and
-   *          CoalescedPartitionSpec(3, 5, 30).
+   *  @return A sequence of [[CoalescedPartitionSpec]]s. For example, if partitions [0, 1, 2, 3, 4]
+   *          split at indices [0, 2, 3], the returned partition specs will be:
+   *          CoalescedPartitionSpec(0, 2), CoalescedPartitionSpec(2, 3) and
+   *          CoalescedPartitionSpec(3, 5).
    */
   private def coalescePartitions(
       start: Int,
       end: Int,
       mapOutputStatistics: Seq[MapOutputStatistics],
       targetSize: Long,
-      allowReturnEmpty: Boolean = false): Seq[Seq[CoalescedPartitionSpec]] = {
+      allowReturnEmpty: Boolean = false): Seq[CoalescedPartitionSpec] = {
     val partitionSpecs = ArrayBuffer.empty[CoalescedPartitionSpec]
     var coalescedSize = 0L
     var i = start
@@ -252,14 +251,20 @@ object ShufflePartitionsUtil extends Logging {
     }
     // If do not allowReturnEmpty, create at least one partition if all partitions are empty.
     createPartitionSpec(!allowReturnEmpty && partitionSpecs.isEmpty)
+    partitionSpecs.toSeq
+  }
 
-    // add data size for each partitionSpecs
-    mapOutputStatistics.map { mapStats =>
-      partitionSpecs.map { spec =>
-        val dataSize = spec.startReducerIndex.until(spec.endReducerIndex)
-          .map(mapStats.bytesByPartitionId).sum
-        spec.copy(dataSize = Some(dataSize))
-      }.toSeq
+  private def attachDataSize(
+      mapOutputStatistics: Seq[Option[MapOutputStatistics]],
+      partitionSpecs: Seq[CoalescedPartitionSpec]): Seq[Seq[CoalescedPartitionSpec]] = {
+    mapOutputStatistics.map {
+      case Some(mapStats) =>
+        partitionSpecs.map { spec =>
+          val dataSize = spec.startReducerIndex.until(spec.endReducerIndex)
+            .map(mapStats.bytesByPartitionId).sum
+          spec.copy(dataSize = Some(dataSize))
+        }.toSeq
+      case None => partitionSpecs.map(_.copy(dataSize = Some(0))).toSeq
     }.toSeq
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index e1fa855..2343a92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1861,4 +1861,28 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-35888: join with a 0-partition table") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName) {
+      withTempView("t2") {
+        // create a temp view with 0 partition
+        spark.createDataFrame(sparkContext.emptyRDD[Row], new StructType().add("b", IntegerType))
+          .createOrReplaceTempView("t2")
+        val (_, adaptive) =
+          runAdaptiveAndVerifyResult("SELECT * FROM testData2 t1 left semi join t2 ON t1.a=t2.b")
+        val customReaders = collect(adaptive) {
+          case c: CustomShuffleReaderExec => c
+        }
+        assert(customReaders.length == 2)
+        customReaders.foreach { c =>
+          val stats = c.child.asInstanceOf[QueryStageExec].getRuntimeStatistics
+          assert(stats.sizeInBytes >= 0)
+          assert(stats.rowCount.get >= 0)
+        }
+      }
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org