Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/07 18:48:17 UTC

[GitHub] [spark] sunchao commented on a diff in pull request #38950: [SPARK-41413][SQL] Avoid shuffle in Storage-Partitioned Join when partition keys mismatch, but join expressions are compatible

sunchao commented on code in PR #38950:
URL: https://github.com/apache/spark/pull/38950#discussion_r1042564231


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala:
##########
@@ -142,12 +145,66 @@ case class EnsureRequirements(
         Some(finalCandidateSpecs.values.maxBy(_.numPartitions))
       }
 
+      def getKeyGroupedSpec(idx: Int): ShuffleSpec = specs(idx) match {
+        case ShuffleSpecCollection(specs) => specs.head
+        case spec => spec
+      }
+
+      def populatePartitionKeys(plan: SparkPlan, keys: Seq[InternalRow]): SparkPlan =
+        plan match {
+          case scan: BatchScanExec =>
+            scan.copy(commonPartitionKeys = Some(keys))
+          case node =>
+            node.mapChildren(child => populatePartitionKeys(child, keys))
+        }
+
       // Check if 1) all children are of `KeyGroupedPartitioning` and 2) they are all compatible
       // with each other. If both are true, skip shuffle.
-      val allCompatible = childrenIndexes.sliding(2).forall {
-        case Seq(a, b) =>
-          checkKeyGroupedSpec(specs(a)) && checkKeyGroupedSpec(specs(b)) &&
-            specs(a).isCompatibleWith(specs(b))
+      val allCompatible = childrenIndexes.length == 2 && {
+        val left = childrenIndexes.head
+        val right = childrenIndexes(1)
+        var isCompatible: Boolean = false
+
+        if (checkKeyGroupedSpec(specs(left)) && checkKeyGroupedSpec(specs(right))) {
+          isCompatible = specs(left).isCompatibleWith(specs(right))
+
+          // If `isCompatible` is false, it could mean:
+          //   1. Partition expressions are not compatible: we have to shuffle in this case.
+          //   2. Partition expressions are compatible, but partition keys are not: in this case we
+          //      can compute a superset of partition keys and push-down to respective
+          //      data sources, which can then adjust their respective output partitioning by
+          //      filling missing partition keys with empty partitions. As a result, Spark can still
+          //      avoid shuffle.
+          if (!isCompatible && conf.v2BucketingPushPartKeysEnabled) {
+            (getKeyGroupedSpec(left), getKeyGroupedSpec(right)) match {
+              case (leftSpec: KeyGroupedShuffleSpec, rightSpec: KeyGroupedShuffleSpec) =>
+                // Check if the two children are partition expression compatible. If so, find the
+                // common set of partition keys, and adjust the plan accordingly.
+                if (leftSpec.isExpressionsCompatible(rightSpec)) {
+                  assert(leftSpec.partitioning.partitionValuesOpt.isDefined)

Review Comment:
   I realized it isn't very useful for `partitioning.partitionValuesOpt` to be an `Option[Seq[InternalRow]]`. I plan to change it to a plain `Seq[InternalRow]` in a follow-up.
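
   To illustrate the point, here is a minimal standalone sketch, not the actual Spark code. `Key` is a hypothetical stand-in for `InternalRow`, and `PartitioningWithOpt`, `PartitioningWithSeq` and `mergeKeys` are names invented for this example. It assumes an empty `Seq` can represent "no partition values", which may not match what the follow-up ends up doing.

```scala
// Hypothetical sketch: `Key` stands in for Spark's InternalRow, and none of
// these types or helpers exist in Spark under these names.
object PartitionValuesSketch {
  type Key = Seq[Int]

  // Shape discussed in the diff: partition values wrapped in an Option.
  final case class PartitioningWithOpt(partitionValuesOpt: Option[Seq[Key]])

  // Shape proposed for the follow-up (assumption: empty Seq means "none").
  final case class PartitioningWithSeq(partitionValues: Seq[Key])

  // Superset of the keys from both sides, deduplicated, in first-seen order.
  def mergeKeys(left: Seq[Key], right: Seq[Key]): Seq[Key] =
    (left ++ right).distinct

  def mergedFromOpt(l: PartitioningWithOpt, r: PartitioningWithOpt): Seq[Key] = {
    // Mirrors the assert in the diff: the Option must be checked before use.
    assert(l.partitionValuesOpt.isDefined && r.partitionValuesOpt.isDefined)
    mergeKeys(l.partitionValuesOpt.get, r.partitionValuesOpt.get)
  }

  // With a plain Seq there is nothing to unwrap and no assert to maintain.
  def mergedFromSeq(l: PartitioningWithSeq, r: PartitioningWithSeq): Seq[Key] =
    mergeKeys(l.partitionValues, r.partitionValues)
}
```

   With the `Option`, every call site needs an `isDefined` check or a `.get`; with the plain `Seq`, a side that has no partition values simply contributes nothing to the merged key set.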



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
