Posted to reviews@spark.apache.org by "szehon-ho (via GitHub)" <gi...@apache.org> on 2023/08/02 22:43:18 UTC

[GitHub] [spark] szehon-ho opened a new pull request, #42306: [SQL][SPARK-44647] Support SPJ where join keys are less than cluster keys

szehon-ho opened a new pull request, #42306:
URL: https://github.com/apache/spark/pull/42306

   ### What changes were proposed in this pull request?
   - Add new conf spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled
   - Change key compatibility checks in EnsureRequirements: remove the check that all partition keys must be in the join keys, so that isKeyCompatible = true in this case.
   - Change BatchScanExec/DataSourceV2Relation to group splits by join keys if they differ from partition keys (previously grouped only by partition values)
   - Implement partiallyClustered skew-handling.
     - Group only the replicate side (now by join key as well)
     - Add an additional sort at the end of partitions based on join key, because when we group the non-replicate side, the partition ordering becomes out of order.
   
   ### Why are the changes needed?
   - Support Storage Partition Join in cases where the join condition does not contain all the partition keys, but just some of them
   
   ### Does this PR introduce _any_ user-facing change? 
   No
   
   ### How was this patch tested?
   - Added tests in KeyGroupedPartitioningSuite
   - Found two problems, will address in a separate PR:
     - https://github.com/apache/spark/pull/37886 made another change so that we have to select all join keys; otherwise the DSV2 scan does not report KeyGroupedPartitioning and SPJ does not get triggered. Need to see how to relax this.
     - https://issues.apache.org/jira/browse/SPARK-44641 was found when testing this change. This PR refactors some of that code to add group-by-join-key but doesn't change the underlying logic, so the issue continues to exist. Hopefully this will also get fixed in another way.
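
   Illustration (not part of the PR): the split-grouping change described above can be modeled without Spark. The sketch below groups input splits by only those partition values that sit at join-key positions. `Split`, `project`, `groupByJoinKeys`, and the sample data are hypothetical names chosen for this example; they are not the code in BatchScanExec.

   ```scala
   object JoinKeyGrouping {
     // Hypothetical stand-in for an input split tagged with its partition values,
     // one value per partition key, e.g. Seq(date, bucket).
     final case class Split(partitionValues: Seq[Any], file: String)

     // Keep only the partition values at the positions that are also join keys.
     def project(values: Seq[Any], joinKeyPositions: Seq[Int]): Seq[Any] =
       joinKeyPositions.map(i => values(i))

     // Group splits by the projected (join-key-only) partition values.
     def groupByJoinKeys(
         splits: Seq[Split],
         joinKeyPositions: Seq[Int]): Map[Seq[Any], Seq[Split]] =
       splits.groupBy(s => project(s.partitionValues, joinKeyPositions))

     def main(args: Array[String]): Unit = {
       val splits = Seq(
         Split(Seq("2023-08-01", 1), "a"),
         Split(Seq("2023-08-02", 1), "b"),
         Split(Seq("2023-08-01", 2), "c"))
       // Join only on the second partition key (position 1), ignoring the date.
       val grouped = groupByJoinKeys(splits, Seq(1))
       grouped.foreach { case (key, group) =>
         println(s"$key -> ${group.map(_.file).mkString(",")}")
       }
     }
   }
   ```

   Splits that differ only in the non-join partition key (the date here) land in the same group, which is the situation the new conf is meant to allow.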


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] szehon-ho commented on a diff in pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #42306:
URL: https://github.com/apache/spark/pull/42306#discussion_r1304941351


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -711,36 +718,82 @@ case class KeyGroupedShuffleSpec(
     case _ => false
   }
 
+  def isPartitioningCompatible(otherPartitioning: KeyGroupedPartitioning): Boolean = {
+    val joinKeyPositions = keyPositions.map(_.nonEmpty)
+    partitioning.partitionValues.zip(otherPartitioning.partitionValues)
+      .forall {
+        case (left, right) =>
+          KeyGroupedShuffleSpec.project(left, partitioning.expressions, joinKeyPositions)
+            .equals(
+              KeyGroupedShuffleSpec.project(right, partitioning.expressions, joinKeyPositions))
+      }
+  }
+
   // Whether the partition keys (i.e., partition expressions) are compatible between this and the
-  // `other` spec.
+  // other spec.
   def areKeysCompatible(other: KeyGroupedShuffleSpec): Boolean = {
-    val expressions = partitioning.expressions
-    val otherExpressions = other.partitioning.expressions
-
-    expressions.length == otherExpressions.length && {
-      val otherKeyPositions = other.keyPositions
-      keyPositions.zip(otherKeyPositions).forall { case (left, right) =>
-        left.intersect(right).nonEmpty
+    partitionExpressionsCompatible(other) &&
+      KeyGroupedShuffleSpec.keyPositionsCompatible(
+        keyPositions, other.keyPositions
+      )
+  }
+
+  // Whether the partition keys (i.e., partition expressions) that also are in the set of
+  // join keys are compatible between this and the other spec.
+  def areJoinKeysCompatible(other: KeyGroupedShuffleSpec): Boolean = {
+    partitionExpressionsCompatible(other) &&
+      KeyGroupedShuffleSpec.keyPositionsCompatible(
+        keyPositions.filter(_.nonEmpty),
+        other.keyPositions.filter(_.nonEmpty)
+    )
+  }
+
+  private def partitionExpressionsCompatible(other: KeyGroupedShuffleSpec): Boolean = {
+    val left = partitioning.expressions
+    val right = other.partitioning.expressions
+    left.length == right.length &&
+      left.zip(right).forall {
+        case (l, r) => KeyGroupedShuffleSpec.isExpressionCompatible(l, r)
       }
-    } && expressions.zip(otherExpressions).forall {
-      case (l, r) => isExpressionCompatible(l, r)
-    }
   }
 
-  private def isExpressionCompatible(left: Expression, right: Expression): Boolean =
+  override def canCreatePartitioning: Boolean = SQLConf.get.v2BucketingShuffleEnabled &&
+    // Only support partition expressions are AttributeReference for now
+    partitioning.expressions.forall(_.isInstanceOf[AttributeReference])
+
+  override def createPartitioning(clustering: Seq[Expression]): Partitioning = {
+    KeyGroupedPartitioning(clustering, partitioning.numPartitions, partitioning.partitionValues)
+  }
+}
+
+object KeyGroupedShuffleSpec {
+
+  def isExpressionCompatible(left: Expression, right: Expression): Boolean =

Review Comment:
   This just groups the new static methods into the companion object, so the diff looks a bit bigger. Let me know if I should revert.
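
   Illustration (not part of the PR): the `isPartitioningCompatible` method in the hunk above compares partition values pairwise after projecting them onto the join-key positions. A Spark-free sketch of that idea follows. The `project` helper here is a hypothetical simplification over plain `Seq[Any]` values and is not the signature of `KeyGroupedShuffleSpec.project`.

   ```scala
   object ProjectedCompatibility {
     // Hypothetical helper: keep the values whose position is flagged as a join key.
     // The mask plays the role of keyPositions.map(_.nonEmpty) in the hunk.
     def project(values: Seq[Any], isJoinKey: Seq[Boolean]): Seq[Any] =
       values.zip(isJoinKey).collect { case (v, true) => v }

     // Two sides are compatible when, pairwise, their projected partition values match.
     // zip stops at the shorter side, so callers are assumed to have already
     // checked that both sides have the same number of partitions.
     def isPartitioningCompatible(
         left: Seq[Seq[Any]],
         right: Seq[Seq[Any]],
         isJoinKey: Seq[Boolean]): Boolean =
       left.zip(right).forall { case (l, r) =>
         project(l, isJoinKey) == project(r, isJoinKey)
       }
   }
   ```

   With a mask of `Seq(false, true)`, two sides whose first partition key differs but whose second key matches row by row are treated as compatible.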





[GitHub] [spark] sunchao commented on pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on PR #42306:
URL: https://github.com/apache/spark/pull/42306#issuecomment-1714373575

   Thanks @szehon-ho @dongjoon-hyun !




[GitHub] [spark] szehon-ho commented on pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #42306:
URL: https://github.com/apache/spark/pull/42306#issuecomment-1711136324

   @sunchao thanks! addressed review comments




[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #42306:
URL: https://github.com/apache/spark/pull/42306#discussion_r1320385251


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1530,6 +1530,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS =
+    buildConf("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled")
+      .doc("Whether to allow storage-partition join in the case where join keys are" +
+        "a subset of the partition keys of the source tables.  At planning time, " +

Review Comment:
   nit. `.  At` -> `. At`.





[GitHub] [spark] dongjoon-hyun commented on pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #42306:
URL: https://github.com/apache/spark/pull/42306#issuecomment-1714368810

   Merged to master for Apache Spark 4.0.0. Thank you so much, @szehon-ho and @sunchao!




[GitHub] [spark] szehon-ho commented on a diff in pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #42306:
URL: https://github.com/apache/spark/pull/42306#discussion_r1304977725


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1500,6 +1500,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS =
+    buildConf("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled")
+      .doc("Whether to allow storage-partition join in the case where join keys are" +
+        "a subset of the partition keys of the source tables.  At planning time, " +
+        "Spark will group the partitions by only those keys that are in the join keys." +
+        "This is currently enabled only if spark.sql.sources.v2.bucketing.pushPartValues.enabled " +
+        "is also enabled."
+      )
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(true)

Review Comment:
   Done





Re: [PR] [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys [spark]

Posted by "irsath (via GitHub)" <gi...@apache.org>.
irsath commented on PR #42306:
URL: https://github.com/apache/spark/pull/42306#issuecomment-1819425926

   Right, sorry for the typo, but I meant: what if we make a 3.6 with this PR?
   
   I have never contributed to OSS Spark, but if you're OK with the idea I can try to do a PR in that regard.




[GitHub] [spark] sunchao commented on a diff in pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #42306:
URL: https://github.com/apache/spark/pull/42306#discussion_r1320031556


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -672,9 +708,17 @@ case class HashShuffleSpec(
   override def numPartitions: Int = partitioning.numPartitions
 }
 
+/**
+ * [[ShuffleSpec]] created by [[KeyGroupedPartitioning]].
+ * @param partitioning key grouped partitioning

Review Comment:
   nit: leave an empty line above the first `@param`



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1530,6 +1530,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS =
+    buildConf("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled")
+      .doc("Whether to allow storage-partition join in the case where join keys are" +
+        "a subset of the partition keys of the source tables.  At planning time, " +
+        "Spark will group the partitions by only those keys that are in the join keys." +
+        "This is currently enabled only if spark.sql.requireAllClusterKeysForDistribution " +

Review Comment:
   nit: replace with `spark.sql.requireAllClusterKeysForDistribution` `${REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key}` (and add `s` to the beginning of this line)





[GitHub] [spark] szehon-ho commented on a diff in pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #42306:
URL: https://github.com/apache/spark/pull/42306#discussion_r1304977574


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala:
##########
@@ -144,8 +159,25 @@ case class BatchScanExec(
                   s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
                   "is enabled")
 
-            val groupedPartitions = groupPartitions(finalPartitions.map(_.head),
-              groupSplits = true).get
+            // In the case where we replicate partitions, we have grouped
+            // the partitions by the join key if they differ
+            val groupByExpressions =

Review Comment:
   Done, changed outputPartitioning to return KeyGroupedPartitioning to reflect that.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -701,41 +705,78 @@ case class KeyGroupedShuffleSpec(
     case otherSpec @ KeyGroupedShuffleSpec(otherPartitioning, otherDistribution) =>
       distribution.clustering.length == otherDistribution.clustering.length &&
         numPartitions == other.numPartitions && areKeysCompatible(otherSpec) &&
-          partitioning.partitionValues.zip(otherPartitioning.partitionValues).forall {
-            case (left, right) =>
-              InternalRowComparableWrapper(left, partitioning.expressions)
-                .equals(InternalRowComparableWrapper(right, partitioning.expressions))
-          }
+        isPartitioningCompatible(otherPartitioning)
     case ShuffleSpecCollection(specs) =>
       specs.exists(isCompatibleWith)
     case _ => false
   }
 
+  def isPartitioningCompatible(otherPartitioning: KeyGroupedPartitioning): Boolean = {
+    val clusterKeySize = keyPositions.size
+    partitioning.partitionValues.zip(otherPartitioning.partitionValues)
+      .forall {
+        case (left, right) =>
+          val leftTypes = partitioning.expressions.map(_.dataType)
+          val leftVals = left.toSeq(leftTypes).take(clusterKeySize).toArray
+          val newLeft = new GenericInternalRow(leftVals)
+
+          val rightTypes = partitioning.expressions.map(_.dataType)
+          val rightVals = right.toSeq(rightTypes).take(clusterKeySize).toArray
+          val newRight = new GenericInternalRow(rightVals)
+
+          InternalRowComparableWrapper(newLeft, partitioning.expressions.take(clusterKeySize))
+            .equals(InternalRowComparableWrapper(
+              newRight, partitioning.expressions.take(clusterKeySize)))
+      }
+  }
+
   // Whether the partition keys (i.e., partition expressions) are compatible between this and the
   // `other` spec.
   def areKeysCompatible(other: KeyGroupedShuffleSpec): Boolean = {
-    val expressions = partitioning.expressions
-    val otherExpressions = other.partitioning.expressions
-
-    expressions.length == otherExpressions.length && {
-      val otherKeyPositions = other.keyPositions
-      keyPositions.zip(otherKeyPositions).forall { case (left, right) =>
-        left.intersect(right).nonEmpty
+    partitionExpressionsCompatible(other) &&
+      KeyGroupedShuffleSpec.keyPositionsCompatible(
+        keyPositions, other.keyPositions
+      )
+  }
+
+  // Whether the partition keys (i.e., partition expressions) that also are in the set of
+  // cluster keys are compatible between this and the 'other' spec.

Review Comment:
   I think it was moved from the existing Javadoc, but I removed it.





[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #42306:
URL: https://github.com/apache/spark/pull/42306#discussion_r1320385009


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1530,6 +1530,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS =
+    buildConf("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled")
+      .doc("Whether to allow storage-partition join in the case where join keys are" +
+        "a subset of the partition keys of the source tables.  At planning time, " +
+        "Spark will group the partitions by only those keys that are in the join keys." +
+        "This is currently enabled only if spark.sql.requireAllClusterKeysForDistribution " +
+        "is false."
+      )

Review Comment:
   nit. Let's put `)` at the end of the previous line.
   ```scala
   "is false.")
   ```
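
   Illustration (not part of the review): for readers who want to try the flag discussed in this hunk, a hedged sketch of setting it on a session follows. It assumes a Spark build that contains this PR (merged for 4.0.0 per this thread) on the classpath. Revisions of the doc string quoted in this thread name two different prerequisites (`spark.sql.sources.v2.bucketing.pushPartValues.enabled` and `spark.sql.requireAllClusterKeysForDistribution`), so the sketch sets both; which ones are strictly required is an assumption to verify against the released SQLConf. The object name and app name are made up for the example.

   ```scala
   import org.apache.spark.sql.SparkSession

   object EnableSubsetJoinKeys {
     def main(args: Array[String]): Unit = {
       // Local session for experimentation only.
       val spark = SparkSession.builder()
         .master("local[*]")
         .appName("spj-subset-join-keys")
         .getOrCreate()

       // Storage-partitioned join itself must be enabled.
       spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
       // Prerequisites named in the doc-string revisions quoted in this thread.
       spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
       spark.conf.set("spark.sql.requireAllClusterKeysForDistribution", "false")
       // The flag added by this PR.
       spark.conf.set(
         "spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled",
         "true")

       spark.stop()
     }
   }
   ```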





Re: [PR] [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys [spark]

Posted by "irsath (via GitHub)" <gi...@apache.org>.
irsath commented on PR #42306:
URL: https://github.com/apache/spark/pull/42306#issuecomment-1818004984

   Hi @dongjoon-hyun @sunchao,
   Do you see any blocker to backporting this to Spark 3.5?
   I think it would be useful for many use cases (including mine) that partition by date for GDPR purposes but still need SPJ on the other partitioning column.




[GitHub] [spark] szehon-ho commented on a diff in pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #42306:
URL: https://github.com/apache/spark/pull/42306#discussion_r1304978430


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -344,7 +344,11 @@ case class KeyGroupedPartitioning(
           } else {
             // We'll need to find leaf attributes from the partition expressions first.
             val attributes = expressions.flatMap(_.collectLeaves())
-            attributes.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+
+            // Support only when all cluster key have an associated partition expression key
+            requiredClustering.exists(x => attributes.exists(_.semanticEquals(x))) &&
+              // and if all partition expression contain only a single partition key.
+               expressions.forall(_.collectLeaves().size == 1)

Review Comment:
   This was to fix a test, though to be honest I couldn't find it again. There was a test somewhere that exercised this case (which isn't actually supported in the code currently) and, I think, asserted the right exception, which I think would break if SPJ is activated. I could revert this and run again to find the test, if you want.





[GitHub] [spark] dongjoon-hyun commented on pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #42306:
URL: https://github.com/apache/spark/pull/42306#issuecomment-1696150617

   Oh, got it!




[GitHub] [spark] dongjoon-hyun closed pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun closed pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys
URL: https://github.com/apache/spark/pull/42306




[GitHub] [spark] szehon-ho commented on a diff in pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #42306:
URL: https://github.com/apache/spark/pull/42306#discussion_r1319412741


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -344,7 +344,14 @@ case class KeyGroupedPartitioning(
           } else {
             // We'll need to find leaf attributes from the partition expressions first.
             val attributes = expressions.flatMap(_.collectLeaves())
-            attributes.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+
+            if (SQLConf.get.getConf(
+              SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS)) {
+              requiredClustering.exists(x => attributes.exists(_.semanticEquals(x))) &&
+                expressions.forall(_.collectLeaves().size == 1)

Review Comment:
   Added some comments, please check if they make sense.
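
   Illustration (not part of the PR): the difference between the removed check and the flag-gated check in the hunk above can be modeled with plain strings. Each partition expression is represented here by the names of the leaf attributes it references, and `semanticEquals` is approximated by string equality. Both simplifications, and the names `satisfiesOld` and `satisfiesNew`, are assumptions made for this sketch.

   ```scala
   object ClusteringCheck {
     // Hypothetical model: a partition expression is the list of leaf attribute names it uses.
     type PartitionExpr = Seq[String]

     // Removed line: every partition attribute must appear among the required clustering keys.
     def satisfiesOld(exprs: Seq[PartitionExpr], requiredClustering: Seq[String]): Boolean =
       exprs.flatten.forall(a => requiredClustering.contains(a))

     // Flag-gated lines, mirroring the hunk: at least one required clustering key matches a
     // partition attribute (the hunk uses `exists`), and every partition expression
     // references exactly one leaf attribute.
     def satisfiesNew(exprs: Seq[PartitionExpr], requiredClustering: Seq[String]): Boolean = {
       val attributes = exprs.flatten
       requiredClustering.exists(k => attributes.contains(k)) &&
         exprs.forall(_.size == 1)
     }
   }
   ```

   For a table partitioned by `date` and `id` and a join on `id` only, the old check rejects the partitioning because `date` is not a join key, while the new check accepts it.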





[GitHub] [spark] sunchao commented on a diff in pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #42306:
URL: https://github.com/apache/spark/pull/42306#discussion_r1287768581


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -344,7 +344,11 @@ case class KeyGroupedPartitioning(
           } else {
             // We'll need to find leaf attributes from the partition expressions first.
             val attributes = expressions.flatMap(_.collectLeaves())
-            attributes.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+
+            // Support only when all cluster key have an associated partition expression key
+            requiredClustering.exists(x => attributes.exists(_.semanticEquals(x))) &&
+              // and if all partition expression contain only a single partition key.
+               expressions.forall(_.collectLeaves().size == 1)

Review Comment:
   hmm why this condition?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -344,7 +344,11 @@ case class KeyGroupedPartitioning(
           } else {
             // We'll need to find leaf attributes from the partition expressions first.
             val attributes = expressions.flatMap(_.collectLeaves())
-            attributes.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+
+            // Support only when all cluster key have an associated partition expression key

Review Comment:
   should we consider the new flag here and still keep the old behavior if it is not enabled?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -701,41 +705,78 @@ case class KeyGroupedShuffleSpec(
     case otherSpec @ KeyGroupedShuffleSpec(otherPartitioning, otherDistribution) =>
       distribution.clustering.length == otherDistribution.clustering.length &&
         numPartitions == other.numPartitions && areKeysCompatible(otherSpec) &&
-          partitioning.partitionValues.zip(otherPartitioning.partitionValues).forall {
-            case (left, right) =>
-              InternalRowComparableWrapper(left, partitioning.expressions)
-                .equals(InternalRowComparableWrapper(right, partitioning.expressions))
-          }
+        isPartitioningCompatible(otherPartitioning)
     case ShuffleSpecCollection(specs) =>
       specs.exists(isCompatibleWith)
     case _ => false
   }
 
+  def isPartitioningCompatible(otherPartitioning: KeyGroupedPartitioning): Boolean = {
+    val clusterKeySize = keyPositions.size
+    partitioning.partitionValues.zip(otherPartitioning.partitionValues)
+      .forall {
+        case (left, right) =>
+          val leftTypes = partitioning.expressions.map(_.dataType)

Review Comment:
   nit: perhaps add some comments here since it's not that clear. Also we can extract:
   
   ```scala
             val rightTypes = partitioning.expressions.map(_.dataType)
             val rightVals = right.toSeq(rightTypes).take(clusterKeySize).toArray
             val newRight = new GenericInternalRow(rightVals)
   ```
   
   into a separate util method.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -701,41 +705,78 @@ case class KeyGroupedShuffleSpec(
     case otherSpec @ KeyGroupedShuffleSpec(otherPartitioning, otherDistribution) =>
       distribution.clustering.length == otherDistribution.clustering.length &&
         numPartitions == other.numPartitions && areKeysCompatible(otherSpec) &&
-          partitioning.partitionValues.zip(otherPartitioning.partitionValues).forall {
-            case (left, right) =>
-              InternalRowComparableWrapper(left, partitioning.expressions)
-                .equals(InternalRowComparableWrapper(right, partitioning.expressions))
-          }
+        isPartitioningCompatible(otherPartitioning)
     case ShuffleSpecCollection(specs) =>
       specs.exists(isCompatibleWith)
     case _ => false
   }
 
+  def isPartitioningCompatible(otherPartitioning: KeyGroupedPartitioning): Boolean = {
+    val clusterKeySize = keyPositions.size
+    partitioning.partitionValues.zip(otherPartitioning.partitionValues)
+      .forall {
+        case (left, right) =>
+          val leftTypes = partitioning.expressions.map(_.dataType)
+          val leftVals = left.toSeq(leftTypes).take(clusterKeySize).toArray
+          val newLeft = new GenericInternalRow(leftVals)
+
+          val rightTypes = partitioning.expressions.map(_.dataType)
+          val rightVals = right.toSeq(rightTypes).take(clusterKeySize).toArray
+          val newRight = new GenericInternalRow(rightVals)
+
+          InternalRowComparableWrapper(newLeft, partitioning.expressions.take(clusterKeySize))
+            .equals(InternalRowComparableWrapper(
+              newRight, partitioning.expressions.take(clusterKeySize)))
+      }
+  }
+
   // Whether the partition keys (i.e., partition expressions) are compatible between this and the
   // `other` spec.
   def areKeysCompatible(other: KeyGroupedShuffleSpec): Boolean = {
-    val expressions = partitioning.expressions
-    val otherExpressions = other.partitioning.expressions
-
-    expressions.length == otherExpressions.length && {
-      val otherKeyPositions = other.keyPositions
-      keyPositions.zip(otherKeyPositions).forall { case (left, right) =>
-        left.intersect(right).nonEmpty
+    partitionExpressionsCompatible(other) &&
+      KeyGroupedShuffleSpec.keyPositionsCompatible(
+        keyPositions, other.keyPositions
+      )
+  }
+
+  // Whether the partition keys (i.e., partition expressions) that also are in the set of
+  // cluster keys are compatible between this and the 'other' spec.

Review Comment:
   nit: 'other' -> `other`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala:
##########
@@ -144,8 +159,25 @@ case class BatchScanExec(
                   s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
                   "is enabled")
 
-            val groupedPartitions = groupPartitions(finalPartitions.map(_.head),
-              groupSplits = true).get
+            // In the case where we replicate partitions, we have grouped
+            // the partitions by the join key if they differ
+            val groupByExpressions =

Review Comment:
   Can we override the `KeyGroupedPartitioning` method in this class and wrap the join-key handling logic in it? It could return a new `KeyGroupedPartitioning` instance whose `expressions` and `partitionValues` are "projected" onto the join keys.
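   
   To illustrate what "projected" could mean here, a minimal sketch on a simplified model. `SimplePartitioning` and `projectOn` are hypothetical names, expressions are plain strings, and dropping duplicate rows after the projection is an assumption rather than something the PR is confirmed to do:
   
   ```scala
   object ProjectedPartitioning {
     // Hypothetical, simplified model of a key-grouped partitioning:
     // expression names plus one row of values per partition.
     final case class SimplePartitioning(
         expressions: Seq[String],
         partitionValues: Seq[Seq[Any]]) {

       // Keep only the expressions and value fields at `joinKeyPositions`,
       // then drop duplicate rows produced by the projection (an assumption).
       def projectOn(joinKeyPositions: Seq[Int]): SimplePartitioning =
         SimplePartitioning(
           joinKeyPositions.map(i => expressions(i)),
           partitionValues.map(row => joinKeyPositions.map(i => row(i))).distinct)
     }
   }
   ```
   
   For a table partitioned by `(id, data)` and joined on `id` only, the partitions `(2, 'bb')` and `(2, 'cc')` would collapse into a single projected value `(2)`.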



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1500,6 +1500,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS =
+    buildConf("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled")
+      .doc("Whether to allow storage-partition join in the case where join keys are" +
+        "a subset of the partition keys of the source tables.  At planning time, " +
+        "Spark will group the partitions by only those keys that are in the join keys." +
+        "This is currently enabled only if spark.sql.sources.v2.bucketing.pushPartValues.enabled " +
+        "is also enabled."
+      )
+      .version("3.5.0")

Review Comment:
   This should be `4.0.0`.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1500,6 +1500,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS =
+    buildConf("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled")
+      .doc("Whether to allow storage-partition join in the case where join keys are" +
+        "a subset of the partition keys of the source tables.  At planning time, " +
+        "Spark will group the partitions by only those keys that are in the join keys." +
+        "This is currently enabled only if spark.sql.sources.v2.bucketing.pushPartValues.enabled " +
+        "is also enabled."
+      )
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(true)

Review Comment:
   We should default to `false`.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala:
##########
@@ -523,23 +546,25 @@ case class EnsureRequirements(
         joinType == LeftAnti || joinType == LeftOuter
   }
 
-  // Populate the common partition values down to the scan nodes
-  private def populatePartitionValues(
+  // Populate the storage partition join params down to the scan nodes
+  private def populateStoragePartitionJoinParams(
       plan: SparkPlan,
       values: Seq[(InternalRow, Int)],
+      partitionGroupByPositions: Option[Seq[Boolean]],
       applyPartialClustering: Boolean,
       replicatePartitions: Boolean): SparkPlan = plan match {
     case scan: BatchScanExec =>
       scan.copy(
         spjParams = scan.spjParams.copy(
           commonPartitionValues = Some(values),
+          partitionGroupByPositions = partitionGroupByPositions,
           applyPartialClustering = applyPartialClustering,
           replicatePartitions = replicatePartitions
         )
       )
     case node =>
-      node.mapChildren(child => populatePartitionValues(
-        child, values, applyPartialClustering, replicatePartitions))
+      node.mapChildren(child => populateStoragePartitionJoinParams(
+        child, values, partitionGroupByPositions, applyPartialClustering, replicatePartitions))

Review Comment:
   Instead of populating `partitionGroupByPositions`, can we populate `StoragePartitionJoinParams.keyGroupedPartitioning`? It could hold the subset of expressions that participate in the join.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] dongjoon-hyun commented on pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #42306:
URL: https://github.com/apache/spark/pull/42306#issuecomment-1696129665

   Could you re-trigger the failed pipeline or rebase this PR to the `master` branch once more, @szehon-ho ?




[GitHub] [spark] szehon-ho commented on pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #42306:
URL: https://github.com/apache/spark/pull/42306#issuecomment-1696141032

   Hi @dongjoon-hyun, I think @sunchao has another idea he is thinking about. I was going to wait a bit for that before updating the PR.




[GitHub] [spark] szehon-ho commented on a diff in pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #42306:
URL: https://github.com/apache/spark/pull/42306#discussion_r1304977700


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -701,41 +705,78 @@ case class KeyGroupedShuffleSpec(
     case otherSpec @ KeyGroupedShuffleSpec(otherPartitioning, otherDistribution) =>
       distribution.clustering.length == otherDistribution.clustering.length &&
         numPartitions == other.numPartitions && areKeysCompatible(otherSpec) &&
-          partitioning.partitionValues.zip(otherPartitioning.partitionValues).forall {
-            case (left, right) =>
-              InternalRowComparableWrapper(left, partitioning.expressions)
-                .equals(InternalRowComparableWrapper(right, partitioning.expressions))
-          }
+        isPartitioningCompatible(otherPartitioning)
     case ShuffleSpecCollection(specs) =>
       specs.exists(isCompatibleWith)
     case _ => false
   }
 
+  def isPartitioningCompatible(otherPartitioning: KeyGroupedPartitioning): Boolean = {
+    val clusterKeySize = keyPositions.size
+    partitioning.partitionValues.zip(otherPartitioning.partitionValues)
+      .forall {
+        case (left, right) =>
+          val leftTypes = partitioning.expressions.map(_.dataType)

Review Comment:
   Got rid of this actually.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1500,6 +1500,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS =
+    buildConf("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled")
+      .doc("Whether to allow storage-partition join in the case where join keys are" +
+        "a subset of the partition keys of the source tables.  At planning time, " +
+        "Spark will group the partitions by only those keys that are in the join keys." +
+        "This is currently enabled only if spark.sql.sources.v2.bucketing.pushPartValues.enabled " +
+        "is also enabled."
+      )
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(true)

Review Comment:
   Done





[GitHub] [spark] sunchao commented on a diff in pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #42306:
URL: https://github.com/apache/spark/pull/42306#discussion_r1305837139


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1510,6 +1510,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS =
+    buildConf("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled")

Review Comment:
   Maybe we should also mention that this requires `spark.sql.requireAllClusterKeysForDistribution` to be `false`, even though that is already the default.
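   
   For reference, a small sketch of the two settings this comment refers to, collected in a plain map so it runs without Spark. The object name `SpjSubsetJoinConf` is hypothetical, and whether further settings are needed is not covered here:
   
   ```scala
   object SpjSubsetJoinConf {
     // Settings named in this thread; values follow the note above.
     val settings: Map[String, String] = Map(
       "spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled" -> "true",
       "spark.sql.requireAllClusterKeysForDistribution" -> "false")

     // With a live SparkSession `spark` (not created here), they could be applied via:
     //   settings.foreach { case (k, v) => spark.conf.set(k, v) }
   }
   ```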



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -674,7 +711,8 @@ case class HashShuffleSpec(
 
 case class KeyGroupedShuffleSpec(
     partitioning: KeyGroupedPartitioning,
-    distribution: ClusteredDistribution) extends ShuffleSpec {
+    distribution: ClusteredDistribution,
+    joinKeyPositions: Option[Seq[Int]] = None) extends ShuffleSpec {

Review Comment:
   We can add some comments to `KeyGroupedShuffleSpec` explaining what this is for; otherwise it's a bit hard to understand.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -344,7 +344,14 @@ case class KeyGroupedPartitioning(
           } else {
             // We'll need to find leaf attributes from the partition expressions first.
             val attributes = expressions.flatMap(_.collectLeaves())
-            attributes.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+
+            if (SQLConf.get.getConf(
+              SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS)) {
+              requiredClustering.exists(x => attributes.exists(_.semanticEquals(x))) &&
+                expressions.forall(_.collectLeaves().size == 1)

Review Comment:
   This deserves some comments; otherwise it's a bit confusing why we need it.



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala:
##########
@@ -1276,4 +1279,266 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
       }
     }
   }
+
+  test("SPARK-44647: test join key is subset of cluster key " +
+      "with push values and partially-clustered") {
+    val table1 = "tab1e1"
+    val table2 = "table2"
+    val partition = Array(identity("id"), identity("data"))
+    createTable(table1, schema, partition)
+    sql(s"INSERT INTO testcat.ns.$table1 VALUES " +
+        "(1, 'aa', cast('2020-01-01' as timestamp)), " +
+        "(2, 'bb', cast('2020-01-01' as timestamp)), " +
+        "(2, 'cc', cast('2020-01-01' as timestamp)), " +
+        "(3, 'dd', cast('2020-01-01' as timestamp)), " +
+        "(3, 'dd', cast('2020-01-01' as timestamp)), " +
+        "(3, 'ee', cast('2020-01-01' as timestamp)), " +
+        "(3, 'ee', cast('2020-01-01' as timestamp))")
+
+    createTable(table2, schema, partition)
+    sql(s"INSERT INTO testcat.ns.$table2 VALUES " +
+        "(4, 'zz', cast('2020-01-01' as timestamp)), " +
+        "(4, 'zz', cast('2020-01-01' as timestamp)), " +
+        "(3, 'yy', cast('2020-01-01' as timestamp)), " +
+        "(3, 'yy', cast('2020-01-01' as timestamp)), " +
+        "(3, 'xx', cast('2020-01-01' as timestamp)), " +
+        "(3, 'xx', cast('2020-01-01' as timestamp)), " +
+        "(2, 'ww', cast('2020-01-01' as timestamp))")
+
+    Seq(true, false).foreach { pushDownValues =>
+      Seq(true, false).foreach { partiallyClustered =>
+        Seq(true, false).foreach { allowJoinKeysSubsetOfPartitionKeys =>
+
+          withSQLConf(
+            SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
+            SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString,
+            SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
+                partiallyClustered.toString,
+            SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key ->
+                allowJoinKeysSubsetOfPartitionKeys.toString) {
+
+            val df = sql("SELECT t1.id AS id, t1.data AS t1data, t2.data AS t2data " +
+                s"FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2 " +
+                "ON t1.id = t2.id ORDER BY t1.id, t1data, t2data")
+
+            // Currently SPJ for case where join key not same as partition key
+            // only supported when push-part-values enabled

Review Comment:
   nit: the comment is outdated



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -355,7 +355,14 @@ case class KeyGroupedPartitioning(
           } else {
             // We'll need to find leaf attributes from the partition expressions first.
             val attributes = expressions.flatMap(_.collectLeaves())
-            attributes.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+
+            if (SQLConf.get.getConf(
+              SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS)) {
+              requiredClustering.forall(x => attributes.exists(_.semanticEquals(x))) &&
+                  expressions.forall(_.collectLeaves().size == 1)

Review Comment:
   this should be guaranteed currently - it might be better to have this invariant check somewhere else like when constructing a `KeyGroupedPartitioning`, but OK to leave it here for now
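   
   To make the intent of the two conditions easier to follow, here is a simplified sketch of the check in this revision of the diff. It assumes each partition expression is represented only by its leaf column names and compares by name rather than `semanticEquals`; `SubsetClusteringCheck` and `satisfiesSubset` are hypothetical names:
   
   ```scala
   object SubsetClusteringCheck {
     // Hypothetical model: each partition expression is represented only by the
     // column names at its leaves, e.g. bucket(8, id) -> Seq("id").
     def satisfiesSubset(
         partitionExprLeaves: Seq[Seq[String]],
         requiredClustering: Seq[String]): Boolean = {
       val attributes = partitionExprLeaves.flatten
       // Every required clustering key must be one of the partition columns...
       requiredClustering.forall(k => attributes.contains(k)) &&
         // ...and every partition expression must reference exactly one column.
         partitionExprLeaves.forall(_.size == 1)
     }
   }
   ```
   
   With partition expressions over `id` and `data`, a required clustering of just `id` passes, while a clustering on a non-partition column or a multi-column partition expression does not.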



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -355,7 +355,14 @@ case class KeyGroupedPartitioning(
           } else {
             // We'll need to find leaf attributes from the partition expressions first.
             val attributes = expressions.flatMap(_.collectLeaves())
-            attributes.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+
+            if (SQLConf.get.getConf(

Review Comment:
   nit: we can just use `SQLConf.get.v2BucketingAllowJoinKeysSubsetOfPartitionKeys` - it's shorter



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1530,6 +1530,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS =
+    buildConf("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled")
+      .doc("Whether to allow storage-partition join in the case where join keys are" +
+        "a subset of the partition keys of the source tables.  At planning time, " +
+        "Spark will group the partitions by only those keys that are in the join keys." +
+        "This is currently enabled only if spark.sql.sources.v2.bucketing.pushPartValues.enabled " +

Review Comment:
   nit: this is outdated.





[GitHub] [spark] szehon-ho commented on a diff in pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #42306:
URL: https://github.com/apache/spark/pull/42306#discussion_r1319412999


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -674,7 +711,8 @@ case class HashShuffleSpec(
 
 case class KeyGroupedShuffleSpec(
     partitioning: KeyGroupedPartitioning,
-    distribution: ClusteredDistribution) extends ShuffleSpec {
+    distribution: ClusteredDistribution,
+    joinKeyPositions: Option[Seq[Int]] = None) extends ShuffleSpec {

Review Comment:
   Added comments, please check and suggest if it can be improved.





[GitHub] [spark] szehon-ho commented on a diff in pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #42306:
URL: https://github.com/apache/spark/pull/42306#discussion_r1304979623


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala:
##########
@@ -523,23 +546,25 @@ case class EnsureRequirements(
         joinType == LeftAnti || joinType == LeftOuter
   }
 
-  // Populate the common partition values down to the scan nodes
-  private def populatePartitionValues(
+  // Populate the storage partition join params down to the scan nodes
+  private def populateStoragePartitionJoinParams(
       plan: SparkPlan,
       values: Seq[(InternalRow, Int)],
+      partitionGroupByPositions: Option[Seq[Boolean]],
       applyPartialClustering: Boolean,
       replicatePartitions: Boolean): SparkPlan = plan match {
     case scan: BatchScanExec =>
       scan.copy(
         spjParams = scan.spjParams.copy(
           commonPartitionValues = Some(values),
+          partitionGroupByPositions = partitionGroupByPositions,
           applyPartialClustering = applyPartialClustering,
           replicatePartitions = replicatePartitions
         )
       )
     case node =>
-      node.mapChildren(child => populatePartitionValues(
-        child, values, applyPartialClustering, replicatePartitions))
+      node.mapChildren(child => populateStoragePartitionJoinParams(
+        child, values, partitionGroupByPositions, applyPartialClustering, replicatePartitions))

Review Comment:
   I will need some more guidance on this one.





Re: [PR] [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #42306:
URL: https://github.com/apache/spark/pull/42306#issuecomment-1818006200

   Apache Spark has a back-porting policy which allows only bug fixes, @irsath . Given that this PR is an improvement, we are unable to touch the release branches like `branch-3.5` for this improvement.

