You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "viirya (via GitHub)" <gi...@apache.org> on 2023/02/06 19:10:16 UTC

[GitHub] [spark] viirya commented on a diff in pull request #39633: [SPARK-42038][SQL] SPJ: Support partially clustered distribution

viirya commented on code in PR #39633:
URL: https://github.com/apache/spark/pull/39633#discussion_r1097810087


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala:
##########
@@ -411,6 +336,235 @@ case class EnsureRequirements(
     }
   }
 
+  /**
+   * Checks whether two children, `left` and `right`, of a join operator have compatible
+   * `KeyGroupedPartitioning`, and can benefit from storage-partitioned join.
+   *
+   * Returns the updated new children if the check is successful, otherwise `None`.
+   */
+  private def checkKeyGroupCompatible(
+      parent: SparkPlan,
+      left: SparkPlan,
+      right: SparkPlan,
+      requiredChildDistribution: Seq[Distribution]): Option[Seq[SparkPlan]] = {
+    parent match {
+      case smj: SortMergeJoinExec =>
+        checkKeyGroupCompatible(left, right, smj.joinType, requiredChildDistribution)
+      case sj: ShuffledHashJoinExec =>
+        checkKeyGroupCompatible(left, right, sj.joinType, requiredChildDistribution)
+      case _ =>
+        None
+    }
+  }
+
+  private def checkKeyGroupCompatible(
+      left: SparkPlan,
+      right: SparkPlan,
+      joinType: JoinType,
+      requiredChildDistribution: Seq[Distribution]): Option[Seq[SparkPlan]] = {
+    assert(requiredChildDistribution.length == 2)
+
+    var newLeft = left
+    var newRight = right
+
+    val specs = Seq(left, right).zip(requiredChildDistribution).map { case (p, d) =>
+      if (!d.isInstanceOf[ClusteredDistribution]) return None
+      val cd = d.asInstanceOf[ClusteredDistribution]
+      val specOpt = createKeyGroupedShuffleSpec(p.outputPartitioning, cd)
+      if (specOpt.isEmpty) return None
+      specOpt.get
+    }
+
+    val leftSpec = specs.head
+    val rightSpec = specs(1)
+
+    var isCompatible = false
+    if (!conf.v2BucketingPushPartValuesEnabled) {
+      isCompatible = leftSpec.isCompatibleWith(rightSpec)
+    } else {
+      logInfo("Pushing common partition values for storage-partitioned join")
+      isCompatible = leftSpec.areKeysCompatible(rightSpec)
+
+      // Partition expressions are compatible. Regardless of whether partition values
+      // match from both sides of children, we can we can calculate a superset of
+      // partition values and push-down to respective data sources so they can adjust
+      // their output partitioning by filling missing partition keys with empty
+      // partitions. As result, we can still avoid shuffle.
+      //
+      // For instance, if two sides of a join have partition expressions
+      // `day(a)` and `day(b)` respectively
+      // (the join query could be `SELECT ... FROM t1 JOIN t2 on t1.a = t2.b`), but
+      // with different partition values:
+      //   `day(a)`: [0, 1]
+      //   `day(b)`: [1, 2, 3]
+      // Following the case 2 above, we don't have to shuffle both sides, but instead can
+      // just push the common set of partition values: `[0, 1, 2, 3]` down to the two data
+      // sources.
+      if (isCompatible) {
+        val leftPartValues = leftSpec.partitioning.partitionValues
+        val rightPartValues = rightSpec.partitioning.partitionValues
+
+        logInfo(
+          s"""
+             |Left side # of partitions: ${leftPartValues.size}
+             |Right side # of partitions: ${rightPartValues.size}
+             |""".stripMargin)
+
+        var mergedPartValues = Utils.mergeOrdered(
+          Seq(leftPartValues, rightPartValues))(leftSpec.ordering)
+            .toSeq
+            .distinct
+            .map(v => (v, 1))
+
+        logInfo(s"After merging, there are ${mergedPartValues.size} partitions")
+
+        var replicateLeftSide = false
+        var replicateRightSide = false
+        var applyPartialClustering = false
+
+        // This means we allow partitions that are not clustered on their values,
+        // that is, multiple partitions with the same partition value. In the
+        // following, we calculate how many partitions that each distinct partition
+        // value has, and pushdown the information to scans, so they can adjust their
+        // final input partitions respectively.
+        if (conf.v2BucketingPartiallyClusteredDistributionEnabled) {
+          logInfo("Calculating partially clustered distribution for " +
+              "storage-partitioned join")
+
+          val canReplicateLeft = canReplicateLeftSide(joinType)
+          val canReplicateRight = canReplicateRightSide(joinType)
+
+          if (!canReplicateLeft && !canReplicateRight) {
+            logInfo("Skipping partially clustered distribution as it cannot be applied for " +
+                s"join type '$joinType'")
+          } else {
+            val leftLink = left.logicalLink
+            val rightLink = right.logicalLink
+
+            replicateLeftSide = if (
+              leftLink.isDefined && rightLink.isDefined &&
+                  leftLink.get.stats.sizeInBytes > 1 &&
+                  rightLink.get.stats.sizeInBytes > 1) {
+              logInfo(
+                s"""
+                   |Using plan statistics to determine which side of join to fully
+                   |cluster partition values:
+                   |Left side size (in bytes): ${leftLink.get.stats.sizeInBytes}
+                   |Right side size (in bytes): ${rightLink.get.stats.sizeInBytes}
+                   |""".stripMargin)
+              leftLink.get.stats.sizeInBytes < rightLink.get.stats.sizeInBytes
+            } else {
+              // As a simple heuristic, we pick the side with fewer number of partitions
+              // to apply the grouping & replication of partitions
+              logInfo("Using number of partitions to determine which side of join " +
+                  "to fully cluster partition values")
+              leftPartValues.size < rightPartValues.size
+            }
+
+            replicateRightSide = !replicateLeftSide
+
+            // Similar to skewed join, we need to check the join type to see whether replication
+            // of partitions can be applied. For instance, replication should not be allowed for
+            // the left-hand side of a right outer join.
+            if (replicateLeftSide && !canReplicateLeft) {
+              logInfo("Left-hand side is picked but cannot be applied to join type " +
+                  s"'$joinType'. Skipping partially clustered distribution.")
+              replicateLeftSide = false
+            } else if (replicateRightSide && !canReplicateRight) {
+              logInfo("Right-hand side is picked but cannot be applied to join type " +
+                  s"'$joinType'. Skipping partially clustered distribution.")
+              replicateRightSide = false
+            } else {
+              val partValues = if (replicateLeftSide) rightPartValues else leftPartValues
+              val numExpectedPartitions = partValues.groupBy(identity).mapValues(_.size)
+
+              mergedPartValues = mergedPartValues.map { case (partVal, numParts) =>
+                (partVal, numExpectedPartitions.getOrElse(partVal, numParts))
+              }
+
+              logInfo("After applying partially clustered distribution, there are " +
+                  s"${mergedPartValues.map(_._2).sum} partitions.")
+              applyPartialClustering = true
+            }
+          }
+        }
+
+        // Now we need to push-down the common partition key to the scan in each child
+        newLeft = populatePartitionValues(
+          left, mergedPartValues, applyPartialClustering, replicateLeftSide)
+        newRight = populatePartitionValues(
+          right, mergedPartValues, applyPartialClustering, replicateRightSide)
+      }
+    }
+
+    if (isCompatible) Some(Seq(newLeft, newRight)) else None
+  }
+
+  // Similar to `OptimizeSkewedJoin.canSplitRightSide`

Review Comment:
   This comment looks not informative.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org