Posted to reviews@spark.apache.org by "wangyum (via GitHub)" <gi...@apache.org> on 2023/07/24 03:32:08 UTC

[GitHub] [spark] wangyum commented on a diff in pull request #42114: [SPARK-44514][SQL] Rewrite the join to filter if one side maximum number of rows is 1

wangyum commented on code in PR #42114:
URL: https://github.com/apache/spark/pull/42114#discussion_r1271694995


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala:
##########
@@ -234,6 +234,85 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
   }
 }
 
+/**
+ * 1. Rewrites a join to a filter if one side's maximum number of rows is 1
+ *
+ * {{{
+ *   SELECT t1.* FROM t1 INNER JOIN (SELECT max(c1) AS c1 FROM t) t2 ON t1.c1 = t2.c1  ==>
+ *   SELECT t1.* FROM t1 WHERE t1.c1 = (SELECT max(c1) AS c1 FROM t)
+ * }}}
+ *
+ * 2. Removes an outer join if the streamed side's maximum number of rows is 1
+ * {{{
+ *   SELECT t1.* FROM t1 LEFT JOIN (SELECT max(c1) AS c1 FROM t) t2 ON t1.c1 = t2.c1  ==>
+ *   SELECT t1.* FROM t1
+ * }}}
+ *
+ * {{{
+ *   SELECT t1.* FROM t1 FULL JOIN (SELECT max(c1) AS c1 FROM t) t2  ==>
+ *   SELECT t1.* FROM t1
+ * }}}
+ */
+object EliminateJoin extends Rule[LogicalPlan] with PredicateHelper with JoinSelectionHelper {
+  private def eliminateRightSide(j: Join): Option[LogicalPlan] = {
+    j.joinType match {
+      case _: InnerLike | LeftSemi if j.condition.nonEmpty =>
+        ExtractEquiJoinKeys.unapply(j) match {
+          case Some((_, leftKeys, rightKeys, None, _, left, right, _)) =>
+            val conditions = leftKeys.zipWithIndex.map { case (exp, index) =>
+              val projectList = Seq(Alias(rightKeys(index), "_joinkey")())
+              EqualTo(exp, ScalarSubquery(Project(projectList, right)))
+            }
+            Some(Filter(conditions.reduceLeft(And), left))
+          case _ =>
+            None
+        }
+      case LeftOuter =>
+        Some(j.left)
+      case FullOuter if j.condition.isEmpty =>
+        Some(j.left)

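To make the first rewrite in the new Scaladoc concrete, here is a minimal spark-shell sketch; the t1/t views and their data are illustrative only and are not taken from the PR or its tests. With at most one row on the aggregated side, the join form and the scalar-subquery filter form return the same result:

```scala
// Illustrative views (hypothetical data, chosen only to exercise the rewrite).
spark.sql("CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (1), (2), (3) AS t1(c1)")
spark.sql("CREATE TEMPORARY VIEW t AS SELECT * FROM VALUES (1), (2) AS t(c1)")

// Join form: the aggregate subquery can return at most one row.
spark.sql(
  "SELECT t1.* FROM t1 INNER JOIN (SELECT max(c1) AS c1 FROM t) t2 ON t1.c1 = t2.c1").show

// Filter form the rule is meant to produce: the same single row (c1 = 2).
spark.sql("SELECT t1.* FROM t1 WHERE t1.c1 = (SELECT max(c1) FROM t)").show
```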
Review Comment:
   Full outer join is supported only when the join condition is empty; with a condition, an unmatched row from the single-row side still shows up as an extra null-extended row of t1.*, so the join cannot simply be dropped. For example:
   ```scala
   spark.sql("CREATE TEMPORARY VIEW t1 AS SELECT * FROM values(1), (null), (3) AS t1(id)")
   spark.sql("CREATE TEMPORARY VIEW t2 AS SELECT * FROM values(1), (2), (null) AS t2(id)")
   
   spark.sql("CREATE TEMPORARY VIEW null_row AS SELECT id FROM t2 WHERE id is null LIMIT 1")
   
   scala> spark.sql("select t1.* from t1 full join null_row on t1.id = null_row.id").show
   +----+                                                                          
   |  id|
   +----+
   |null|
   |null|
   |   1|
   |   3|
   +----+
   
   scala> spark.sql("select t1.* from t1 full join null_row").show
   +----+
   |  id|
   +----+
   |   1|
   |null|
   |   3|
   +----+
   ```
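
   By contrast, the LeftOuter branch in the hunk stays safe even with a join condition: a left outer join preserves every t1 row exactly once when the other side has at most one row, and an unmatched single row adds nothing. A minimal follow-on sketch (reusing the t1 and null_row views above; the expected result follows from standard left-outer-join semantics rather than from a run of this change):
   ```scala
   // Unmatched null_row adds no row under LEFT OUTER, so t1.* comes back unchanged
   // and the join can simply be dropped.
   spark.sql("select t1.* from t1 left join null_row on t1.id = null_row.id").show
   // expected: the same three rows as SELECT * FROM t1 (1, null, 3)
   ```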



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

