Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/09/07 02:36:36 UTC

[GitHub] [spark] imback82 opened a new pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

imback82 opened a new pull request #29655:
URL: https://github.com/apache/spark/pull/29655


   
   ### What changes were proposed in this pull request?
   This PR proposes to optimize SortMergeJoin (SMJ) when each of its children has a hash output partitioning that "partially" satisfies the required distribution. When a child's output partitioning expressions are a subset of the required distribution expressions (the join key expressions), the shuffle can be removed because rows will still be sorted by the join keys before they are joined (the required child ordering for SMJ is on the join keys).
   
   This PR introduces `OptimizeSortMergeJoinWithPartialHashDistribution`, which removes the shuffle for a sort merge join if the following conditions are met:
    - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions as the other side of the join.
    - The child of ShuffleExchangeExec has an output partitioning whose expressions are a subset of the join keys on the respective join side.
   
   This rule can be turned on by setting `spark.sql.execution.sortMergeJoin.optimizePartialHashDistribution.enabled` to `true` (`false` by default).
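   
   The two conditions above can be illustrated with a minimal, Spark-free sketch. Everything in it is hypothetical and for illustration only: the `HashPart` class, the `canSkipShuffle` helper, and the modelling of expressions as plain column names are assumptions, and the explicit length check is an extra safeguard assumed here rather than something stated in this description.
   ```scala
   object PartialHashDistributionSketch {
     // Hypothetical stand-in for a hash partitioning: column names plus partition count.
     final case class HashPart(columns: Seq[String], numPartitions: Int)

     // Returns true when both sides have the same number of partitions and every
     // pair of partitioning columns (left, right) is also a pair of join keys.
     def canSkipShuffle(
         leftKeys: Seq[String],
         left: HashPart,
         rightKeys: Seq[String],
         right: HashPart): Boolean = {
       // Join keys are paired positionally: leftKeys(i) is compared with rightKeys(i).
       val keyPairs = leftKeys.zip(rightKeys).toSet
       left.numPartitions == right.numPartitions &&
         left.columns.nonEmpty &&
         left.columns.length == right.columns.length && // assumed safeguard
         left.columns.zip(right.columns).forall(keyPairs.contains)
     }
   }
   ```
   For tables bucketed into 8 buckets by `i1` and `i2` respectively and joined on `(i1, j1)` and `(i2, j2)`, this check passes; it fails if the bucket counts differ or if a bucket column is not one of the join keys.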
   
   ### Why are the changes needed?
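   To remove unnecessary shuffles when both children of a sort merge join are already hash-partitioned on a subset of the join keys, for example bucketed tables joined on the bucket column plus additional columns.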
   
   ### Does this PR introduce _any_ user-facing change?
   Suppose the following case where `t1` is bucketed by `i1`, and `t2` by `i2`:
   ```scala
   val df1 = (0 until 100).map(i => (i % 5, i % 13, i.toString)).toDF("i1", "j1", "k1")
   val df2 = (0 until 100).map(i => (i % 3, i % 17, i.toString)).toDF("i2", "j2", "k2")
   df1.write.format("parquet").bucketBy(8, "i1").saveAsTable("t1")
   df2.write.format("parquet").bucketBy(8, "i2").saveAsTable("t2")
   val t1 = spark.table("t1")
   val t2 = spark.table("t2")
   ```
   Now join the two tables on `t1("i1") === t2("i2") && t1("j1") === t2("j2")`.
   Before this change:
   ```scala
   scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
   scala> t1.join(t2, t1("i1") === t2("i2") && t1("j1") === t2("j2")).explain
   == Physical Plan ==
   *(5) SortMergeJoin [i1#161, j1#162], [i2#167, j2#168], Inner
   :- *(2) Sort [i1#161 ASC NULLS FIRST, j1#162 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(i1#161, j1#162, 200), true, [id=#196]
   :     +- *(1) Filter (isnotnull(i1#161) AND isnotnull(j1#162))
   :        +- *(1) ColumnarToRow
   :           +- FileScan parquet default.t1[i1#161,j1#162,k1#163] Batched: true, DataFilters: [isnotnull(i1#161), isnotnull(j1#162)], Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(i1), IsNotNull(j1)], ReadSchema: struct<i1:int,j1:int,k1:string>, SelectedBucketsCount: 8 out of 8
   +- *(4) Sort [i2#167 ASC NULLS FIRST, j2#168 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(i2#167, j2#168, 200), true, [id=#205]
         +- *(3) Filter (isnotnull(i2#167) AND isnotnull(j2#168))
            +- *(3) ColumnarToRow
               +- FileScan parquet default.t2[i2#167,j2#168,k2#169] Batched: true, DataFilters: [isnotnull(i2#167), isnotnull(j2#168)], Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(i2), IsNotNull(j2)], ReadSchema: struct<i2:int,j2:int,k2:string>, SelectedBucketsCount: 8 out of 8
   ```
   
   After the PR:
   ```scala
   scala> spark.conf.set("spark.sql.execution.sortMergeJoin.optimizePartialHashDistribution.enabled", "true")
   scala> t1.join(t2, t1("i1") === t2("i2") && t1("j1") === t2("j2")).explain
   == Physical Plan ==
   *(3) SortMergeJoin [i1#161, j1#162], [i2#167, j2#168], Inner
   :- *(1) Sort [i1#161 ASC NULLS FIRST, j1#162 ASC NULLS FIRST], false, 0
   :  +- *(1) Filter (isnotnull(i1#161) AND isnotnull(j1#162))
   :     +- *(1) ColumnarToRow
   :        +- FileScan parquet default.t1[i1#161,j1#162,k1#163] Batched: true, DataFilters: [isnotnull(i1#161), isnotnull(j1#162)], Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(i1), IsNotNull(j1)], ReadSchema: struct<i1:int,j1:int,k1:string>, SelectedBucketsCount: 8 out of 8
   +- *(2) Sort [i2#167 ASC NULLS FIRST, j2#168 ASC NULLS FIRST], false, 0
      +- *(2) Filter (isnotnull(i2#167) AND isnotnull(j2#168))
         +- *(2) ColumnarToRow
            +- FileScan parquet default.t2[i2#167,j2#168,k2#169] Batched: true, DataFilters: [isnotnull(i2#167), isnotnull(j2#168)], Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(i2), IsNotNull(j2)], ReadSchema: struct<i2:int,j2:int,k2:string>, SelectedBucketsCount: 8 out of 8
   ```
   
   ### How was this patch tested?
   Added tests.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-691261843








[GitHub] [spark] AmplabJenkins commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-687991585








[GitHub] [spark] imback82 edited a comment on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
imback82 edited a comment on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-691522827


   Thanks @c21 for your feedback! If #19054 is a better approach, we should proceed with it; this PR seems to be a slightly less intrusive approach.




[GitHub] [spark] maropu commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
maropu commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-691102549








[GitHub] [spark] c21 commented on a change in pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29655:
URL: https://github.com/apache/spark/pull/29655#discussion_r487378141



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),
+        rSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            rChild,
+            rChildOutputPartitioning: HashPartitioning),
+          _),
+        false) if isPartialHashDistribution(
+          s.leftKeys, lChildOutputPartitioning, s.rightKeys, rChildOutputPartitioning) =>
+        // Remove ShuffleExchangeExec.
+        s.copy(left = lSort.copy(child = lChild), right = rSort.copy(child = rChild))
+      case other => other
+    }
+  }
+
+  /*
+   * Returns true if both HashPartitioning have the same number of partitions and
+   * their partitioning expressions are a subset of their respective join keys.
+   */
+  private def isPartialHashDistribution(
+      leftKeys: Seq[Expression],
+      leftPartitioning: HashPartitioning,
+      rightKeys: Seq[Expression],
+      rightPartitioning: HashPartitioning): Boolean = {
+    val mapping = leftKeyToRightKeyMapping(leftKeys, rightKeys)
+    (leftPartitioning.numPartitions == rightPartitioning.numPartitions) &&
+      leftPartitioning.expressions.zip(rightPartitioning.expressions)
+        .forall {
+          case (le, re) => mapping.get(le.canonicalized)
+            .map(_.exists(_.semanticEquals(re)))
+            .getOrElse(false)
+        }

Review comment:
       Sorry if I missed anything, but I feel this might not be correct. We should make sure that `leftPartitioning.expressions` and `rightPartitioning.expressions` have the same size, and the order of expressions matters, right?
   
   The `expressions` sizes are different, so we should not remove the shuffle:
   ```
   t1 has 1024 buckets on column (a)
   t2 has 1024 buckets on columns (a, b)
   
   SELECT *
   FROM t1
   JOIN t2
   ON t1.a = t2.a AND t1.b = t2.b
   ```
   
   The `expressions` sizes are the same, but the order is wrong, so we should not remove the shuffle:
   
   ```
   t1 has 1024 buckets on columns (a, b)
   t2 has 1024 buckets on columns (b, a)
   
   SELECT *
   FROM t1
   JOIN t2
   ON t1.a = t2.a AND t1.a = t2.b AND t1.b = t2.a AND t1.b = t2.b
   ```
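   
   The size concern in the first example can be reproduced without Spark, because `zip` on Scala collections silently truncates to the shorter input. The sketch below is a simplified model, not the PR's code: expressions are plain column names, and `looseCheck` / `strictCheck` are hypothetical helpers that mimic a `zip(...).forall(...)` check with and without a length guard.
   ```scala
   object ZipTruncationSketch {
     // Mimics a zip + forall check: only the zipped prefix is ever inspected.
     def looseCheck(
         left: Seq[String],
         right: Seq[String],
         keyPairs: Set[(String, String)]): Boolean =
       left.zip(right).forall(keyPairs.contains)

     // Same check, but rejects partitionings with a different number of expressions.
     def strictCheck(
         left: Seq[String],
         right: Seq[String],
         keyPairs: Set[(String, String)]): Boolean =
       left.length == right.length && looseCheck(left, right, keyPairs)

     // Join condition t1.a = t2.a AND t1.b = t2.b, modelled as (left, right) pairs.
     val joinKeyPairs: Set[(String, String)] = Set(("a", "a"), ("b", "b"))
   }
   ```
   With `t1` bucketed on `(a)` and `t2` bucketed on `(a, b)`, `looseCheck` only sees the pair `(a, a)` and accepts, while `strictCheck` rejects because the sizes differ.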

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),

Review comment:
       nit: why can't we just pattern match on `ShuffleExchangeExec(_, leftChild, _)` here? It seems simpler to me.
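   
   For readers unfamiliar with the two styles being compared in this nit, here is a toy sketch with no Spark dependency. All types (`Plan`, `Scan`, `Shuffle`, `ExtractShuffleChild`) are hypothetical stand-ins, and the extractor's behaviour (returning the child together with the child's output partitioning) is an assumption inferred from how the pattern in the diff binds its two arguments.
   ```scala
   object ExtractorSketch {
     sealed trait Partitioning
     final case class HashPartitioning(columns: Seq[String], numPartitions: Int) extends Partitioning
     case object UnknownPartitioning extends Partitioning

     sealed trait Plan { def outputPartitioning: Partitioning }
     final case class Scan(outputPartitioning: Partitioning) extends Plan
     final case class Shuffle(outputPartitioning: Partitioning, child: Plan) extends Plan

     // Custom extractor: yields the shuffle's child and the child's own partitioning.
     object ExtractShuffleChild {
       def unapply(plan: Plan): Option[(Plan, Partitioning)] = plan match {
         case Shuffle(_, child) => Some((child, child.outputPartitioning))
         case _ => None
       }
     }

     // Style 1: the type test on the child's partitioning lives inside the pattern.
     def viaExtractor(plan: Plan): Option[Plan] = plan match {
       case ExtractShuffleChild(child, _: HashPartitioning) => Some(child)
       case _ => None
     }

     // Style 2: match the case class directly and test the partitioning in a guard.
     def viaCaseClass(plan: Plan): Option[Plan] = plan match {
       case Shuffle(_, child) if child.outputPartitioning.isInstanceOf[HashPartitioning] =>
         Some(child)
       case _ => None
     }
   }
   ```
   Both functions accept the same plans; the difference is only where the check on the child's partitioning is expressed.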






[GitHub] [spark] imback82 commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
imback82 commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-691222048








[GitHub] [spark] c21 commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-691431898


   cc @cloud-fan I think we would like to get your opinions here, as you reviewed https://github.com/apache/spark/pull/19054 in the past and have context on this. Thanks.




[GitHub] [spark] imback82 commented on a change in pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
imback82 commented on a change in pull request #29655:
URL: https://github.com/apache/spark/pull/29655#discussion_r487594228



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),
+        rSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            rChild,
+            rChildOutputPartitioning: HashPartitioning),
+          _),
+        false) if isPartialHashDistribution(
+          s.leftKeys, lChildOutputPartitioning, s.rightKeys, rChildOutputPartitioning) =>
+        // Remove ShuffleExchangeExec.
+        s.copy(left = lSort.copy(child = lChild), right = rSort.copy(child = rChild))
+      case other => other
+    }
+  }
+
+  /*
+   * Returns true if both HashPartitioning have the same number of partitions and
+   * their partitioning expressions are a subset of their respective join keys.
+   */
+  private def isPartialHashDistribution(
+      leftKeys: Seq[Expression],
+      leftPartitioning: HashPartitioning,
+      rightKeys: Seq[Expression],
+      rightPartitioning: HashPartitioning): Boolean = {
+    val mapping = leftKeyToRightKeyMapping(leftKeys, rightKeys)
+    (leftPartitioning.numPartitions == rightPartitioning.numPartitions) &&
+      leftPartitioning.expressions.zip(rightPartitioning.expressions)
+        .forall {
+          case (le, re) => mapping.get(le.canonicalized)
+            .map(_.exists(_.semanticEquals(re)))
+            .getOrElse(false)
+        }

Review comment:
       > If the number of buckets for `t1` is less than the number of shuffle partitions, shouldn't it shuffle both sides? (in [`EnsureRequirements`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L96)). So the rule kicks in here and removes both shuffles, but we shouldn't remove any shuffle here.
   
   You are right. Thanks for the catch!
   
   > I think it's unsafe if we do not shuffle both sides. `HashPartitioning(Seq(a, b))` and `HashPartitioning(Seq(b, a))` are not the same thing, e.g. for tuple (a: 1, b: 2) it will be assigned to different buckets given the current `Murmur3Hash` implementation.
   
   Yes, I understand they produce different hash values. However, the example has the join condition `t1.a = t2.b AND t1.b = t2.a`. This rule would not be applied if the condition were `t1.a = t2.a AND t1.b = t2.b`. Please let me know if I missed something. Thanks!
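   
   The argument in this reply can be sketched with a toy, order-sensitive hash. This is only an illustration: the polynomial hash below is a stand-in and is NOT Spark's `Murmur3Hash`, and `Row`, `t1Bucket`, and `t2Bucket` are hypothetical names. The only property relied on is that hashing `(x, y)` and `(y, x)` generally gives different results.
   ```scala
   object HashOrderSketch {
     final case class Row(a: Int, b: Int)

     // Toy order-sensitive hash (stand-in, not Murmur3): fold values left to right.
     def bucket(values: Seq[Int], numBuckets: Int): Int = {
       val h = values.foldLeft(17)((acc, v) => 31 * acc + v)
       java.lang.Math.floorMod(h, numBuckets)
     }

     // t1 is bucketed on (a, b); t2 is bucketed on (b, a).
     def t1Bucket(row: Row, numBuckets: Int): Int = bucket(Seq(row.a, row.b), numBuckets)
     def t2Bucket(row: Row, numBuckets: Int): Int = bucket(Seq(row.b, row.a), numBuckets)
   }
   ```
   Under `t1.a = t2.b AND t1.b = t2.a`, the `t1` row `(a = 1, b = 2)` matches the `t2` row `(a = 2, b = 1)`, and both hash the sequence `(1, 2)`, so they land in the same bucket. Under `t1.a = t2.a AND t1.b = t2.b`, the matching `t2` row is `(a = 1, b = 2)`, which hashes `(2, 1)` and can land in a different bucket.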






[GitHub] [spark] andyvanyperenAM commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
andyvanyperenAM commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-690980677








[GitHub] [spark] c21 commented on a change in pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29655:
URL: https://github.com/apache/spark/pull/29655#discussion_r487378141



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),
+        rSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            rChild,
+            rChildOutputPartitioning: HashPartitioning),
+          _),
+        false) if isPartialHashDistribution(
+          s.leftKeys, lChildOutputPartitioning, s.rightKeys, rChildOutputPartitioning) =>
+        // Remove ShuffleExchangeExec.
+        s.copy(left = lSort.copy(child = lChild), right = rSort.copy(child = rChild))
+      case other => other
+    }
+  }
+
+  /*
+   * Returns true if both HashPartitioning have the same number of partitions and
+   * their partitioning expressions are a subset of their respective join keys.
+   */
+  private def isPartialHashDistribution(
+      leftKeys: Seq[Expression],
+      leftPartitioning: HashPartitioning,
+      rightKeys: Seq[Expression],
+      rightPartitioning: HashPartitioning): Boolean = {
+    val mapping = leftKeyToRightKeyMapping(leftKeys, rightKeys)
+    (leftPartitioning.numPartitions == rightPartitioning.numPartitions) &&
+      leftPartitioning.expressions.zip(rightPartitioning.expressions)
+        .forall {
+          case (le, re) => mapping.get(le.canonicalized)
+            .map(_.exists(_.semanticEquals(re)))
+            .getOrElse(false)
+        }

Review comment:
       Sorry if I'm missing something, but I think this might not be correct. We should make sure `leftPartitioning.expressions` and `rightPartitioning.expressions` have the same size, and the order of the expressions matters, right?
   
   The `expressions` sizes differ, so we should not remove the shuffle:
   ```
   t1 has 1024 buckets on column (a)
   t2 has 1024 buckets on columns (a, b)
   
   SELECT *
   FROM t1
   JOIN t2
   ON t1.a = t2.a AND t1.b = t2.b
   ```
   
   The `expressions` sizes match, but the order is wrong, so we should not remove the shuffle:
   
   ```
   t1 has 1024 buckets on columns (a, b)
   t2 has 1024 buckets on columns (b, a)
   
   SELECT *
   FROM t1
   JOIN t2
   ON t1.a = t2.a AND t1.a = t2.b AND t1.b = t2.a AND t1.b = t2.b
   ```
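   
   To make the size concern concrete, here is a minimal sketch of a stricter check. It is only an illustration under simplifying assumptions: `ToyHashPartitioning`, `PartialHashCheck` and `isCoPartitioned` are hypothetical names, columns are modeled as plain strings rather than Spark `Expression`s, and whether these conditions are sufficient for the real rule is still open for discussion. The point it shows is that `zip` truncates to the shorter sequence, so the lengths have to be compared explicitly before pairing the expressions by position.
   
   ```scala
   // Toy model: a partitioning is just column names plus a partition count.
   // These types are illustrative stand-ins, not Spark's HashPartitioning.
   final case class ToyHashPartitioning(expressions: Seq[String], numPartitions: Int)
   
   object PartialHashCheck {
     // joinKeys pairs each left key with its right key,
     // e.g. ("a", "a") stands for the condition t1.a = t2.a.
     def isCoPartitioned(
         joinKeys: Seq[(String, String)],
         left: ToyHashPartitioning,
         right: ToyHashPartitioning): Boolean = {
       left.numPartitions == right.numPartitions &&
         // zip silently drops the extra elements of the longer side,
         // so (a) zipped with (a, b) would otherwise look like a match.
         left.expressions.length == right.expressions.length &&
         // Position i on the left must be joined to position i on the right.
         left.expressions.zip(right.expressions)
           .forall(pair => joinKeys.contains(pair))
     }
   }
   ```
   
   With join keys `a = a AND b = b`, this sketch rejects `(a)` against `(a, b)` and `(a, b)` against `(b, a)`, and accepts `(a)` against `(a)`.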

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),

Review comment:
       nit: why can't we just pattern match on `ShuffleExchangeExec(_, leftChild, _)` here? That seems simpler to me.
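   
   As a rough illustration of the two styles, using toy types only: `ToyPlan`, `ToyScan`, `ToyShuffle`, `ExtractToyShuffleChild` and `PatternStyles` are hypothetical, and the extractor's behavior (returning the shuffle's child together with the child's output partitioning) is an assumption inferred from how `ExtractShuffleExchangeExecChild` is used in the pattern above, not from its definition.
   
   ```scala
   // Toy plan nodes; illustrative stand-ins, not Spark's SparkPlan classes.
   sealed trait ToyPlan { def outputPartitioning: String }
   final case class ToyScan(outputPartitioning: String) extends ToyPlan
   final case class ToyShuffle(outputPartitioning: String, child: ToyPlan) extends ToyPlan
   
   // Custom extractor: yields the shuffle's child and the child's partitioning.
   object ExtractToyShuffleChild {
     def unapply(plan: ToyPlan): Option[(ToyPlan, String)] = plan match {
       case ToyShuffle(_, child) => Some((child, child.outputPartitioning))
       case _ => None
     }
   }
   
   object PatternStyles {
     // Style 1: the extractor exposes the child's partitioning inside the pattern.
     def viaExtractor(plan: ToyPlan): Option[ToyPlan] = plan match {
       case ExtractToyShuffleChild(child, "hash") => Some(child)
       case _ => None
     }
   
     // Style 2: match the case class directly and test the child in a guard.
     def viaCaseClass(plan: ToyPlan): Option[ToyPlan] = plan match {
       case ToyShuffle(_, child) if child.outputPartitioning == "hash" => Some(child)
       case _ => None
     }
   }
   ```
   
   Both functions accept the same plans in this sketch; the direct match moves the partitioning check into a guard instead of a separate extractor object.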

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),
+        rSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            rChild,
+            rChildOutputPartitioning: HashPartitioning),
+          _),
+        false) if isPartialHashDistribution(
+          s.leftKeys, lChildOutputPartitioning, s.rightKeys, rChildOutputPartitioning) =>
+        // Remove ShuffleExchangeExec.
+        s.copy(left = lSort.copy(child = lChild), right = rSort.copy(child = rChild))
+      case other => other
+    }
+  }
+
+  /*
+   * Returns true if both HashPartitioning have the same number of partitions and
+   * their partitioning expressions are a subset of their respective join keys.
+   */
+  private def isPartialHashDistribution(
+      leftKeys: Seq[Expression],
+      leftPartitioning: HashPartitioning,
+      rightKeys: Seq[Expression],
+      rightPartitioning: HashPartitioning): Boolean = {
+    val mapping = leftKeyToRightKeyMapping(leftKeys, rightKeys)
+    (leftPartitioning.numPartitions == rightPartitioning.numPartitions) &&
+      leftPartitioning.expressions.zip(rightPartitioning.expressions)
+        .forall {
+          case (le, re) => mapping.get(le.canonicalized)
+            .map(_.exists(_.semanticEquals(re)))
+            .getOrElse(false)
+        }

Review comment:
       sorry if I miss anything, but I feel this might not be correct. We should make sure the `leftPartitioning.expressions` and `rightPartitioning.expressions` has same size, and the order of expressions matters, right?
   
   `expressions` size is different, so we should not remove shuffle:
   ```
   t1 has 1024 buckets on column (a)
   t2 has 1024 buckets on columns (a, b)
   
   SELECT *
   FROM t1
   JOIN t2
   ON t1.a = t2.a AND t1.b = t2.b
   ```
   
   `expressions` size is same, but order is wrong, so we should not remove shuffle:
   
   ```
   t1 has 1024 buckets on column (a, b)
   t2 has 1024 buckets on columns (b, a)
   
   SELECT *
   FROM t1
   JOIN t2
   ON t1.a = t2.a AND AND t1.a = t2.b AND t1.b = t2.a AND t1.b = t2.b
   ```

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),

Review comment:
       nit: why we can't just pattern matching `ShuffleExchangeExec(_, leftChild, _)` here? It seems to be looking simpler to me.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),
+        rSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            rChild,
+            rChildOutputPartitioning: HashPartitioning),
+          _),
+        false) if isPartialHashDistribution(
+          s.leftKeys, lChildOutputPartitioning, s.rightKeys, rChildOutputPartitioning) =>
+        // Remove ShuffleExchangeExec.
+        s.copy(left = lSort.copy(child = lChild), right = rSort.copy(child = rChild))
+      case other => other
+    }
+  }
+
+  /*
+   * Returns true if both HashPartitioning have the same number of partitions and
+   * their partitioning expressions are a subset of their respective join keys.
+   */
+  private def isPartialHashDistribution(
+      leftKeys: Seq[Expression],
+      leftPartitioning: HashPartitioning,
+      rightKeys: Seq[Expression],
+      rightPartitioning: HashPartitioning): Boolean = {
+    val mapping = leftKeyToRightKeyMapping(leftKeys, rightKeys)
+    (leftPartitioning.numPartitions == rightPartitioning.numPartitions) &&
+      leftPartitioning.expressions.zip(rightPartitioning.expressions)
+        .forall {
+          case (le, re) => mapping.get(le.canonicalized)
+            .map(_.exists(_.semanticEquals(re)))
+            .getOrElse(false)
+        }

Review comment:
       sorry if I miss anything, but I feel this might not be correct. We should make sure the `leftPartitioning.expressions` and `rightPartitioning.expressions` has same size, and the order of expressions matters, right?
   
   `expressions` size is different, so we should not remove shuffle:
   ```
   t1 has 1024 buckets on column (a)
   t2 has 1024 buckets on columns (a, b)
   
   SELECT *
   FROM t1
   JOIN t2
   ON t1.a = t2.a AND t1.b = t2.b
   ```
   
   `expressions` size is same, but order is wrong, so we should not remove shuffle:
   
   ```
   t1 has 1024 buckets on column (a, b)
   t2 has 1024 buckets on columns (b, a)
   
   SELECT *
   FROM t1
   JOIN t2
   ON t1.a = t2.a AND AND t1.a = t2.b AND t1.b = t2.a AND t1.b = t2.b
   ```

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),

Review comment:
       nit: why we can't just pattern matching `ShuffleExchangeExec(_, leftChild, _)` here? It seems to be looking simpler to me.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),
+        rSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            rChild,
+            rChildOutputPartitioning: HashPartitioning),
+          _),
+        false) if isPartialHashDistribution(
+          s.leftKeys, lChildOutputPartitioning, s.rightKeys, rChildOutputPartitioning) =>
+        // Remove ShuffleExchangeExec.
+        s.copy(left = lSort.copy(child = lChild), right = rSort.copy(child = rChild))
+      case other => other
+    }
+  }
+
+  /*
+   * Returns true if both HashPartitioning have the same number of partitions and
+   * their partitioning expressions are a subset of their respective join keys.
+   */
+  private def isPartialHashDistribution(
+      leftKeys: Seq[Expression],
+      leftPartitioning: HashPartitioning,
+      rightKeys: Seq[Expression],
+      rightPartitioning: HashPartitioning): Boolean = {
+    val mapping = leftKeyToRightKeyMapping(leftKeys, rightKeys)
+    (leftPartitioning.numPartitions == rightPartitioning.numPartitions) &&
+      leftPartitioning.expressions.zip(rightPartitioning.expressions)
+        .forall {
+          case (le, re) => mapping.get(le.canonicalized)
+            .map(_.exists(_.semanticEquals(re)))
+            .getOrElse(false)
+        }

Review comment:
       sorry if I miss anything, but I feel this might not be correct. We should make sure the `leftPartitioning.expressions` and `rightPartitioning.expressions` has same size, and the order of expressions matters, right?
   
   `expressions` size is different, so we should not remove shuffle:
   ```
   t1 has 1024 buckets on column (a)
   t2 has 1024 buckets on columns (a, b)
   
   SELECT *
   FROM t1
   JOIN t2
   ON t1.a = t2.a AND t1.b = t2.b
   ```
   
   `expressions` size is same, but order is wrong, so we should not remove shuffle:
   
   ```
   t1 has 1024 buckets on column (a, b)
   t2 has 1024 buckets on columns (b, a)
   
   SELECT *
   FROM t1
   JOIN t2
   ON t1.a = t2.a AND AND t1.a = t2.b AND t1.b = t2.a AND t1.b = t2.b
   ```

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),

Review comment:
       nit: why we can't just pattern matching `ShuffleExchangeExec(_, leftChild, _)` here? It seems to be looking simpler to me.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),
+        rSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            rChild,
+            rChildOutputPartitioning: HashPartitioning),
+          _),
+        false) if isPartialHashDistribution(
+          s.leftKeys, lChildOutputPartitioning, s.rightKeys, rChildOutputPartitioning) =>
+        // Remove ShuffleExchangeExec.
+        s.copy(left = lSort.copy(child = lChild), right = rSort.copy(child = rChild))
+      case other => other
+    }
+  }
+
+  /*
+   * Returns true if both HashPartitioning have the same number of partitions and
+   * their partitioning expressions are a subset of their respective join keys.
+   */
+  private def isPartialHashDistribution(
+      leftKeys: Seq[Expression],
+      leftPartitioning: HashPartitioning,
+      rightKeys: Seq[Expression],
+      rightPartitioning: HashPartitioning): Boolean = {
+    val mapping = leftKeyToRightKeyMapping(leftKeys, rightKeys)
+    (leftPartitioning.numPartitions == rightPartitioning.numPartitions) &&
+      leftPartitioning.expressions.zip(rightPartitioning.expressions)
+        .forall {
+          case (le, re) => mapping.get(le.canonicalized)
+            .map(_.exists(_.semanticEquals(re)))
+            .getOrElse(false)
+        }

Review comment:
       sorry if I miss anything, but I feel this might not be correct. We should make sure the `leftPartitioning.expressions` and `rightPartitioning.expressions` has same size, and the order of expressions matters, right?
   
   `expressions` size is different, so we should not remove shuffle:
   ```
   t1 has 1024 buckets on column (a)
   t2 has 1024 buckets on columns (a, b)
   
   SELECT *
   FROM t1
   JOIN t2
   ON t1.a = t2.a AND t1.b = t2.b
   ```
   
   `expressions` size is same, but order is wrong, so we should not remove shuffle:
   
   ```
   t1 has 1024 buckets on column (a, b)
   t2 has 1024 buckets on columns (b, a)
   
   SELECT *
   FROM t1
   JOIN t2
   ON t1.a = t2.a AND AND t1.a = t2.b AND t1.b = t2.a AND t1.b = t2.b
   ```

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),

Review comment:
       nit: why can't we just pattern match on `ShuffleExchangeExec(_, leftChild, _)` here? That seems simpler to me.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] andyvanyperenAM edited a comment on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
andyvanyperenAM edited a comment on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-690980677








[GitHub] [spark] maropu commented on a change in pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
maropu commented on a change in pull request #29655:
URL: https://github.com/apache/spark/pull/29655#discussion_r484572785



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {

Review comment:
       Also, could you add fine-grained tests for this rule?






[GitHub] [spark] maropu commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
maropu commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-691102549








[GitHub] [spark] SparkQA removed a comment on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-687991152


   **[Test build #128330 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128330/testReport)** for PR 29655 at commit [`1b5c4e9`](https://github.com/apache/spark/commit/1b5c4e963f0562c38f52f55703efad2c9056a12f).




[GitHub] [spark] imback82 commented on a change in pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
imback82 commented on a change in pull request #29655:
URL: https://github.com/apache/spark/pull/29655#discussion_r484636101



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {

Review comment:
       Doing this inside `EnsureRequirements.ensureDistributionAndOrdering` would require a new `Partitioning` and `Distribution` that know about both sides of the join, so I didn't go that route. I thought doing this outside would be less intrusive. But please let me know if doing this inside `EnsureRequirements` makes more sense. Thanks.
   
   This rule runs after `EnsureRequirements` because reordering the join keys there may already eliminate the shuffles, in which case this rule is not applied.






[GitHub] [spark] imback82 commented on a change in pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
imback82 commented on a change in pull request #29655:
URL: https://github.com/apache/spark/pull/29655#discussion_r487594228



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),
+        rSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            rChild,
+            rChildOutputPartitioning: HashPartitioning),
+          _),
+        false) if isPartialHashDistribution(
+          s.leftKeys, lChildOutputPartitioning, s.rightKeys, rChildOutputPartitioning) =>
+        // Remove ShuffleExchangeExec.
+        s.copy(left = lSort.copy(child = lChild), right = rSort.copy(child = rChild))
+      case other => other
+    }
+  }
+
+  /*
+   * Returns true if both HashPartitioning have the same number of partitions and
+   * their partitioning expressions are a subset of their respective join keys.
+   */
+  private def isPartialHashDistribution(
+      leftKeys: Seq[Expression],
+      leftPartitioning: HashPartitioning,
+      rightKeys: Seq[Expression],
+      rightPartitioning: HashPartitioning): Boolean = {
+    val mapping = leftKeyToRightKeyMapping(leftKeys, rightKeys)
+    (leftPartitioning.numPartitions == rightPartitioning.numPartitions) &&
+      leftPartitioning.expressions.zip(rightPartitioning.expressions)
+        .forall {
+          case (le, re) => mapping.get(le.canonicalized)
+            .map(_.exists(_.semanticEquals(re)))
+            .getOrElse(false)
+        }

Review comment:
       > If the number of buckets for `t1` is less than the number of shuffle partitions, shouldn't it shuffle both sides? (in [`EnsureRequirements`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L96)). So the rule kicks in here and removes both shuffles, but we shouldn't remove any shuffle here.
   
   You are right. Thanks for the catch!
   
   > I think it's unsafe if we do not shuffle both sides. `HashPartitioning(Seq(a, b))` and `HashPartitioning(Seq(b, a))` are not the same thing, e.g. the tuple (a: 1, b: 2) will be assigned to different buckets given the current `Murmur3Hash` implementation.
   
   Yes, I understand they produce different hash values. However, the query has the join condition `t1.a = t2.b AND t1.b = t2.a`. On the other hand, this rule would not be applied if the condition were `t1.a = t2.a AND t1.b = t2.b`. Please let me know if I missed something. Thanks!
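   
   To illustrate the point about the join condition, here is a minimal sketch, not Spark code. It assumes Scala's `scala.util.hashing.MurmurHash3` as a stand-in for Spark's `Murmur3Hash` expression, and the names `BucketSketch`, `Row`, `t1Bucket` and `t2Bucket` are hypothetical. It shows that rows satisfying `t1.a = t2.b AND t1.b = t2.a` feed the same value sequence to the hash on both sides, so they land in the same bucket:
   
   ```scala
   import scala.util.hashing.MurmurHash3
   
   // Toy model only: Scala's MurmurHash3 stands in for Spark's `Murmur3Hash`.
   // The concrete hash values differ from Spark's; only the order sensitivity matters here.
   object BucketSketch {
     final case class Row(a: Int, b: Int)
   
     // Bucket id for a sequence of partitioning values, taken in partitioning-expression order.
     def bucket(values: Seq[Int], numBuckets: Int): Int =
       java.lang.Math.floorMod(MurmurHash3.orderedHash(values), numBuckets)
   
     // t1 is bucketed on (a, b).
     def t1Bucket(r: Row, numBuckets: Int): Int = bucket(Seq(r.a, r.b), numBuckets)
   
     // t2 is bucketed on (b, a).
     def t2Bucket(r: Row, numBuckets: Int): Int = bucket(Seq(r.b, r.a), numBuckets)
   }
   ```
   
   For example, the `t1` row `(a = 1, b = 2)` matches the `t2` row `(a = 2, b = 1)` under `t1.a = t2.b AND t1.b = t2.a`, and both `t1Bucket` and `t2Bucket` hash `Seq(1, 2)`. Under `t1.a = t2.a AND t1.b = t2.b` the two sides would hash `Seq(1, 2)` and `Seq(2, 1)`, which in general map to different buckets.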






[GitHub] [spark] andyvanyperenAM commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
andyvanyperenAM commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-749039290


   > Thanks @imback82 for working on this, but #19054 seems to be a better approach to me (i.e. add `leftDistributionKeys` and `rightDistributionKeys` in `SortMergeJoinExec`/`ShuffledHashJoinExec`, and avoid shuffle by adding logic in `EnsureRequirements.reorderJoinPredicates`). @tejasapatil and I are in the same team so just bringing more context on this: we added #19054 in our internal fork and don't see many OOM issues. If #19054 is a better approach in other people's opinions as well, I can redo that PR against the latest master for review.
   > 
   > Adding the rule after `ensureRequirements` seems to add more burden on future development. We need to think about it every time during development, as there's a new rule after `ensureRequirements` that can remove shuffles.
   
   Hi @c21 
   I was wondering what the status of this issue is. I still see it as closed on the GitHub page and on the JIRA page.
   What is the best way to get this issue resolved? Is there anything I can do as a non-expert to help the process? 
   You mentioned you added #19054 to your internal fork without OOM issues. Did any other issues pop up, or is it good to go? Is there a way to get it into the main branch so that it can be used by a simple end user? 
   
   Thanks for the update, and sorry if this is the wrong place to give this a bump. 
   
   Kind regards,
   Andy




[GitHub] [spark] c21 commented on a change in pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29655:
URL: https://github.com/apache/spark/pull/29655#discussion_r487378141



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),
+        rSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            rChild,
+            rChildOutputPartitioning: HashPartitioning),
+          _),
+        false) if isPartialHashDistribution(
+          s.leftKeys, lChildOutputPartitioning, s.rightKeys, rChildOutputPartitioning) =>
+        // Remove ShuffleExchangeExec.
+        s.copy(left = lSort.copy(child = lChild), right = rSort.copy(child = rChild))
+      case other => other
+    }
+  }
+
+  /*
+   * Returns true if both HashPartitioning have the same number of partitions and
+   * their partitioning expressions are a subset of their respective join keys.
+   */
+  private def isPartialHashDistribution(
+      leftKeys: Seq[Expression],
+      leftPartitioning: HashPartitioning,
+      rightKeys: Seq[Expression],
+      rightPartitioning: HashPartitioning): Boolean = {
+    val mapping = leftKeyToRightKeyMapping(leftKeys, rightKeys)
+    (leftPartitioning.numPartitions == rightPartitioning.numPartitions) &&
+      leftPartitioning.expressions.zip(rightPartitioning.expressions)
+        .forall {
+          case (le, re) => mapping.get(le.canonicalized)
+            .map(_.exists(_.semanticEquals(re)))
+            .getOrElse(false)
+        }

Review comment:
       Sorry if I missed anything, but I feel this might not be correct. We should make sure `leftPartitioning.expressions` and `rightPartitioning.expressions` have the same size, and the order of expressions matters, right?
   
   The `expressions` sizes differ, so we should not remove the shuffle:
   ```
   t1 has 1024 buckets on column (a)
   t2 has 1024 buckets on columns (a, b)
   
   SELECT *
   FROM t1
   JOIN t2
   ON t1.a = t2.a AND t1.b = t2.b
   ```
   
   `expressions` have the same size, but the order is wrong, so we should not remove the shuffle:
   
   ```
   t1 has 1024 buckets on columns (a, b)
   t2 has 1024 buckets on columns (b, a)
   
   SELECT *
   FROM t1
   JOIN t2
   ON t1.a = t2.a AND t1.a = t2.b AND t1.b = t2.a AND t1.b = t2.b
   ```
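   
   The two cases above can be tried against a toy model of the positional check. This is only a sketch under stated assumptions: columns are plain strings rather than Catalyst `Expression`s, and `PositionalCheckSketch`, `keyMapping`, `zipOnly` and `withSizeGuard` are hypothetical names, not part of the PR. It relies on one fact about the Scala collections: `zip` silently truncates to the shorter sequence, so a `zip` followed by `forall` never sees a size mismatch on its own.
   
   ```scala
   // Toy model: columns are strings, not Catalyst expressions. All names are hypothetical.
   object PositionalCheckSketch {
     type Mapping = Map[String, Set[String]]
   
     // Maps each left join key to the set of right join keys it is equated with.
     def keyMapping(leftKeys: Seq[String], rightKeys: Seq[String]): Mapping =
       leftKeys.zip(rightKeys)
         .groupBy(_._1)
         .map { case (left, pairs) => left -> pairs.map(_._2).toSet }
   
     // Same zip/forall shape as the check under review: pairs partitioning columns by position.
     // `zip` drops the unmatched tail, so differing sizes are not detected here.
     def zipOnly(leftPart: Seq[String], rightPart: Seq[String], mapping: Mapping): Boolean =
       leftPart.zip(rightPart).forall { case (l, r) => mapping.get(l).exists(_.contains(r)) }
   
     // Variant with an explicit size comparison in front of the positional check.
     def withSizeGuard(leftPart: Seq[String], rightPart: Seq[String], mapping: Mapping): Boolean =
       leftPart.size == rightPart.size && zipOnly(leftPart, rightPart, mapping)
   }
   ```
   
   In this model, the first example (`t1` on `(a)`, `t2` on `(a, b)`) passes `zipOnly` but fails `withSizeGuard`. The second example passes both, because the join condition there also contains `t1.a = t2.b` and `t1.b = t2.a`; with only `t1.a = t2.a AND t1.b = t2.b` it fails both. Whether a size mismatch can actually reach the check in a real plan is a separate question that the model does not answer.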

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),

Review comment:
       nit: why can't we just pattern match on `ShuffleExchangeExec(_, leftChild, _)` here? That seems simpler to me.
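   
   For context on the trade-off, here is a small sketch with toy types, not the Spark classes. It assumes, based only on the variable names in the diff, that the custom extractor yields the shuffle's child together with that child's output partitioning. `Plan`, `Scan`, `Shuffle`, `ShuffleChild`, `viaExtractor` and `viaCaseClass` are hypothetical names.
   
   ```scala
   // Toy plan nodes standing in for SparkPlan / ShuffleExchangeExec. Partitioning is a plain string.
   object ExtractorSketch {
     sealed trait Plan { def outputPartitioning: String }
     final case class Scan(outputPartitioning: String) extends Plan
     final case class Shuffle(outputPartitioning: String, child: Plan) extends Plan
   
     // Custom extractor: matches a shuffle and exposes its child plus the child's partitioning.
     object ShuffleChild {
       def unapply(p: Plan): Option[(Plan, String)] = p match {
         case Shuffle(_, child) => Some((child, child.outputPartitioning))
         case _ => None
       }
     }
   
     // Using the extractor: the child's partitioning is bound directly in the pattern.
     def viaExtractor(p: Plan): Option[String] = p match {
       case ShuffleChild(_, part) if part.startsWith("hash") => Some(part)
       case _ => None
     }
   
     // Using the case class pattern: the child's partitioning is read in the guard instead.
     def viaCaseClass(p: Plan): Option[String] = p match {
       case Shuffle(_, child) if child.outputPartitioning.startsWith("hash") =>
         Some(child.outputPartitioning)
       case _ => None
     }
   }
   ```
   
   Both functions accept the same inputs in this model. The extractor lets the pattern bind and type-test the child's partitioning in place, while the plain case class pattern needs no extra object but moves that test into the guard.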






[GitHub] [spark] imback82 commented on a change in pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
imback82 commented on a change in pull request #29655:
URL: https://github.com/apache/spark/pull/29655#discussion_r487431129



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),
+        rSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            rChild,
+            rChildOutputPartitioning: HashPartitioning),
+          _),
+        false) if isPartialHashDistribution(
+          s.leftKeys, lChildOutputPartitioning, s.rightKeys, rChildOutputPartitioning) =>
+        // Remove ShuffleExchangeExec.
+        s.copy(left = lSort.copy(child = lChild), right = rSort.copy(child = rChild))
+      case other => other
+    }
+  }
+
+  /*
+   * Returns true if both HashPartitioning have the same number of partitions and
+   * their partitioning expressions are a subset of their respective join keys.
+   */
+  private def isPartialHashDistribution(
+      leftKeys: Seq[Expression],
+      leftPartitioning: HashPartitioning,
+      rightKeys: Seq[Expression],
+      rightPartitioning: HashPartitioning): Boolean = {
+    val mapping = leftKeyToRightKeyMapping(leftKeys, rightKeys)
+    (leftPartitioning.numPartitions == rightPartitioning.numPartitions) &&
+      leftPartitioning.expressions.zip(rightPartitioning.expressions)
+        .forall {
+          case (le, re) => mapping.get(le.canonicalized)
+            .map(_.exists(_.semanticEquals(re)))
+            .getOrElse(false)
+        }

Review comment:
       Thanks. I agree with your concerns for both cases. But in the first example only one side will be shuffled, so the rule should not kick in. In the second example, we have `t1.a = t2.b AND t1.b = t2.a`, which matches the bucket ordering, so this should also be fine.
   






[GitHub] [spark] viirya commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-691261843








[GitHub] [spark] imback82 commented on a change in pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
imback82 commented on a change in pull request #29655:
URL: https://github.com/apache/spark/pull/29655#discussion_r487431129



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),
+        rSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            rChild,
+            rChildOutputPartitioning: HashPartitioning),
+          _),
+        false) if isPartialHashDistribution(
+          s.leftKeys, lChildOutputPartitioning, s.rightKeys, rChildOutputPartitioning) =>
+        // Remove ShuffleExchangeExec.
+        s.copy(left = lSort.copy(child = lChild), right = rSort.copy(child = rChild))
+      case other => other
+    }
+  }
+
+  /*
+   * Returns true if both HashPartitioning have the same number of partitions and
+   * their partitioning expressions are a subset of their respective join keys.
+   */
+  private def isPartialHashDistribution(
+      leftKeys: Seq[Expression],
+      leftPartitioning: HashPartitioning,
+      rightKeys: Seq[Expression],
+      rightPartitioning: HashPartitioning): Boolean = {
+    val mapping = leftKeyToRightKeyMapping(leftKeys, rightKeys)
+    (leftPartitioning.numPartitions == rightPartitioning.numPartitions) &&
+      leftPartitioning.expressions.zip(rightPartitioning.expressions)
+        .forall {
+          case (le, re) => mapping.get(le.canonicalized)
+            .map(_.exists(_.semanticEquals(re)))
+            .getOrElse(false)
+        }

Review comment:
       Thanks. I agree with your concerns for both cases. However, in the first example only one side would be shuffled, so the rule should not kick in. In the second example, we have `t1.a = t2.b AND t1.b = t2.a`, which matches the bucket ordering, so this should also be fine.
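
The positional check that this reply relies on can be sketched outside Spark. The following is a minimal illustration, not the PR's code: column names are plain strings standing in for Catalyst expressions, the names `PartialHashDistributionSketch`, `leftToRight` and `isPartial` are hypothetical, and the bucket layouts used in `main` (t1 partitioned by `(a, b)`, t2 by `(b, a)`) are assumptions made for the example.

```scala
object PartialHashDistributionSketch {
  // Hypothetical stand-in for Catalyst expressions: a column is just its name.
  type Col = String

  // Map each left join key to the set of right join keys it is equated with.
  def leftToRight(leftKeys: Seq[Col], rightKeys: Seq[Col]): Map[Col, Set[Col]] =
    leftKeys.zip(rightKeys)
      .groupBy { case (l, _) => l }
      .map { case (l, pairs) => l -> pairs.map(_._2).toSet }

  // True when both sides have the same number of partitions and every
  // positionally paired (left, right) partitioning column is an equated
  // join-key pair. Note: zip stops at the shorter sequence, as in the quoted diff.
  def isPartial(
      leftKeys: Seq[Col],
      leftPartCols: Seq[Col],
      leftNumPartitions: Int,
      rightKeys: Seq[Col],
      rightPartCols: Seq[Col],
      rightNumPartitions: Int): Boolean = {
    val mapping = leftToRight(leftKeys, rightKeys)
    leftNumPartitions == rightNumPartitions &&
      leftPartCols.zip(rightPartCols).forall {
        case (le, re) => mapping.get(le).exists(_.contains(re))
      }
  }

  def main(args: Array[String]): Unit = {
    // Join condition: t1.a = t2.b AND t1.b = t2.a
    val lk = Seq("t1.a", "t1.b")
    val rk = Seq("t2.b", "t2.a")
    // Assumed layouts: t1 partitioned by (a, b), t2 by (b, a).
    println(isPartial(lk, Seq("t1.a", "t1.b"), 8, rk, Seq("t2.b", "t2.a"), 8)) // true
    // Same join, but t2 partitioned by (a, b): the positions no longer line up.
    println(isPartial(lk, Seq("t1.a", "t1.b"), 8, rk, Seq("t2.a", "t2.b"), 8)) // false
  }
}
```

In this sketch the order of the partitioning columns matters, not just their membership in the join keys, which is why the swapped layout is rejected.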
   







[GitHub] [spark] github-actions[bot] commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-811550594


   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!




[GitHub] [spark] maropu commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
maropu commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-691102549


   I hadn't noticed that discussion; thanks for sharing it, @andyvanyperenAM. I've checked it and, yes, they look the same. Could you check the discussion in SPARK-18067/#19054, @imback82? I think we need to address all the issues described there, e.g., data skew. cc: @hvanhovell
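
One way the data-skew concern can show up, as a minimal sketch under stated assumptions rather than a summary of the SPARK-18067 discussion: if the shuffle on the full join keys is skipped, each side keeps the layout produced by hashing only a subset of those keys, so a low-cardinality subset concentrates rows in a few partitions. `SkewSketch` and `partitionSizes` are hypothetical names, and the integer hash functions are simple stand-ins for Spark's actual hash partitioning.

```scala
object SkewSketch {
  // Count rows per partition for a given hash of the (a, b) row.
  // Math.floorMod keeps the partition id non-negative for negative hashes.
  def partitionSizes(
      rows: Seq[(Int, Int)],
      numPartitions: Int)(hash: ((Int, Int)) => Int): Vector[Int] = {
    val counts = Array.fill(numPartitions)(0)
    rows.foreach { row =>
      counts(Math.floorMod(hash(row), numPartitions)) += 1
    }
    counts.toVector
  }

  def main(args: Array[String]): Unit = {
    // 1000 rows that all share a = 1 but have distinct b values.
    val rows = (0 until 1000).map(b => (1, b))
    // Existing layout: hashed on `a` only (a subset of the join keys a, b).
    val byA = partitionSizes(rows, 8) { case (a, _) => a }
    // Layout that a shuffle on both join keys would give (stand-in hash).
    val byAB = partitionSizes(rows, 8) { case (a, b) => 31 * a + b }
    println(byA)  // Vector(0, 1000, 0, 0, 0, 0, 0, 0)
    println(byAB) // Vector(125, 125, 125, 125, 125, 125, 125, 125)
  }
}
```

With the shuffle removed, the join would run on the first layout, where a single task processes every row.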




[GitHub] [spark] imback82 commented on a change in pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
imback82 commented on a change in pull request #29655:
URL: https://github.com/apache/spark/pull/29655#discussion_r487431129







----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] github-actions[bot] closed pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #29655:
URL: https://github.com/apache/spark/pull/29655


   




[GitHub] [spark] imback82 commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
imback82 commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-691222048


   Thanks for the pointers! I wasn't aware of the existing PR. I will take a look.




[GitHub] [spark] imback82 commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
imback82 commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-688460628


   cc: @cloud-fan @viirya @maropu Thanks in advance!




[GitHub] [spark] c21 commented on a change in pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29655:
URL: https://github.com/apache/spark/pull/29655#discussion_r487378141



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),
+        rSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            rChild,
+            rChildOutputPartitioning: HashPartitioning),
+          _),
+        false) if isPartialHashDistribution(
+          s.leftKeys, lChildOutputPartitioning, s.rightKeys, rChildOutputPartitioning) =>
+        // Remove ShuffleExchangeExec.
+        s.copy(left = lSort.copy(child = lChild), right = rSort.copy(child = rChild))
+      case other => other
+    }
+  }
+
+  /*
+   * Returns true if both HashPartitioning have the same number of partitions and
+   * their partitioning expressions are a subset of their respective join keys.
+   */
+  private def isPartialHashDistribution(
+      leftKeys: Seq[Expression],
+      leftPartitioning: HashPartitioning,
+      rightKeys: Seq[Expression],
+      rightPartitioning: HashPartitioning): Boolean = {
+    val mapping = leftKeyToRightKeyMapping(leftKeys, rightKeys)
+    (leftPartitioning.numPartitions == rightPartitioning.numPartitions) &&
+      leftPartitioning.expressions.zip(rightPartitioning.expressions)
+        .forall {
+          case (le, re) => mapping.get(le.canonicalized)
+            .map(_.exists(_.semanticEquals(re)))
+            .getOrElse(false)
+        }

Review comment:
       Sorry if I'm missing anything, but I feel this might not be correct. We should make sure that `leftPartitioning.expressions` and `rightPartitioning.expressions` have the same size, and the order of the expressions matters, right?
   
   The `expressions` sizes differ, so we should not remove the shuffle:
   ```
   t1 has 1024 buckets on column (a)
   t2 has 1024 buckets on columns (a, b)
   
   SELECT *
   FROM t1
   JOIN t2
   ON t1.a = t2.a AND t1.b = t2.b
   ```
   
   The `expressions` sizes are the same, but the order is wrong, so we should not remove the shuffle:
   
   ```
   t1 has 1024 buckets on columns (a, b)
   t2 has 1024 buckets on columns (b, a)
   
   SELECT *
   FROM t1
   JOIN t2
   ON t1.a = t2.a AND t1.a = t2.b AND t1.b = t2.a AND t1.b = t2.b
   ```
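   
   The size concern can be reproduced without Spark, because `zip` in Scala truncates to the shorter sequence. The sketch below uses plain strings in place of partitioning expressions; `sameShape` is a hypothetical guard shown only for illustration and is not part of the PR:
   
   ```scala
   object ZipTruncation {
     val leftExprs = Seq("a")        // t1 bucketed on (a)
     val rightExprs = Seq("a", "b")  // t2 bucketed on (a, b)
   
     // zip stops at the shorter sequence, so only ("a", "a") is compared
     // and the extra right-side column "b" is silently ignored.
     val pairs: Seq[(String, String)] = leftExprs.zip(rightExprs)
   
     // Hypothetical guard: require equal sizes before comparing positionally.
     def sameShape(l: Seq[String], r: Seq[String])(
         matches: (String, String) => Boolean): Boolean =
       l.size == r.size && l.zip(r).forall { case (x, y) => matches(x, y) }
   }
   ```
   
   A `forall` over `pairs` alone would pass for the first example, whereas the size check rejects it.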

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),

Review comment:
       nit: why can't we just pattern match on `ShuffleExchangeExec(_, leftChild, _)` here? It seems simpler to me.
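   
   For comparison, the two styles can be sketched on a toy plan model. Everything below is hypothetical: `Plan`, `Scan`, `Shuffle` and `ExtractShuffleChild` are stand-ins, and the extractor's behavior (returning the shuffle's child together with that child's partitioning) is an assumption inferred from how `ExtractShuffleExchangeExecChild` is used in the diff above:
   
   ```scala
   sealed trait Plan { def partitioning: Seq[String] }
   case class Scan(partitioning: Seq[String]) extends Plan
   case class Shuffle(child: Plan, partitioning: Seq[String]) extends Plan
   
   // Custom extractor: yields the shuffle's child and the child's partitioning.
   object ExtractShuffleChild {
     def unapply(p: Plan): Option[(Plan, Seq[String])] = p match {
       case Shuffle(child, _) => Some((child, child.partitioning))
       case _ => None
     }
   }
   
   object Demo {
     // Variant 1: the extractor binds the child's partitioning inside the pattern.
     def viaExtractor(p: Plan): Option[Seq[String]] = p match {
       case ExtractShuffleChild(_, cols) if cols.nonEmpty => Some(cols)
       case _ => None
     }
   
     // Variant 2: match the case class directly and read the partitioning in the guard.
     def viaCaseClass(p: Plan): Option[Seq[String]] = p match {
       case Shuffle(child, _) if child.partitioning.nonEmpty => Some(child.partitioning)
       case _ => None
     }
   }
   ```
   
   Both variants accept the same plans; the extractor mainly saves a guard when the pattern also needs to constrain the type of the child's partitioning.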







[GitHub] [spark] AmplabJenkins removed a comment on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-688086171


   Merged build finished. Test FAILed.




[GitHub] [spark] c21 commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-691431898


   cc @cloud-fan I think we would like to get your opinions here, as you reviewed https://github.com/apache/spark/pull/19054 in the past and have context on this. Thanks.




[GitHub] [spark] maropu commented on a change in pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
maropu commented on a change in pull request #29655:
URL: https://github.com/apache/spark/pull/29655#discussion_r484572659



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {

Review comment:
       Can't we implement this optimization in `EnsureRequirements` instead? Is there a reason to apply this rule after `EnsureRequirements` inserts shuffles?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] andyvanyperenAM edited a comment on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
andyvanyperenAM edited a comment on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-690980677


   Great to see this PR; I ran into the same issue.
   While investigating what was going wrong, I also found the following PR:
   https://github.com/apache/spark/pull/19054
   It is old and was closed due to inactivity (which I obviously regret).
   What is the difference between the current PR and that older one?
   
   I'm not a Spark contributor, so I really can't judge the importance of the "unknown error code -9" in the test above, but maybe the PR linked above can help you.




[GitHub] [spark] maropu commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
maropu commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-691102549


   I hadn't noticed that discussion; thanks for sharing it, @andyvanyperenAM. I've checked it and, yes, the two PRs look the same. Could you check the discussion in SPARK-18067/#19054, @imback82? I think we need to address all the issues described there, e.g., data skew. cc: @hvanhovell




[GitHub] [spark] imback82 commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
imback82 commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-691356366


   > My first thought is that I share the same concerns as @hvanhovell in the previous discussion.
   
   Is the concern about data skew, or are there other concerns? I couldn't find more in the discussion.
   
   The main scenario this PR targets is letting bucketed tables be used by more workloads. Since bucketed tables are created by users, we have rarely observed users picking low-cardinality bucket columns - similar to how users pick partition columns.
   
   I could make the rule more restrictive by checking whether the sources are bucketed tables. (By the way, if this approach is fine, I could extend the rule to support `PartitioningCollection`, still making sure that the sources are bucketed tables, which would help remove shuffles in nested joins.) WDYT?
   
   cc: @c21 




[GitHub] [spark] imback82 commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
imback82 commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-691522827


   Thanks @c21 for your feedback! If #19054 is a better approach, we should proceed with it; I wanted to come up with a less intrusive solution.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-688086183


   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/128330/




[GitHub] [spark] imback82 commented on a change in pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
imback82 commented on a change in pull request #29655:
URL: https://github.com/apache/spark/pull/29655#discussion_r487431129



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),
+        rSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            rChild,
+            rChildOutputPartitioning: HashPartitioning),
+          _),
+        false) if isPartialHashDistribution(
+          s.leftKeys, lChildOutputPartitioning, s.rightKeys, rChildOutputPartitioning) =>
+        // Remove ShuffleExchangeExec.
+        s.copy(left = lSort.copy(child = lChild), right = rSort.copy(child = rChild))
+      case other => other
+    }
+  }
+
+  /*
+   * Returns true if both HashPartitioning have the same number of partitions and
+   * their partitioning expressions are a subset of their respective join keys.
+   */
+  private def isPartialHashDistribution(
+      leftKeys: Seq[Expression],
+      leftPartitioning: HashPartitioning,
+      rightKeys: Seq[Expression],
+      rightPartitioning: HashPartitioning): Boolean = {
+    val mapping = leftKeyToRightKeyMapping(leftKeys, rightKeys)
+    (leftPartitioning.numPartitions == rightPartitioning.numPartitions) &&
+      leftPartitioning.expressions.zip(rightPartitioning.expressions)
+        .forall {
+          case (le, re) => mapping.get(le.canonicalized)
+            .map(_.exists(_.semanticEquals(re)))
+            .getOrElse(false)
+        }

Review comment:
       Thanks. I agree with your concerns in both cases. However, in the first example only one side will be shuffled, so the rule should not kick in. In the second example, we have `t1.a = t2.b AND t1.b = t2.a`, which matches the bucket ordering, so this should also be fine.
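
To make the key-mapping check under discussion concrete, here is a minimal, Spark-free sketch loosely modeled on `isPartialHashDistribution` from the quoted diff. Everything in it is an assumption for illustration: `HashPart`, `keyMapping`, and `isPartial` are hypothetical names, columns are plain strings rather than Catalyst expressions, and the length guard is an extra safety check that does not appear in the diff.

```scala
object PartialHashCheck {
  // Hypothetical stand-in for HashPartitioning: column names plus a partition count.
  final case class HashPart(columns: Seq[String], numPartitions: Int)

  // Maps each left join key to the set of right join keys it is equated with.
  def keyMapping(leftKeys: Seq[String], rightKeys: Seq[String]): Map[String, Set[String]] =
    leftKeys.zip(rightKeys)
      .groupBy(_._1)
      .map { case (l, pairs) => l -> pairs.map(_._2).toSet }

  // True if both sides have the same partition count and the i-th partitioning
  // column on the left is joined with the i-th partitioning column on the right.
  def isPartial(
      leftKeys: Seq[String],
      left: HashPart,
      rightKeys: Seq[String],
      right: HashPart): Boolean = {
    val mapping = keyMapping(leftKeys, rightKeys)
    left.numPartitions == right.numPartitions &&
      left.columns.length == right.columns.length && // extra guard, not in the quoted diff
      left.columns.zip(right.columns).forall {
        case (l, r) => mapping.get(l).exists(_.contains(r))
      }
  }

  // Join condition: t1.a = t2.b AND t1.b = t2.a
  val leftKeys: Seq[String] = Seq("t1.a", "t1.b")
  val rightKeys: Seq[String] = Seq("t2.b", "t2.a")

  // t1 bucketed on a, t2 bucketed on b: the partitioning columns are a join-key pair.
  val aligned: Boolean =
    isPartial(leftKeys, HashPart(Seq("t1.a"), 8), rightKeys, HashPart(Seq("t2.b"), 8))

  // t1 bucketed on a, t2 bucketed on a: t1.a is not joined with t2.a, so the check fails.
  val misaligned: Boolean =
    isPartial(leftKeys, HashPart(Seq("t1.a"), 8), rightKeys, HashPart(Seq("t2.a"), 8))
}
```

In this model the check is positional: it passes only when the partitioning columns pair up, position by position, with columns that the join condition equates.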
   






[GitHub] [spark] c21 commented on a change in pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29655:
URL: https://github.com/apache/spark/pull/29655#discussion_r487576892



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/OptimizeSortMergeJoinWithPartialHashDistribution.scala
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule removes shuffle for the sort merge join if the following conditions are met:
+ * - The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions
+ *   as the other side of join.
+ * - The child of ShuffleExchangeExec has output partitioning which has the subset of
+ *   join keys on the respective join side.
+ *
+ * If the above conditions are met, shuffle can be eliminated for the sort merge join
+ * because rows are sorted before join logic is applied.
+ */
+case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizeSortMergeJoinWithPartialHashDistribution) {
+      return plan
+    }
+
+    plan.transformUp {
+      case s @ SortMergeJoinExec(_, _, _, _,
+        lSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            lChild,
+            lChildOutputPartitioning: HashPartitioning),
+          _),
+        rSort @ SortExec(_, _,
+          ExtractShuffleExchangeExecChild(
+            rChild,
+            rChildOutputPartitioning: HashPartitioning),
+          _),
+        false) if isPartialHashDistribution(
+          s.leftKeys, lChildOutputPartitioning, s.rightKeys, rChildOutputPartitioning) =>
+        // Remove ShuffleExchangeExec.
+        s.copy(left = lSort.copy(child = lChild), right = rSort.copy(child = rChild))
+      case other => other
+    }
+  }
+
+  /*
+   * Returns true if both HashPartitioning have the same number of partitions and
+   * their partitioning expressions are a subset of their respective join keys.
+   */
+  private def isPartialHashDistribution(
+      leftKeys: Seq[Expression],
+      leftPartitioning: HashPartitioning,
+      rightKeys: Seq[Expression],
+      rightPartitioning: HashPartitioning): Boolean = {
+    val mapping = leftKeyToRightKeyMapping(leftKeys, rightKeys)
+    (leftPartitioning.numPartitions == rightPartitioning.numPartitions) &&
+      leftPartitioning.expressions.zip(rightPartitioning.expressions)
+        .forall {
+          case (le, re) => mapping.get(le.canonicalized)
+            .map(_.exists(_.semanticEquals(re)))
+            .getOrElse(false)
+        }

Review comment:
       Sorry if I'm missing anything:
   
   > But, for the first example, only one side will be shuffled, so the rule should not kick in.
   
   If the number of buckets for `t1` is less than the number of shuffle partitions, shouldn't both sides be shuffled (in [`EnsureRequirements`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L96))? The rule would then kick in and remove both shuffles, even though we shouldn't remove any shuffle here.
   
   > For the second example, we have t1.a = t2.b AND t1.b = t2.a which matches the bucket ordering, so this should be also fine.
   
   I think it's unsafe if we do not shuffle both sides. `HashPartitioning(Seq(a, b))` and `HashPartitioning(Seq(b, a))` are not the same thing; e.g., the tuple (a: 1, b: 2) will be assigned to different buckets under the current `Murmur3Hash` implementation.
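
The ordering point can be illustrated with a small sketch. It uses a toy order-sensitive hash, not Spark's `Murmur3Hash`, so the bucket numbers are only illustrative; `toyHash` and `bucket` are hypothetical helpers introduced here.

```scala
object BucketOrderDemo {
  // Toy order-sensitive hash over a list of column values.
  // This is NOT Spark's Murmur3Hash; it only shares the property that
  // the order of the inputs affects the result.
  def toyHash(values: Seq[Int]): Int =
    values.foldLeft(0)((acc, v) => 31 * acc + v)

  // Non-negative modulo of the hash, used here as the bucket id.
  def bucket(values: Seq[Int], numPartitions: Int): Int =
    java.lang.Math.floorMod(toyHash(values), numPartitions)

  def main(args: Array[String]): Unit = {
    val (a, b) = (1, 2)
    // Same row, hashed with the columns in two different orders.
    println(s"hash(a, b) -> bucket ${bucket(Seq(a, b), 8)}") // 33 mod 8 = 1
    println(s"hash(b, a) -> bucket ${bucket(Seq(b, a), 8)}") // 63 mod 8 = 7
  }
}
```

With any hash that is sensitive to input order, a row bucketed on `(a, b)` and the matching row bucketed on `(b, a)` can land in different partitions, so the two sides are not co-partitioned without a shuffle.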






[GitHub] [spark] maropu commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
maropu commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-692409151


   > we added #19054 in our internal fork and don't see much OOM issues.
   
   Even so, I think removing shuffles in the middle of stages (e.g., in plans with many joins) can, in theory, raise the probability of OOM when the data is skewed. Since we can control input distributions to some extent, e.g., through bucketing, it might be worth trying the more restrictive approach that @imback82 suggested above.
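
The skew concern can be sketched with toy data. The sketch below assumes a made-up dataset in which join key `a` has only two distinct values and join key `b` has 1000, and it uses a toy hash rather than Spark's partitioner; `Row`, `toyHash`, and `partitionSizes` are hypothetical names.

```scala
object SkewDemo {
  final case class Row(a: Int, b: Int)

  // Toy order-sensitive hash; a stand-in for a real partitioner's hash function.
  def toyHash(values: Seq[Int]): Int =
    values.foldLeft(0)((acc, v) => 31 * acc + v)

  // Number of rows per partition when hashing only the columns selected by `key`.
  def partitionSizes(rows: Seq[Row], numPartitions: Int)(key: Row => Seq[Int]): Map[Int, Int] =
    rows
      .groupBy(r => java.lang.Math.floorMod(toyHash(key(r)), numPartitions))
      .map { case (p, rs) => p -> rs.size }

  // 2 distinct values of `a`, 1000 distinct values of `b`: 2000 rows in total.
  val rows: Seq[Row] = for (a <- 0 until 2; b <- 0 until 1000) yield Row(a, b)

  // Partitioning on the full join key (a, b) spreads rows over all 8 partitions.
  val byFullKey: Map[Int, Int] = partitionSizes(rows, 8)(r => Seq(r.a, r.b))

  // Partitioning on the subset (a) leaves only 2 non-empty partitions.
  val bySubset: Map[Int, Int] = partitionSizes(rows, 8)(r => Seq(r.a))
}
```

In this toy setup the subset partitioning puts 1000 rows in each of two partitions, while the full-key partitioning puts 250 rows in each of eight. Skipping the shuffle keeps the coarser layout, which is where the memory pressure would come from.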




[GitHub] [spark] c21 commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-691431898


   cc @cloud-fan I think we would like to get your opinions here, as you reviewed https://github.com/apache/spark/pull/19054 in the past and have context on this. Thanks.




[GitHub] [spark] viirya commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-691261843


   My first thought is that I share the same concerns as @hvanhovell in the previous discussion.




[GitHub] [spark] SparkQA commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-687991152


   **[Test build #128330 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128330/testReport)** for PR 29655 at commit [`1b5c4e9`](https://github.com/apache/spark/commit/1b5c4e963f0562c38f52f55703efad2c9056a12f).




[GitHub] [spark] SparkQA commented on pull request #29655: [SPARK-32806][SQL] SortMergeJoin with partial hash distribution can be optimized to remove shuffle

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29655:
URL: https://github.com/apache/spark/pull/29655#issuecomment-688084979


   **[Test build #128330 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128330/testReport)** for PR 29655 at commit [`1b5c4e9`](https://github.com/apache/spark/commit/1b5c4e963f0562c38f52f55703efad2c9056a12f).
    * This patch **fails due to an unknown error code, -9**.
    * This patch merges cleanly.
    * This patch adds no public classes.

