You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/20 02:49:00 UTC

[GitHub] [spark] HeartSaVioR commented on a change in pull request #35574: [SPARK-38237][SQL][SS] Allow `HashPartitioning` to satisfy `ClusteredDistribution` only with full clustering keys

HeartSaVioR commented on a change in pull request #35574:
URL: https://github.com/apache/spark/pull/35574#discussion_r810560773



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -524,10 +543,7 @@ case class HashShuffleSpec(
     // will add shuffles with the default partitioning of `ClusteredDistribution`, which uses all
     // the join keys.
     if (SQLConf.get.getConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION)) {
-      partitioning.expressions.length == distribution.clustering.length &&
-        partitioning.expressions.zip(distribution.clustering).forall {
-          case (l, r) => l.semanticEquals(r)
-        }
+      partitioning.isPartitionedOnFullKeys(distribution)

Review comment:
       Although it is beyond the scope of the PR, same thing applies here. Would we need to require strict order of keys?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -261,8 +261,16 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
           expressions.length == h.expressions.length && expressions.zip(h.expressions).forall {
             case (l, r) => l.semanticEquals(r)
           }
-        case ClusteredDistribution(requiredClustering, _) =>
-          expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+        case c @ ClusteredDistribution(requiredClustering, _) =>
+          if (SQLConf.get.requireAllClusterKeysForHashPartition) {
+            // Checks `HashPartitioning` is partitioned on exactly full clustering keys of
+            // `ClusteredDistribution`. Opt in this feature with enabling
+            // "spark.sql.requireAllClusterKeysForHashPartition", can help avoid potential data
+            // skewness for some jobs.
+            isPartitionedOnFullKeys(c)

Review comment:
       If we end up with strict ordering, we could document the method doc on isPartitionedOnFullKeys that it is also requiring exact order, and replace the condition of StatefulOpClusteredDistribution with isPartitionedOnFullKeys. I'm wondering we would care about ordering for cases we described.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
##########
@@ -1453,6 +1455,57 @@ class DataFrameAggregateSuite extends QueryTest
     val df = Seq(1).toDF("id").groupBy(Stream($"id" + 1, $"id" + 2): _*).sum("id")
     checkAnswer(df, Row(2, 3, 1))
   }
+
+  test("SPARK-38237: require all cluster keys for child required distribution") {

Review comment:
       Thanks! Please feel free to adjust the test case or leverage the test case to deduce additional test cases.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -271,6 +279,17 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
   override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
     HashShuffleSpec(this, distribution)
 
+  /**
+   * Checks if [[HashPartitioning]] is partitioned on exactly same full `clustering` keys of
+   * [[ClusteredDistribution]].
+   */
+  def isPartitionedOnFullKeys(distribution: ClusteredDistribution): Boolean = {
+    expressions.length == distribution.clustering.length &&

Review comment:
       The condition is more restrict than we explain in the config (e.g. is the ordering important here?), but I'm fine with this if we are all OK with this, as my proposal is technically the same.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org