Posted to reviews@spark.apache.org by "sunchao (via GitHub)" <gi...@apache.org> on 2023/08/08 22:46:37 UTC

[GitHub] [spark] sunchao commented on a diff in pull request #42194: [SPARK-41471][SQL] Reduce Spark shuffle when only one side of a join is KeyGroupedPartitioning

sunchao commented on code in PR #42194:
URL: https://github.com/apache/spark/pull/42194#discussion_r1287757003


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -519,6 +519,35 @@ case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning {
   }
 }
 
+/**
+ * Represents a partitioning that uses a partition value map to partition rows.
+ */
+case class PartitionValueMapPartitioning(

Review Comment:
   Could we reuse `KeyGroupedPartitioning` instead of creating a new `Partitioning`? They look mostly similar, except for `valueMap`, which I think can be computed from `partitionValues`. The `PartitionValueMapShuffleSpec` also looks similar to `KeyGroupedShuffleSpec`.
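   For illustration only, a minimal sketch of how such a value map might be derived from `KeyGroupedPartitioning.partitionValues`; the helper name `buildValueMap` is hypothetical and the exact `InternalRowComparableWrapper` signature is an assumption, not code from this PR:
   
   ```scala
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.catalyst.expressions.Expression
   import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper
   
   // Sketch: map each partition value to its index in partitionValues, wrapping
   // rows so that equality/hashing is value-based rather than by reference.
   def buildValueMap(
       partitionValues: Seq[InternalRow],
       expressions: Seq[Expression]): Map[InternalRowComparableWrapper, Int] = {
     partitionValues.zipWithIndex.map { case (row, idx) =>
       InternalRowComparableWrapper(row, expressions) -> idx
     }.toMap
   }
   ```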
   



##########
core/src/main/scala/org/apache/spark/Partitioner.scala:
##########
@@ -137,6 +137,18 @@ private[spark] class PartitionIdPassthrough(override val numPartitions: Int) ext
   override def getPartition(key: Any): Int = key.asInstanceOf[Int]
 }
 
+/**
+ * A [[org.apache.spark.Partitioner]] that partitions all records using a partition value map.
+ */
+private[spark] class PartitionValueMapPartitioner(
+    valueMap: mutable.Map[Seq[Any], Int],
+    override val numPartitions: Int) extends Partitioner {
+  override def getPartition(key: Any): Int = {
+    val keys = key.asInstanceOf[Seq[Any]]
+    valueMap.getOrElseUpdate(keys, Utils.nonNegativeMod(keys.hashCode, numPartitions))

Review Comment:
   When do we need to update this map? Will the hash-based fallback collide with one of the existing mappings?
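   To make the concern concrete, a standalone sketch (not the PR's code; `nonNegativeMod` below mirrors the contract of `Utils.nonNegativeMod`) showing how the hash fallback can land on a partition id that `valueMap` already uses:
   
   ```scala
   import scala.collection.mutable
   
   // Same contract as org.apache.spark.util.Utils.nonNegativeMod.
   def nonNegativeMod(x: Int, mod: Int): Int = {
     val rawMod = x % mod
     rawMod + (if (rawMod < 0) mod else 0)
   }
   
   val numPartitions = 4
   // Every existing mapping points at partition 0 or 1.
   val valueMap = mutable.Map[Seq[Any], Int](Seq(1) -> 0, Seq(2) -> 1)
   
   // A key absent from the map gets a hash-based id, which is not guaranteed to
   // avoid the ids already assigned above.
   val unknownKey: Seq[Any] = Seq(42)
   val assigned = valueMap.getOrElseUpdate(
     unknownKey, nonNegativeMod(unknownKey.hashCode, numPartitions))
   // If `assigned` is 0 or 1, the unknown key is co-located with Seq(1) or Seq(2).
   println(assigned)
   ```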



##########
core/src/main/scala/org/apache/spark/Partitioner.scala:
##########
@@ -137,6 +137,18 @@ private[spark] class PartitionIdPassthrough(override val numPartitions: Int) ext
   override def getPartition(key: Any): Int = key.asInstanceOf[Int]
 }
 
+/**
+ * A [[org.apache.spark.Partitioner]] that partitions all records using a partition value map.
+ */
+private[spark] class PartitionValueMapPartitioner(
+    valueMap: mutable.Map[Seq[Any], Int],

Review Comment:
   Instead of using `Seq[Any]`, can we use `InternalRowComparableWrapper` here?
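   If it helps, a hypothetical sketch of that shape (assuming the partitioner can live in a module that sees `sql/catalyst`; the fallback policy for missing keys is left aside, see the previous comment):
   
   ```scala
   import scala.collection.mutable
   import org.apache.spark.Partitioner
   import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper
   
   // Hypothetical variant keyed by InternalRowComparableWrapper, so lookups use
   // value-based equality on the wrapped InternalRow instead of Seq[Any] equality.
   private[spark] class PartitionValueMapPartitioner(
       valueMap: mutable.Map[InternalRowComparableWrapper, Int],
       override val numPartitions: Int) extends Partitioner {
     override def getPartition(key: Any): Int = {
       val wrapped = key.asInstanceOf[InternalRowComparableWrapper]
       // How unknown keys should be handled is a separate question (see above).
       valueMap.getOrElse(wrapped, 0)
     }
   }
   ```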



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala:
##########
@@ -368,150 +368,197 @@ case class EnsureRequirements(
     var newLeft = left
     var newRight = right
 
-    val specs = Seq(left, right).zip(requiredChildDistribution).map { case (p, d) =>
+    val specsOpts = Seq(left, right).zip(requiredChildDistribution).map { case (p, d) =>
       if (!d.isInstanceOf[ClusteredDistribution]) return None
       val cd = d.asInstanceOf[ClusteredDistribution]
       val specOpt = createKeyGroupedShuffleSpec(p.outputPartitioning, cd)
-      if (specOpt.isEmpty) return None
-      specOpt.get
+      specOpt
+    }
+    val specsAllExist = specsOpts.forall(_.nonEmpty)
+    if ((!conf.v2BucketingShuffleOneSideEnabled && !specsAllExist)
+      || specsOpts.count(_.isEmpty) > 1) {
+      return None
     }
-
-    val leftSpec = specs.head
-    val rightSpec = specs(1)
 
     var isCompatible = false
-    if (!conf.v2BucketingPushPartValuesEnabled) {
-      isCompatible = leftSpec.isCompatibleWith(rightSpec)
-    } else {
-      logInfo("Pushing common partition values for storage-partitioned join")
-      isCompatible = leftSpec.areKeysCompatible(rightSpec)
-
-      // Partition expressions are compatible. Regardless of whether partition values
-      // match from both sides of children, we can calculate a superset of partition values and
-      // push-down to respective data sources so they can adjust their output partitioning by
-      // filling missing partition keys with empty partitions. As result, we can still avoid
-      // shuffle.
-      //
-      // For instance, if two sides of a join have partition expressions
-      // `day(a)` and `day(b)` respectively
-      // (the join query could be `SELECT ... FROM t1 JOIN t2 on t1.a = t2.b`), but
-      // with different partition values:
-      //   `day(a)`: [0, 1]
-      //   `day(b)`: [1, 2, 3]
-      // Following the case 2 above, we don't have to shuffle both sides, but instead can
-      // just push the common set of partition values: `[0, 1, 2, 3]` down to the two data
-      // sources.
-      if (isCompatible) {
-        val leftPartValues = leftSpec.partitioning.partitionValues
-        val rightPartValues = rightSpec.partitioning.partitionValues
-
-        logInfo(
-          s"""
-             |Left side # of partitions: ${leftPartValues.size}
-             |Right side # of partitions: ${rightPartValues.size}
-             |""".stripMargin)
-
-        // As partition keys are compatible, we can pick either left or right as partition
-        // expressions
-        val partitionExprs = leftSpec.partitioning.expressions
-
-        var mergedPartValues = InternalRowComparableWrapper
+
+    if (specsAllExist) {

Review Comment:
   Instead of always shuffling the non-key-grouped side according to the key-grouped side, I wonder whether we can leverage the existing logic in `EnsureRequirements`, which considers the trade-off between shuffle parallelism and cost.
   
   For instance, if one side of the join uses hash partitioning with 200 partitions while the other side uses key-grouped partitioning with 5 partitions, it's probably better to shuffle the key-grouped side into 200 partitions, to ensure the job has good parallelism.
   
   Maybe we can check whether it's possible to make `KeyGroupedPartitioning.canCreatePartitioning` return true when `spark.sql.sources.v2.bucketing.shuffleOneSide.enabled` is enabled.
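   For concreteness, a hypothetical sketch only of that idea, placed in `KeyGroupedShuffleSpec` (where `canCreatePartitioning` is defined); reusing the PR's `v2BucketingShuffleOneSideEnabled` conf via `SQLConf.get` is an assumption, not the PR's actual change:
   
   ```scala
   // Inside KeyGroupedShuffleSpec (sketch): opt into creating a partitioning for
   // the other side only when the one-side-shuffle conf is on, so the existing
   // EnsureRequirements side-selection logic can weigh parallelism vs. cost.
   override def canCreatePartitioning: Boolean =
     SQLConf.get.v2BucketingShuffleOneSideEnabled
   
   // EnsureRequirements would then call this to shuffle the other side into the
   // key-grouped layout only when that trade-off wins (e.g. 5 vs. 200 partitions).
   override def createPartitioning(clustering: Seq[Expression]): Partitioning =
     KeyGroupedPartitioning(clustering, partitioning.numPartitions,
       partitioning.partitionValues)
   ```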



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

