Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/20 18:11:41 UTC

[GitHub] [spark] sunchao commented on a change in pull request #35574: [SPARK-38237][SQL][SS] Allow `HashPartitioning` to satisfy `ClusteredDistribution` only with full clustering keys

sunchao commented on a change in pull request #35574:
URL: https://github.com/apache/spark/pull/35574#discussion_r810663212



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -261,8 +261,16 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
           expressions.length == h.expressions.length && expressions.zip(h.expressions).forall {
             case (l, r) => l.semanticEquals(r)
           }
-        case ClusteredDistribution(requiredClustering, _) =>
-          expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+        case c @ ClusteredDistribution(requiredClustering, _) =>
+          if (SQLConf.get.requireAllClusterKeysForHashPartition) {

Review comment:
       I'm also inclined toward option 1) for now, and I agree with the points that @c21 raised above.
   
   As a Spark developer, I was originally confused when I saw both `HashClusteredDistribution` and `ClusteredDistribution`, and I had to navigate the code base to work out how their behavior differs. With the newly introduced config, a developer now also has to remember to check the config value and pick `HashClusteredDistribution` or `ClusteredDistribution` accordingly, which is an extra burden. In addition, it's better to have a separate `StatefulOpClusteredDistribution` dedicated to SS use cases, as that makes them more distinct.
   
   Of course, keeping a separate `HashClusteredDistribution` leaves more room for it to evolve independently. But I'd suggest considering that only once we have concrete ideas. So far, I don't see anything that can't be done with `ClusteredDistribution` alone.
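   To make option 1) concrete, here is a minimal Scala sketch of a single `ClusteredDistribution` whose satisfaction check switches on a flag. It is a simplified model, not Spark's code: keys are plain column names instead of Catalyst expressions compared with `semanticEquals`, the `requireAllClusterKeys` parameter stands in for `SQLConf.get.requireAllClusterKeysForHashPartition`, and the full-key branch is an assumption about what the truncated `if` in the diff does.

```scala
// Hypothetical, simplified model of the check in the diff above.
// Keys are plain column names rather than Catalyst Expressions.
object ClusteredSatisfactionSketch {
  final case class ClusteredDistribution(clustering: Seq[String])
  final case class HashPartitioning(expressions: Seq[String], numPartitions: Int)

  // Pre-change rule from the diff: every partitioning expression must appear
  // somewhere in the required clustering, so a subset of the keys is enough.
  def satisfiesSubset(p: HashPartitioning, d: ClusteredDistribution): Boolean =
    p.expressions.forall(x => d.clustering.contains(x))

  // Stricter rule (assumed): the partitioning must use exactly the full
  // clustering keys, compared position by position.
  def isPartitionedOnFullKeys(p: HashPartitioning, d: ClusteredDistribution): Boolean =
    p.expressions.length == d.clustering.length &&
      p.expressions.zip(d.clustering).forall { case (l, r) => l == r }

  // One distribution type; the flag stands in for the SQLConf setting.
  def satisfies(
      p: HashPartitioning,
      d: ClusteredDistribution,
      requireAllClusterKeys: Boolean): Boolean =
    if (requireAllClusterKeys) isPartitionedOnFullKeys(p, d)
    else satisfiesSubset(p, d)
}
```

   With clustering keys `(a, b)`, a `HashPartitioning` on `a` alone satisfies the distribution under the subset rule but not under the full-key rule. Under option 1), that flag is the only thing a developer needs to know about, rather than having to choose between two distribution classes.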

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -271,6 +279,17 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
   override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
     HashShuffleSpec(this, distribution)
 
+  /**
+   * Checks if [[HashPartitioning]] is partitioned on exactly same full `clustering` keys of
+   * [[ClusteredDistribution]].
+   */
+  def isPartitionedOnFullKeys(distribution: ClusteredDistribution): Boolean = {
+    expressions.length == distribution.clustering.length &&

Review comment:
       I'm not sure ordering matters here: is it common for data skew to be introduced just by changing the order of the hash keys? I'm not sure whether Murmur3 hashing exhibits that kind of property.
   
   This also makes it harder for the optimization to kick in (imagine users having to carefully align join or aggregation keys with the order of the bucket keys in the table). It is also a behavior change for bucket joins, since Spark currently reorders the hash keys w.r.t. the join keys in `EnsureRequirements.reorderJoinPredicates`.
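   The ordering question can be illustrated with a small Scala sketch. It compares a positional full-key check with an order-insensitive one. The positional check assumes the truncated body of `isPartitionedOnFullKeys` continues with a `zip`-style comparison. The sketch also includes a hypothetical helper that reorders join keys to follow the bucket-key order. The helper only captures the idea behind `EnsureRequirements.reorderJoinPredicates` and is not Spark's implementation; all names here are illustrative, and keys are assumed to be distinct column names.

```scala
// Hypothetical sketch; keys are plain, distinct column names.
object FullKeyMatchSketch {
  // Positional comparison: same length and the same key at every index.
  def matchesInOrder(partitionKeys: Seq[String], clusteringKeys: Seq[String]): Boolean =
    partitionKeys.length == clusteringKeys.length &&
      partitionKeys.zip(clusteringKeys).forall { case (l, r) => l == r }

  // Order-insensitive comparison: same length and the same set of keys.
  def matchesIgnoringOrder(partitionKeys: Seq[String], clusteringKeys: Seq[String]): Boolean =
    partitionKeys.length == clusteringKeys.length &&
      partitionKeys.toSet == clusteringKeys.toSet

  // Hypothetical helper: reorder the (left, right) join-key pairs so the left
  // keys follow `bucketKeys`. Returns None if the left keys are not exactly
  // the bucket keys in some order.
  def reorderToBucketKeys(
      bucketKeys: Seq[String],
      leftKeys: Seq[String],
      rightKeys: Seq[String]): Option[(Seq[String], Seq[String])] =
    if (!matchesIgnoringOrder(leftKeys, bucketKeys)) None
    else {
      // Map each left key to its paired right key, then walk the bucket order.
      val pairs: Map[String, String] = leftKeys.zip(rightKeys).toMap
      Some((bucketKeys, bucketKeys.map(pairs)))
    }
}
```

   With bucket keys `(b, a)` and join keys `(a, b)`, the positional check fails while the order-insensitive check succeeds. Reordering the join-key pairs to `(b, a)` / `(y, x)` restores a positional match without changing the join condition. This is why a strictly ordered check would be a behavior change for bucket joins.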




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



For additional commands, e-mail: reviews-help@spark.apache.org