You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/03/28 05:44:57 UTC

[spark] branch branch-3.3 updated: [SPARK-38623][SQL] Add more comments and tests for HashShuffleSpec

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new cc85b1e  [SPARK-38623][SQL] Add more comments and tests for HashShuffleSpec
cc85b1e is described below

commit cc85b1ee138eeb2ea9aca9c545d48ab2c5b49c1c
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Mon Mar 28 13:43:31 2022 +0800

    [SPARK-38623][SQL] Add more comments and tests for HashShuffleSpec
    
    ### What changes were proposed in this pull request?
    
    Add more comments and tests to explain the special handling of duplicated cluster keys.
    
    ### Why are the changes needed?
    
    improve code readability
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing tests.
    
    Closes #35937 from cloud-fan/join.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit c0cb5bce6623e98fa5161c1e3e866e730de87fa5)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/sql/catalyst/plans/physical/partitioning.scala   | 7 ++++++-
 .../scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala    | 8 ++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 78d153c..e4ff14b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -516,6 +516,11 @@ case class HashShuffleSpec(
    * A sequence where each element is a set of positions of the hash partition key to the cluster
    * keys. For instance, if cluster keys are [a, b, b] and hash partition keys are [a, b], the
    * result will be [(0), (1, 2)].
+   *
+   * This is useful to check compatibility between two `HashShuffleSpec`s. If the cluster keys are
+   * [a, b, b] and [x, y, z] for the two join children, and the hash partition keys are
+   * [a, b] and [x, z], they are compatible. With the positions, we can do the compatibility check
+   * by looking at if the positions of hash partition keys from two sides have overlapping.
    */
   lazy val hashKeyPositions: Seq[mutable.BitSet] = {
     val distKeyToPos = mutable.Map.empty[Expression, mutable.BitSet]
@@ -533,7 +538,7 @@ case class HashShuffleSpec(
       //  1. both distributions have the same number of clustering expressions
       //  2. both partitioning have the same number of partitions
       //  3. both partitioning have the same number of expressions
-      //  4. each pair of expression from both has overlapping positions in their
+      //  4. each pair of partitioning expression from both sides has overlapping positions in their
       //     corresponding distributions.
       distribution.clustering.length == otherDistribution.clustering.length &&
       partitioning.numPartitions == otherPartitioning.numPartitions &&
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala
index 74ec949..7e11d4f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala
@@ -92,6 +92,14 @@ class ShuffleSpecSuite extends SparkFunSuite with SQLHelper {
     )
 
     checkCompatible(
+      HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
+        ClusteredDistribution(Seq($"a", $"b", $"b"))),
+      HashShuffleSpec(HashPartitioning(Seq($"a", $"d"), 10),
+        ClusteredDistribution(Seq($"a", $"c", $"d"))),
+      expected = true
+    )
+
+    checkCompatible(
       HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"a"), 10),
         ClusteredDistribution(Seq($"a", $"b", $"b"))),
       HashShuffleSpec(HashPartitioning(Seq($"a", $"c", $"a"), 10),

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org