Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/03 10:33:19 UTC

[GitHub] [spark] fred-db opened a new pull request, #38497: [SPARK-40999] Hint propagation to subqueries

fred-db opened a new pull request, #38497:
URL: https://github.com/apache/spark/pull/38497

   
   ### What changes were proposed in this pull request?
   We add a `hint` field to the `SubqueryExpression` class, pull hints inside subqueries into that field during `EliminateResolvedHint`, and propagate the hint to the joins formed from the subquery in `RewritePredicateSubquery`.
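A toy Scala sketch of the first half of this change, pulling hints out of a subquery's plan into a hint field on the subquery expression. Every name here (`SubqueryHintPullModel`, `Exists`, `extractHints`, `pullIntoSubquery`) is a hypothetical simplification standing in for `SubqueryExpression`, `extractHintsFromPlan`, and `pullHintsIntoSubqueries`; it is not their actual implementation.

```scala
// Toy model (NOT Spark's classes) of pulling hints into a subquery's hint field.
object SubqueryHintPullModel {
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Project(cols: List[String], child: Plan) extends Plan
  case class Hint(name: String, child: Plan) extends Plan

  // Hypothetical stand-in for a SubqueryExpression carrying the new `hint` field.
  case class Exists(plan: Plan, hint: Option[String] = None)

  // Peel hints off a plan, looking through unary Project nodes.
  def extractHints(p: Plan): (Plan, List[String]) = p match {
    case Hint(h, child) =>
      val (c, hs) = extractHints(child)
      (c, h :: hs)
    case Project(cols, child) =>
      val (c, hs) = extractHints(child)
      (Project(cols, c), hs)
    case r: Relation => (r, Nil)
  }

  // Only fill the field while it is empty, like the `s.hint.isEmpty` guard in the diff.
  // Spark merges hint lists (cf. `mergeHints`); this toy simply keeps the first hint.
  def pullIntoSubquery(e: Exists): Exists = e match {
    case Exists(plan, None) =>
      val (newPlan, hints) = extractHints(plan)
      Exists(newPlan, hints.headOption)
    case alreadySet => alreadySet
  }
}
```

The hint ends up on the subquery expression itself, so it survives until the subquery is rewritten into a join.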
   
   ### Why are the changes needed?
   
   Currently, if a user writes a query like the following, the hint on the subquery is not respected:
   
   ```
   SELECT * FROM target t WHERE EXISTS
   (SELECT /*+ BROADCAST */ * FROM source s WHERE s.key = t.key)
   ```
   This happens because hints are removed from the plan and pulled into joins at the beginning of the optimization phase, but subqueries are only rewritten into joins later during optimization. Since we remove any hint that is not below a join, hints below a subquery are removed as well.
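To make the failure mode concrete, here is a toy Scala sketch. `PreChangeModel` and all of its node types are hypothetical stand-ins, not Spark's classes. A hint directly above a join input is moved onto the join, and every remaining hint is then dropped. The sketch assumes, as the paragraph above states, that a hint inside a subquery's plan is dropped too because the subquery has not been rewritten into a join yet; it does not model how Spark actually reaches the subquery's plan.

```scala
// Toy model of the pre-change behavior. These are NOT Spark's classes.
object PreChangeModel {
  sealed trait Expr
  case class Eq(left: String, right: String) extends Expr
  case class Exists(plan: Plan) extends Expr // a subquery embeds a whole plan

  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Filter(cond: Expr, child: Plan) extends Plan
  case class Hint(name: String, child: Plan) extends Plan
  case class Join(left: Plan, right: Plan, rightHint: Option[String]) extends Plan

  // Step 1: a hint sitting directly on a join's right input moves onto the join.
  def pullIntoJoins(p: Plan): Plan = p match {
    case Join(l, Hint(h, r), None) => Join(pullIntoJoins(l), pullIntoJoins(r), Some(h))
    case Join(l, r, hint)          => Join(pullIntoJoins(l), pullIntoJoins(r), hint)
    case Filter(c, child)          => Filter(c, pullIntoJoins(child))
    case Hint(h, child)            => Hint(h, pullIntoJoins(child))
    case r: Relation               => r
  }

  // Step 2: every hint still left is dropped. Descending into Exists stands in for
  // (assumed) cleanup of the subquery's own plan; no join exists there to keep it.
  def dropHints(p: Plan): Plan = p match {
    case Hint(_, child)   => dropHints(child)
    case Filter(c, child) => Filter(dropHintsInExpr(c), dropHints(child))
    case Join(l, r, h)    => Join(dropHints(l), dropHints(r), h)
    case r: Relation      => r
  }

  def dropHintsInExpr(e: Expr): Expr = e match {
    case Exists(sub) => Exists(dropHints(sub))
    case other       => other
  }

  def optimize(p: Plan): Plan = dropHints(pullIntoJoins(p))
}
```

A hint on a join input survives as the join's hint, whereas the `BROADCAST` hint inside the `EXISTS` subquery is gone before the subquery ever becomes a join.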
   
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. Hints specified inside subqueries are now respected.
   
   
   ### How was this patch tested?
   Added unit tests (`SubqueryHintPropagationSuite`) that check whether hints are correctly propagated to joins formed from subqueries.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] fred-db commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
fred-db commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1026135835


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala:
##########
@@ -31,20 +34,35 @@ object EliminateResolvedHint extends Rule[LogicalPlan] {
   // This is also called in the beginning of the optimization phase, and as a result
   // is using transformUp rather than resolveOperators.
   def apply(plan: LogicalPlan): LogicalPlan = {
-    val pulledUp = plan transformUp {
+    val joinsWithHints = plan transformUp {
       case j: Join if j.hint == JoinHint.NONE =>
         val (newLeft, leftHints) = extractHintsFromPlan(j.left)
         val (newRight, rightHints) = extractHintsFromPlan(j.right)
         val newJoinHint = JoinHint(mergeHints(leftHints), mergeHints(rightHints))
         j.copy(left = newLeft, right = newRight, hint = newJoinHint)
     }
-    pulledUp.transformUp {
+    val shouldPullHintsIntoSubqueries = SQLConf.get.getConf(SQLConf.PULL_HINTS_INTO_SUBQUERIES)
+    val joinsAndSubqueriesWithHints = if (shouldPullHintsIntoSubqueries) {
+      pullHintsIntoSubqueries(joinsWithHints)
+    } else {
+      joinsWithHints
+    }
+    joinsAndSubqueriesWithHints.transformUp {
       case h: ResolvedHint =>
         hintErrorHandler.joinNotFoundForJoinHint(h.hints)
         h.child
     }
   }
 
+  def pullHintsIntoSubqueries(plan: LogicalPlan): LogicalPlan = {
+    plan.transformAllExpressionsUpWithPruning(_.containsPattern(PLAN_EXPRESSION)) {

Review Comment:
   If there are two nested subqueries, we want to extract hints from the innermost one first and then from the outer one. Otherwise, hints meant for the innermost subquery will end up in the outermost one. I think nested subqueries are not supported in practice, but it's better to keep it this way in case we decide to support them at some point.
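A toy Scala sketch of the bottom-up order described here: before a subquery's own hints are extracted, any subquery nested inside its plan is processed first, so each hint stays with the subquery it was written in. All names are hypothetical, and the sketch does not model how `transformAllExpressionsUpWithPruning` actually walks subquery plans.

```scala
// Toy model (NOT Spark's classes) of innermost-first hint extraction.
object NestedSubqueryModel {
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Hint(name: String, child: Plan) extends Plan
  case class Filter(cond: Expr, child: Plan) extends Plan

  sealed trait Expr
  case class Exists(plan: Plan, hint: Option[String] = None) extends Expr

  // Peel hints sitting directly on top of a plan.
  private def extractHints(p: Plan): (Plan, List[String]) = p match {
    case Hint(h, child) =>
      val (c, hs) = extractHints(child)
      (c, h :: hs)
    case other => (other, Nil)
  }

  def pullHints(p: Plan): Plan = p match {
    case Filter(cond, child) => Filter(pullHintsInExpr(cond), pullHints(child))
    case Hint(h, child)      => Hint(h, pullHints(child))
    case r: Relation         => r
  }

  def pullHintsInExpr(e: Expr): Expr = e match {
    case Exists(plan, None) =>
      val innerDone = pullHints(plan) // nested (inner) subqueries are handled first
      val (newPlan, hints) = extractHints(innerDone)
      Exists(newPlan, hints.headOption)
    case alreadySet => alreadySet
  }
}
```

With an outer `EXISTS` hinted `BROADCAST` that contains an inner `EXISTS` hinted `SHUFFLE_MERGE`, the inner subquery ends up with `SHUFFLE_MERGE` and the outer one with `BROADCAST`.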





[GitHub] [spark] fred-db commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
fred-db commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1019250270


##########
sql/core/src/test/scala/org/apache/spark/sql/SubqueryHintPropagationSuite.scala:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.plans.{InnerLike, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, Join, JoinHint, LogicalPlan}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SubqueryHintPropagationSuite extends QueryTest with SharedSparkSession {
+
+  setupTestData()
+
+  private val expectedHint =
+    Some(HintInfo(strategy = Some(BROADCAST)))
+  private val hints = Seq("BROADCAST", "SHUFFLE_MERGE")

Review Comment:
   Using multiple hints can also surface other bugs when more than one hint is present on subqueries in the plan, e.g. during analysis. For example, `cleanQueryInScalarSubquery` in `checkAnalysis` would fail if it were not written recursively.
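A toy Scala illustration of why a clean-up step must be recursive once two hints are stacked on a subquery's plan, assuming each hint in `Seq("BROADCAST", "SHUFFLE_MERGE")` becomes its own node: stripping one layer works for a single hint but leaves the second one behind. `StripHintsModel` is hypothetical and is not `cleanQueryInScalarSubquery`.

```scala
// Toy model (NOT Spark's classes) of stripping stacked hint nodes.
object StripHintsModel {
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Hint(name: String, child: Plan) extends Plan

  // Removes only the outermost hint: fine for one hint, wrong for a stack.
  def stripOnce(p: Plan): Plan = p match {
    case Hint(_, child) => child
    case other          => other
  }

  // Recursive version: removes every stacked hint.
  def stripAll(p: Plan): Plan = p match {
    case Hint(_, child) => stripAll(child)
    case other          => other
  }
}
```

A test with a single hint cannot tell the two functions apart; with two stacked hints only `stripAll` reaches the underlying relation.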







[GitHub] [spark] fred-db commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
fred-db commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1018810904


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala:
##########
@@ -31,20 +34,35 @@ object EliminateResolvedHint extends Rule[LogicalPlan] {
   // This is also called in the beginning of the optimization phase, and as a result
   // is using transformUp rather than resolveOperators.
   def apply(plan: LogicalPlan): LogicalPlan = {
-    val pulledUp = plan transformUp {
+    val joinsWithHints = plan transformUp {
       case j: Join if j.hint == JoinHint.NONE =>
         val (newLeft, leftHints) = extractHintsFromPlan(j.left)
         val (newRight, rightHints) = extractHintsFromPlan(j.right)
         val newJoinHint = JoinHint(mergeHints(leftHints), mergeHints(rightHints))
         j.copy(left = newLeft, right = newRight, hint = newJoinHint)
     }
-    pulledUp.transformUp {
+    val shouldPullHintsIntoSubqueries = SQLConf.get.getConf(SQLConf.PULL_HINTS_INTO_SUBQUERIES)
+    val joinsAndSubqueriesWithHints = if (shouldPullHintsIntoSubqueries) {
+      pullHintsIntoSubqueries(joinsWithHints)
+    } else {
+      joinsWithHints
+    }
+    joinsAndSubqueriesWithHints.transformUp {
       case h: ResolvedHint =>
         hintErrorHandler.joinNotFoundForJoinHint(h.hints)
         h.child
     }
   }
 
+  def pullHintsIntoSubqueries(plan: LogicalPlan): LogicalPlan = {
+    plan.transformAllExpressionsUpWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
+      case s: SubqueryExpression if s.hint.isEmpty =>
+        val (newPlan, subqueryHints) = extractHintsFromPlan(s.plan)

Review Comment:
   The `extractHintsFromPlan` method does not extract hints from underneath a binary node, such as a join. So when extracting hints for a subquery, we never get hints from joins inside the subquery. I will add a test to show that.
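A toy Scala sketch of the behavior described here, assuming extraction looks through unary nodes and stops at any binary node. `ExtractHintsModel` and its node types are hypothetical stand-ins, not Spark's `extractHintsFromPlan`.

```scala
// Toy model (NOT Spark's classes) of hint extraction stopping at binary nodes.
object ExtractHintsModel {
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Hint(name: String, child: Plan) extends Plan
  case class Project(cols: List[String], child: Plan) extends Plan // unary
  case class Join(left: Plan, right: Plan) extends Plan            // binary

  // Collect hints reachable through unary nodes only; a Join ends the search,
  // so hints underneath it are left in place.
  def extractHints(p: Plan): (Plan, List[String]) = p match {
    case Hint(h, child) =>
      val (c, hs) = extractHints(child)
      (c, h :: hs)
    case Project(cols, child) =>
      val (c, hs) = extractHints(child)
      (Project(cols, c), hs)
    case j: Join     => (j, Nil)
    case r: Relation => (r, Nil)
  }
}
```

A hint on one input of a join inside the subquery stays where it is, while a hint sitting above that join is extracted for the subquery.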





[GitHub] [spark] allisonwang-db commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
allisonwang-db commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1018331188


##########
sql/core/src/test/scala/org/apache/spark/sql/SubqueryHintPropagationSuite.scala:
##########
@@ -0,0 +1,183 @@
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.plans.{InnerLike, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, Join, JoinHint, LogicalPlan}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SubqueryHintPropagationSuite extends QueryTest with SharedSparkSession {
+
+  setupTestData()
+
+  private val expectedHint =
+    Some(HintInfo(strategy = Some(BROADCAST)))
+  private val hints = Seq("BROADCAST")
+  private val hintStringified = hints.map("/*+ " + _ + " */").mkString
+
+  def verifyJoinContainsHint(plan: LogicalPlan): Unit = {
+    val expectedJoinHint = JoinHint(leftHint = None, rightHint = expectedHint)
+    var matchedJoin = false
+    plan.transformUp {
+      case j @ Join(_, _, _, _, foundHint) =>
+        assert(expectedJoinHint == foundHint)
+        matchedJoin = true
+        j
+    }
+    assert(matchedJoin)
+  }
+
+  test("Correlated Exists") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE EXISTS
+         |(SELECT $hintStringified
+         |s2.key FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Correlated Exists with hints in tempView") {
+    val tempView = "tmpView"
+    withTempView(tempView) {
+      val df = spark
+        .range(30)
+        .where("true")
+      val dfWithHints = hints.foldLeft(df)((newDf, hint) => newDf.hint(hint))
+        .selectExpr("id as key", "id as value")
+      dfWithHints.createOrReplaceTempView(tempView)
+
+      val queryDf = sql(
+        s"""SELECT * FROM testData s1 WHERE EXISTS
+           |(SELECT s2.key FROM $tempView s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+           |""".stripMargin)
+
+      verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+    }
+  }
+
+  test("Correlated Exists containing join with hint") {
+    val queryDf = sql(
+      s"""select * from testData s1 WHERE EXISTS
+         |(SELECT s2.key FROM
+         |(SELECT $hintStringified * FROM testData) s2 JOIN testData s3
+         |ON s2.key = s3.key
+         |WHERE s2.key = s1.key)
+         |""".stripMargin)
+    val optimized = queryDf.queryExecution.optimizedPlan
+
+    // the subquery will be turned into a left semi join and should not contain any hints
+    optimized.transform {
+      case j @ Join(_, _, joinType, _, hint) =>
+        joinType match {
+          case _: InnerLike => assert(expectedHint == hint.leftHint)
+          case LeftSemi => assert(hint.leftHint.isEmpty && hint.rightHint.isEmpty)
+          case _ => throw new IllegalArgumentException("Unexpected join found.")
+        }
+        j
+    }
+  }
+
+  test("Negated Exists with hint") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE NOT EXISTS
+         |(SELECT $hintStringified
+         |* FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Exists with complex predicate") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE EXISTS
+         |(SELECT $hintStringified
+         |* FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value) OR s1.key = 5
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Non-correlated IN") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key IN
+         |(SELECT $hintStringified key FROM testData s2)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Correlated IN") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key IN
+         |(SELECT $hintStringified
+         |key FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Negated IN with hint") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key NOT IN
+         |(SELECT $hintStringified
+         |key FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("IN with complex predicate") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key in
+         |(SELECT $hintStringified
+         | key FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value) OR s1.key = 5
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Scalar subquery") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key =
+         |(SELECT $hintStringified MAX(key) FROM
+         |testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Scalar subquery with COUNT") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key =
+         |(SELECT $hintStringified COUNT(key) FROM
+         |testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Scalar subquery nested subquery") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key =
+         |(SELECT MAX(key) FROM
+         |(SELECT $hintStringified key FROM testData s2 WHERE
+         |s1.key = s2.key AND s1.value = s2.value))
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+

Review Comment:
   Yes, only the outermost join should have the join hint.





[GitHub] [spark] cloud-fan commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1018755179


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala:
##########
@@ -31,20 +34,35 @@ object EliminateResolvedHint extends Rule[LogicalPlan] {
   // This is also called in the beginning of the optimization phase, and as a result
   // is using transformUp rather than resolveOperators.
   def apply(plan: LogicalPlan): LogicalPlan = {
-    val pulledUp = plan transformUp {
+    val joinsWithHints = plan transformUp {
       case j: Join if j.hint == JoinHint.NONE =>
         val (newLeft, leftHints) = extractHintsFromPlan(j.left)
         val (newRight, rightHints) = extractHintsFromPlan(j.right)
         val newJoinHint = JoinHint(mergeHints(leftHints), mergeHints(rightHints))
         j.copy(left = newLeft, right = newRight, hint = newJoinHint)
     }
-    pulledUp.transformUp {
+    val shouldPullHintsIntoSubqueries = SQLConf.get.getConf(SQLConf.PULL_HINTS_INTO_SUBQUERIES)
+    val joinsAndSubqueriesWithHints = if (shouldPullHintsIntoSubqueries) {
+      pullHintsIntoSubqueries(joinsWithHints)
+    } else {
+      joinsWithHints
+    }
+    joinsAndSubqueriesWithHints.transformUp {
       case h: ResolvedHint =>
         hintErrorHandler.joinNotFoundForJoinHint(h.hints)
         h.child
     }
   }
 
+  def pullHintsIntoSubqueries(plan: LogicalPlan): LogicalPlan = {
+    plan.transformAllExpressionsUpWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
+      case s: SubqueryExpression if s.hint.isEmpty =>
+        val (newPlan, subqueryHints) = extractHintsFromPlan(s.plan)

Review Comment:
   Question: what if the subquery is complicated and contains joins, and the hint is meant for the joins inside the subquery rather than for the subquery itself?





[GitHub] [spark] fred-db commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
fred-db commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1019245404


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -148,7 +150,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
           // will have the final conditions in the LEFT ANTI as
           // (A.A1 = B.B1 OR ISNULL(A.A1 = B.B1)) AND (B.B2 = A.A2) AND B.B3 > 1
           val finalJoinCond = (nullAwareJoinConds ++ conditions).reduceLeft(And)
-          Join(outerPlan, newSub, LeftAnti, Option(finalJoinCond), JoinHint.NONE)
+          Join(outerPlan, newSub, LeftAnti, Option(finalJoinCond), JoinHint(None, subHint))

Review Comment:
   I'm not sure how useful this is, since the code makes it fairly apparent where the subquery hint goes: if the subquery goes on the right side of the join, the hint goes on the right side too.
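A toy Scala sketch of the placement rule: the outer plan becomes the left side of the semi or anti join and the subquery becomes the right side, so the subquery's hint becomes the right-hand hint, mirroring `JoinHint(None, subHint)` in the diff. Plans are plain strings and every name here is hypothetical.

```scala
// Toy model (NOT Spark's classes) of where a subquery hint lands after rewriting.
object SubqueryRewriteModel {
  sealed trait JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType

  case class JoinHint(leftHint: Option[String], rightHint: Option[String])
  case class Join(left: String, right: String, joinType: JoinType, hint: JoinHint)

  // A predicate subquery whose hints were already pulled into its hint field.
  case class Exists(subPlan: String, hint: Option[String], negated: Boolean)

  // EXISTS becomes a left semi join, NOT EXISTS a left anti join; in both cases
  // the subquery is the right side, so its hint is the right-hand hint.
  def rewrite(outer: String, e: Exists): Join = {
    val joinType = if (e.negated) LeftAnti else LeftSemi
    Join(outer, e.subPlan, joinType, JoinHint(None, e.hint))
  }
}
```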



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on PR #38497:
URL: https://github.com/apache/spark/pull/38497#issuecomment-1302849566

   cc @allisonwang-db FYI




[GitHub] [spark] fred-db commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
fred-db commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1017907695


##########
sql/core/src/test/scala/org/apache/spark/sql/SubqueryHintPropagationSuite.scala:
##########
@@ -0,0 +1,183 @@

Review Comment:
   I had to deviate from the default test template for this test. With non-equality predicates, there are two joins: one between the source and target that produces all rows matching the non-equality predicates, and another that then joins the aggregate with the target table. Only the join with the aggregate is formed directly from the subquery, though, so I only check whether the hint is added there.
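The two-join shape described in this comment can be sketched with a toy plan tree. This is a minimal sketch, not Catalyst code: `Plan`, `Relation`, `Aggregate`, `Join`, `joins` and `aggregateJoinHasHint` are hypothetical stand-ins, and the hint is reduced to a plain string. It checks the hint only on joins whose right child is an aggregate, which mirrors checking only the join formed directly from the subquery.

```scala
object AggregateJoinCheck {
  // Hypothetical, simplified stand-ins for Catalyst plan nodes (not Spark's classes).
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Aggregate(child: Plan) extends Plan
  final case class Join(left: Plan, right: Plan, rightHint: Option[String]) extends Plan

  // Collects every Join in the tree, children before their parent.
  def joins(plan: Plan): Seq[Join] = plan match {
    case _: Relation       => Seq.empty
    case Aggregate(child)  => joins(child)
    case j @ Join(l, r, _) => joins(l) ++ joins(r) :+ j
  }

  // Checks the hint only on joins whose right side is an Aggregate, i.e. the join
  // built directly from the rewritten subquery in this toy model.
  def aggregateJoinHasHint(plan: Plan, expected: String): Boolean = {
    val aggregateJoins = joins(plan).collect { case j @ Join(_, _: Aggregate, _) => j }
    aggregateJoins.nonEmpty && aggregateJoins.forall(_.rightHint.contains(expected))
  }

  // Outer table joined with an aggregate over the source/target join that
  // evaluates the non-equality predicates; only the outer join carries the hint.
  val examplePlan: Plan =
    Join(Relation("s1"), Aggregate(Join(Relation("s1"), Relation("s2"), None)), Some("BROADCAST"))
}
```

A blanket check that every join carries the hint, as `verifyJoinContainsHint` does, would fail on `examplePlan` in this model because the inner source/target join has no hint.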



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1019068317


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -148,7 +150,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
           // will have the final conditions in the LEFT ANTI as
           // (A.A1 = B.B1 OR ISNULL(A.A1 = B.B1)) AND (B.B2 = A.A2) AND B.B3 > 1
           val finalJoinCond = (nullAwareJoinConds ++ conditions).reduceLeft(And)
-          Join(outerPlan, newSub, LeftAnti, Option(finalJoinCond), JoinHint.NONE)
+          Join(outerPlan, newSub, LeftAnti, Option(finalJoinCond), JoinHint(None, subHint))

Review Comment:
   Shall we add a comment here as well?
   > Add subquery hint as right hint as the subquery plan is on the right side of the join
   
   Maybe add a method `def createJoinHint(subHint ...)`, so that we don't need to duplicate the comment in many places.
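One possible shape for the suggested helper, as a minimal sketch: `HintInfo` and `JoinHint` below are simplified stand-ins for the Catalyst classes of the same name (the real `HintInfo.strategy` holds a `JoinStrategyHint`, not a `String`), and `createJoinHint` is hypothetical. The explanatory comment then lives in one place instead of at every call site.

```scala
object SubqueryJoinHints {
  // Simplified stand-ins: the real Catalyst HintInfo.strategy is an Option[JoinStrategyHint].
  final case class HintInfo(strategy: Option[String] = None)
  final case class JoinHint(leftHint: Option[HintInfo], rightHint: Option[HintInfo])

  /**
   * Hypothetical helper. The rewritten subquery plan is on the right side of the join,
   * so its hint becomes the right hint and the left hint stays empty.
   */
  def createJoinHint(subHint: Option[HintInfo]): JoinHint = JoinHint(None, subHint)
}
```

Each call site could then pass `createJoinHint(subHint)` as the last `Join(...)` argument instead of spelling out `JoinHint(None, subHint)`.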



[GitHub] [spark] cloud-fan commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1025472753


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -201,15 +204,17 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
           //     +- Relation[id#80] parquet
           val nullAwareJoinConds = inConditions.map(c => Or(c, IsNull(c)))
           val finalJoinCond = (nullAwareJoinConds ++ conditions).reduceLeft(And)
-          newPlan = Join(newPlan, newSub, ExistenceJoin(exists), Some(finalJoinCond), JoinHint.NONE)
+          val joinHint = JoinHint(None, subHint)
+          newPlan = Join(newPlan, newSub, ExistenceJoin(exists), Some(finalJoinCond), joinHint)
           Not(exists)
-        case InSubquery(values, ListQuery(sub, _, _, _, conditions)) =>
+        case InSubquery(values, ListQuery(sub, _, _, _, conditions, subHint)) =>
           val exists = AttributeReference("exists", BooleanType, nullable = false)()
           // Deduplicate conflicting attributes if any.
           val newSub = dedupSubqueryOnSelfJoin(newPlan, sub, Some(values))
           val inConditions = values.zip(newSub.output).map(EqualTo.tupled)
           val newConditions = (inConditions ++ conditions).reduceLeftOption(And)
-          newPlan = Join(newPlan, newSub, ExistenceJoin(exists), newConditions, JoinHint.NONE)
+          newPlan =
+            Join(newPlan, newSub, ExistenceJoin(exists), newConditions, JoinHint(None, subHint))

Review Comment:
   super nit: it's interesting to see three code styles here: one pulls `ExistenceJoin(exists)` out to a variable, one pulls `JoinHint(None, subHint)` out to a variable, and this one just wraps the line. Can we be consistent?
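The NOT IN rewrites quoted in these hunks wrap each IN condition `c` as `Or(c, IsNull(c))` before building the anti or existence join. A toy model of SQL three-valued logic for a single-column NOT IN shows why: with `None` standing for NULL, `c OR ISNULL(c)` is true whenever `c` is true or NULL, so an outer value survives only when every comparison is definitely false. The object and function names are hypothetical; this is not Spark code.

```scala
object NullAwareNotIn {
  // Toy model of SQL three-valued logic: None stands for NULL (not Spark code).
  def sqlEq(a: Option[Int], b: Option[Int]): Option[Boolean] =
    for (x <- a; y <- b) yield x == y

  // c OR ISNULL(c): TRUE if c is TRUE or NULL, FALSE only if c is FALSE.
  def nullAware(c: Option[Boolean]): Boolean = c.getOrElse(true)

  // Anti-join reading of `a NOT IN (inner)`: keep an outer value only if no inner
  // value satisfies the null-aware condition.
  def notIn(outer: Seq[Option[Int]], inner: Seq[Option[Int]]): Seq[Option[Int]] =
    outer.filterNot(a => inner.exists(b => nullAware(sqlEq(a, b))))
}
```

For example, `2 NOT IN (1, NULL)` evaluates to NULL in SQL, so the row is filtered out; the model agrees because the NULL comparison satisfies the null-aware condition.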





[GitHub] [spark] AmplabJenkins commented on pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #38497:
URL: https://github.com/apache/spark/pull/38497#issuecomment-1302809681

   Can one of the admins verify this patch?




[GitHub] [spark] fred-db commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
fred-db commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1017887578


##########
sql/core/src/test/scala/org/apache/spark/sql/SubqueryHintPropagationSuite.scala:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.plans.{InnerLike, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, Join, JoinHint, LogicalPlan}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SubqueryHintPropagationSuite extends QueryTest with SharedSparkSession {
+
+  setupTestData()
+
+  private val expectedHint =
+    Some(HintInfo(strategy = Some(BROADCAST)))
+  private val hints = Seq("BROADCAST")
+  private val hintStringified = hints.map("/*+ " + _ + " */").mkString
+
+  def verifyJoinContainsHint(plan: LogicalPlan): Unit = {
+    val expectedJoinHint = JoinHint(leftHint = None, rightHint = expectedHint)
+    var matchedJoin = false
+    plan.transformUp {
+      case j @ Join(_, _, _, _, foundHint) =>
+        assert(expectedJoinHint == foundHint)
+        matchedJoin = true
+        j
+    }
+    assert(matchedJoin)
+  }
+
+  test("Correlated Exists") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE EXISTS
+         |(SELECT $hintStringified
+         |s2.key FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Correlated Exists with hints in tempView") {
+    val tempView = "tmpView"
+    withTempView(tempView) {
+      val df = spark
+        .range(30)
+        .where("true")
+      val dfWithHints = hints.foldLeft(df)((newDf, hint) => newDf.hint(hint))
+        .selectExpr("id as key", "id as value")
+      dfWithHints.createOrReplaceTempView(tempView)
+
+      val queryDf = sql(
+        s"""SELECT * FROM testData s1 WHERE EXISTS
+           |(SELECT s2.key FROM $tempView s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+           |""".stripMargin)
+
+      verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+    }
+  }
+
+  test("Correlated Exists containing join with hint") {
+    val queryDf = sql(
+      s"""select * from testData s1 WHERE EXISTS
+         |(SELECT s2.key FROM
+         |(SELECT $hintStringified * FROM testData) s2 JOIN testData s3
+         |ON s2.key = s3.key
+         |WHERE s2.key = s1.key)
+         |""".stripMargin)
+    val optimized = queryDf.queryExecution.optimizedPlan
+
+    // the subquery will be turned into a left semi join and should not contain any hints
+    optimized.transform {
+      case j @ Join(_, _, joinType, _, hint) =>
+        joinType match {
+          case _: InnerLike => assert(expectedHint == hint.leftHint)
+          case LeftSemi => assert(hint.leftHint.isEmpty && hint.rightHint.isEmpty)
+          case _ => throw new IllegalArgumentException("Unexpected join found.")
+        }
+        j
+    }
+  }
+
+  test("Negated Exists with hint") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE NOT EXISTS
+         |(SELECT $hintStringified
+         |* FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Exists with complex predicate") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE EXISTS
+         |(SELECT $hintStringified
+         |* FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value) OR s1.key = 5
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Non-correlated IN") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key IN
+         |(SELECT $hintStringified key FROM testData s2)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Correlated IN") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key IN
+         |(SELECT $hintStringified
+         |key FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Negated IN with hint") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key NOT IN
+         |(SELECT $hintStringified
+         |key FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("IN with complex predicate") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key in
+         |(SELECT $hintStringified
+         | key FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value) OR s1.key = 5
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Scalar subquery") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key =
+         |(SELECT $hintStringified MAX(key) FROM
+         |testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Scalar subquery with COUNT") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key =
+         |(SELECT $hintStringified COUNT(key) FROM
+         |testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Scalar subquery nested subquery") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key =
+         |(SELECT MAX(key) FROM
+         |(SELECT $hintStringified key FROM testData s2 WHERE
+         |s1.key = s2.key AND s1.value = s2.value))
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Lateral subquery") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1, LATERAL
+         |(SELECT $hintStringified * FROM testData s2)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }

Review Comment:
   Is this needed? Hints should not affect the correctness of the query.
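The point that hints should not change results can be illustrated with a toy model, assuming an equality-keyed LEFT SEMI join: a strategy hint only chooses how the join is executed, for example hashing the inner side versus comparing every pair of rows, and both strategies must return the same rows. `Row`, `nestedLoopSemiJoin` and `hashSemiJoin` are hypothetical and only approximate what Spark's physical operators do.

```scala
object SemiJoinStrategies {
  final case class Row(key: Int, value: String)

  // Nested-loop strategy: compare every outer row with every inner row.
  def nestedLoopSemiJoin(outer: Seq[Row], inner: Seq[Row]): Seq[Row] =
    outer.filter(o => inner.exists(_.key == o.key))

  // Hash strategy: build the set of inner keys once, then probe it per outer row.
  def hashSemiJoin(outer: Seq[Row], inner: Seq[Row]): Seq[Row] = {
    val innerKeys = inner.map(_.key).toSet
    outer.filter(o => innerKeys.contains(o.key))
  }
}
```

Both functions keep each matching outer row exactly once, even when the inner side has duplicate keys.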





[GitHub] [spark] allisonwang-db commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
allisonwang-db commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1016948419


##########
sql/core/src/test/scala/org/apache/spark/sql/SubqueryHintPropagationSuite.scala:
##########
@@ -0,0 +1,183 @@
+  test("Lateral subquery") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1, LATERAL
+         |(SELECT $hintStringified * FROM testData s2)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }

Review Comment:
   Can we also verify the results for these queries?



##########
sql/core/src/test/scala/org/apache/spark/sql/SubqueryHintPropagationSuite.scala:
##########
@@ -0,0 +1,183 @@
+class SubqueryHintPropagationSuite extends QueryTest with SharedSparkSession {
+
+  setupTestData()
+
+  private val expectedHint =
+    Some(HintInfo(strategy = Some(BROADCAST)))
+  private val hints = Seq("BROADCAST")

Review Comment:
   Should we test all hint strategies?
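A minimal sketch of how the suite could be parameterized over every join strategy rather than only `BROADCAST`. It pairs one SQL hint name per strategy (Spark also accepts aliases such as `BROADCASTJOIN`, `MAPJOIN` and `MERGEJOIN`) with the strategy name the optimized plan would be expected to carry. Strategies are plain strings here rather than Catalyst's `JoinStrategyHint` objects, and `HintStrategies` is hypothetical.

```scala
object HintStrategies {
  // One SQL hint name per join strategy, paired with the strategy name the optimized
  // plan is expected to carry (plain strings instead of JoinStrategyHint objects).
  val hintsByStrategy: Seq[(String, String)] = Seq(
    "BROADCAST" -> "BROADCAST",
    "MERGE" -> "SHUFFLE_MERGE",
    "SHUFFLE_HASH" -> "SHUFFLE_HASH",
    "SHUFFLE_REPLICATE_NL" -> "SHUFFLE_REPLICATE_NL")

  // Same construction as hintStringified in the suite, for a single hint name.
  def hintComment(hint: String): String = s"/*+ $hint */"

  // "Correlated Exists" query text from the suite, with the hint substituted in.
  def correlatedExistsQuery(hint: String): String =
    s"""SELECT * FROM testData s1 WHERE EXISTS
       |(SELECT ${hintComment(hint)}
       |s2.key FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
       |""".stripMargin
}
```

In the actual suite, each pair could drive its own `test(...)` call, with `expectedHint` built as `Some(HintInfo(strategy = Some(...)))` from the matching `JoinStrategyHint` object.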



##########
sql/core/src/test/scala/org/apache/spark/sql/SubqueryHintPropagationSuite.scala:
##########
@@ -0,0 +1,183 @@
+  test("Correlated Exists containing join with hint") {
+    val queryDf = sql(
+      s"""select * from testData s1 WHERE EXISTS

Review Comment:
   ```suggestion
         s"""SELECT * FROM testData s1 WHERE EXISTS
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/SubqueryHintPropagationSuite.scala:
##########
@@ -0,0 +1,183 @@
+  test("Scalar subquery") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key =
+         |(SELECT $hintStringified MAX(key) FROM
+         |testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Scalar subquery with COUNT") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key =
+         |(SELECT $hintStringified COUNT(key) FROM
+         |testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Scalar subquery nested subquery") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key =
+         |(SELECT MAX(key) FROM
+         |(SELECT $hintStringified key FROM testData s2 WHERE
+         |s1.key = s2.key AND s1.value = s2.value))
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+

Review Comment:
   Please also add a test case for scalar subqueries with correlated non-equality predicates.
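
   For illustration only, here is a hypothetical query of that shape. It is not part of the PR. In the suite, `$hintStringified` would go after the inner `SELECT`:

   `SELECT * FROM testData s1 WHERE (SELECT COUNT(*) FROM testData s2 WHERE s2.key < s1.key) = 0`

   The plain-Scala sketch below computes its expected answer. It assumes `testData` holds keys 1 to 100 with `value = key.toString`. `COUNT` over an empty set is 0 rather than NULL, so only the row with key 1 should qualify. That gives a more discriminating result check than a query that matches every row or none.

   ```scala
   // Hypothetical in-memory stand-in for the suite's testData table
   // (assumed: keys 1..100, value = key.toString).
   final case class TestRow(key: Int, value: String)

   object NonEqualityScalarSubquery {
     val testData: Seq[TestRow] = (1 to 100).map(i => TestRow(i, i.toString))

     // Reference semantics of:
     //   SELECT * FROM testData s1
     //   WHERE (SELECT COUNT(*) FROM testData s2 WHERE s2.key < s1.key) = 0
     // COUNT over an empty set is 0 (not NULL), so the smallest key qualifies.
     def expected(rows: Seq[TestRow]): Seq[TestRow] =
       rows.filter { s1 =>
         // Correlated, non-equality predicate evaluated per outer row.
         val count = rows.count(s2 => s2.key < s1.key)
         count == 0
       }
   }
   ```

   In the suite, the matching result check would presumably be `checkAnswer(queryDf, testData.where("key = 1"))`.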



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] allisonwang-db commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
allisonwang-db commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1018333336


##########
sql/core/src/test/scala/org/apache/spark/sql/SubqueryHintPropagationSuite.scala:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.plans.{InnerLike, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, Join, JoinHint, LogicalPlan}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SubqueryHintPropagationSuite extends QueryTest with SharedSparkSession {
+
+  setupTestData()
+
+  private val expectedHint =
+    Some(HintInfo(strategy = Some(BROADCAST)))
+  private val hints = Seq("BROADCAST")
+  private val hintStringified = hints.map("/*+ " + _ + " */").mkString
+
+  def verifyJoinContainsHint(plan: LogicalPlan): Unit = {
+    val expectedJoinHint = JoinHint(leftHint = None, rightHint = expectedHint)
+    var matchedJoin = false
+    plan.transformUp {
+      case j @ Join(_, _, _, _, foundHint) =>
+        assert(expectedJoinHint == foundHint)
+        matchedJoin = true
+        j
+    }
+    assert(matchedJoin)
+  }
+
+  test("Correlated Exists") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE EXISTS
+         |(SELECT $hintStringified
+         |s2.key FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Correlated Exists with hints in tempView") {
+    val tempView = "tmpView"
+    withTempView(tempView) {
+      val df = spark
+        .range(30)
+        .where("true")
+      val dfWithHints = hints.foldLeft(df)((newDf, hint) => newDf.hint(hint))
+        .selectExpr("id as key", "id as value")
+      dfWithHints.createOrReplaceTempView(tempView)
+
+      val queryDf = sql(
+        s"""SELECT * FROM testData s1 WHERE EXISTS
+           |(SELECT s2.key FROM $tempView s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+           |""".stripMargin)
+
+      verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+    }
+  }
+
+  test("Correlated Exists containing join with hint") {
+    val queryDf = sql(
+      s"""select * from testData s1 WHERE EXISTS
+         |(SELECT s2.key FROM
+         |(SELECT $hintStringified * FROM testData) s2 JOIN testData s3
+         |ON s2.key = s3.key
+         |WHERE s2.key = s1.key)
+         |""".stripMargin)
+    val optimized = queryDf.queryExecution.optimizedPlan
+
+    // the subquery will be turned into a left semi join and should not contain any hints
+    optimized.transform {
+      case j @ Join(_, _, joinType, _, hint) =>
+        joinType match {
+          case _: InnerLike => assert(expectedHint == hint.leftHint)
+          case LeftSemi => assert(hint.leftHint.isEmpty && hint.rightHint.isEmpty)
+          case _ => throw new IllegalArgumentException("Unexpected join found.")
+        }
+        j
+    }
+  }
+
+  test("Negated Exists with hint") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE NOT EXISTS
+         |(SELECT $hintStringified
+         |* FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Exists with complex predicate") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE EXISTS
+         |(SELECT $hintStringified
+         |* FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value) OR s1.key = 5
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Non-correlated IN") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key IN
+         |(SELECT $hintStringified key FROM testData s2)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Correlated IN") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key IN
+         |(SELECT $hintStringified
+         |key FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Negated IN with hint") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key NOT IN
+         |(SELECT $hintStringified
+         |key FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("IN with complex predicate") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key in
+         |(SELECT $hintStringified
+         | key FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value) OR s1.key = 5
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Scalar subquery") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key =
+         |(SELECT $hintStringified MAX(key) FROM
+         |testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Scalar subquery with COUNT") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key =
+         |(SELECT $hintStringified COUNT(key) FROM
+         |testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Scalar subquery nested subquery") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key =
+         |(SELECT MAX(key) FROM
+         |(SELECT $hintStringified key FROM testData s2 WHERE
+         |s1.key = s2.key AND s1.value = s2.value))
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Lateral subquery") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1, LATERAL
+         |(SELECT $hintStringified * FROM testData s2)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }

Review Comment:
   I think we should also verify the query results, just to be extra safe. These are the only tests that run correlated subqueries with hints, and checking the answers would also help prevent future regressions.
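
   As a rough guide to what the result checks should expect, the plain-Scala sketch below evaluates the correlated EXISTS and NOT EXISTS predicates used in these tests. It treats EXISTS as a semi join and NOT EXISTS as an anti join over an in-memory stand-in for `testData`. The stand-in assumes keys 1 to 100 with `value = key.toString`, and the names `KV` and `ExistsReference` are hypothetical. Because `s1` and `s2` are the same table and the predicate compares both columns, EXISTS keeps every row and NOT EXISTS keeps none.

   ```scala
   // Hypothetical row type standing in for testData (assumed: keys 1..100,
   // value = key.toString).
   final case class KV(key: Int, value: String)

   object ExistsReference {
     val testData: Seq[KV] = (1 to 100).map(i => KV(i, i.toString))

     // EXISTS (... WHERE s1.key = s2.key AND s1.value = s2.value) acts like a
     // left semi join: keep each s1 row that has at least one match in s2.
     def exists(s1: Seq[KV], s2: Seq[KV]): Seq[KV] =
       s1.filter(r => s2.exists(x => x.key == r.key && x.value == r.value))

     // NOT EXISTS acts like a left anti join: keep s1 rows with no match.
     def notExists(s1: Seq[KV], s2: Seq[KV]): Seq[KV] =
       s1.filterNot(r => s2.exists(x => x.key == r.key && x.value == r.value))
   }
   ```

   So `checkAnswer(queryDf, testData)` would fit the EXISTS tests, and an empty expected answer such as `Seq.empty[Row]` would fit the NOT EXISTS one.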





[GitHub] [spark] cloud-fan commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1019074653


##########
sql/core/src/test/scala/org/apache/spark/sql/SubqueryHintPropagationSuite.scala:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.plans.{InnerLike, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, Join, JoinHint, LogicalPlan}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SubqueryHintPropagationSuite extends QueryTest with SharedSparkSession {
+
+  setupTestData()
+
+  private val expectedHint =
+    Some(HintInfo(strategy = Some(BROADCAST)))
+  private val hints = Seq("BROADCAST", "SHUFFLE_MERGE")
+  private val hintStringified = hints.map("/*+ " + _ + " */").mkString
+
+  def verifyJoinContainsHint(plan: LogicalPlan): Unit = {
+    val expectedJoinHint = JoinHint(leftHint = None, rightHint = expectedHint)
+    var matchedJoin = false
+    plan.transformUp {
+      case j @ Join(_, _, _, _, foundHint) =>
+        assert(expectedJoinHint == foundHint)
+        matchedJoin = true

Review Comment:
   Instead of updating a variable, we can use `plan.collect` and check the number of returned plans.
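
   Below is a self-contained toy model of the suggested refactor, for illustration. `Node`, `Scan`, `JoinNode`, `Hint`, and `JHint` are hypothetical stand-ins for Catalyst's `LogicalPlan`, `Join`, `HintInfo`, and `JoinHint`. The `collect` here mimics a pre-order `TreeNode.collect`. Collecting the hints first removes the mutable flag, and an empty result doubles as the "no join matched" failure.

   ```scala
   // Toy model of a logical plan tree; Catalyst's real classes differ.
   final case class Hint(strategy: Option[String])
   final case class JHint(left: Option[Hint], right: Option[Hint])

   sealed trait Node {
     def children: Seq[Node]
     // Pre-order collect, in the spirit of TreeNode.collect.
     def collect[B](pf: PartialFunction[Node, B]): Seq[B] =
       pf.lift(this).toList ++ children.flatMap(_.collect(pf))
   }
   final case class Scan(table: String) extends Node {
     def children: Seq[Node] = Nil
   }
   final case class JoinNode(left: Node, right: Node, hint: JHint) extends Node {
     def children: Seq[Node] = Seq(left, right)
   }

   object VerifyHints {
     // No mutable flag: gather every join's hint, then assert on the result.
     def verifyJoinContainsHint(plan: Node, expected: JHint): Unit = {
       val hints = plan.collect { case j: JoinNode => j.hint }
       assert(hints.nonEmpty, "expected at least one join")
       hints.foreach(h => assert(h == expected, s"unexpected hint $h"))
     }
   }
   ```

   Against the real API, the helper body would presumably reduce to `plan.collect { case j: Join => j.hint }` followed by the same two assertions.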








[GitHub] [spark] cloud-fan commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1019079991


##########
sql/core/src/test/scala/org/apache/spark/sql/SubqueryHintPropagationSuite.scala:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.plans.{InnerLike, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, Join, JoinHint, LogicalPlan}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SubqueryHintPropagationSuite extends QueryTest with SharedSparkSession {
+
+  setupTestData()
+
+  private val expectedHint =
+    Some(HintInfo(strategy = Some(BROADCAST)))
+  private val hints = Seq("BROADCAST", "SHUFFLE_MERGE")
+  private val hintStringified = hints.map("/*+ " + _ + " */").mkString
+
+  def verifyJoinContainsHint(plan: LogicalPlan): Unit = {
+    val expectedJoinHint = JoinHint(leftHint = None, rightHint = expectedHint)
+    var matchedJoin = false
+    plan.transformUp {
+      case j @ Join(_, _, _, _, foundHint) =>
+        assert(expectedJoinHint == foundHint)
+        matchedJoin = true
+        j
+    }
+    assert(matchedJoin)
+  }
+
+  test("Correlated Exists") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE EXISTS
+         |(SELECT $hintStringified
+         |s2.key FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+    checkAnswer(queryDf, testData)
+  }
+
+  test("Correlated Exists with hints in tempView") {
+    val tempView = "tmpView"
+    withTempView(tempView) {
+      val df = spark
+        .range(1, 30)
+        .where("true")
+      val dfWithHints = hints.foldRight(df)((hint, newDf) => newDf.hint(hint))
+        .selectExpr("id as key", "id as value")
+        .withColumn("value", col("value").cast("string"))
+      dfWithHints.createOrReplaceTempView(tempView)
+
+      val queryDf = sql(
+        s"""SELECT * FROM testData s1 WHERE EXISTS
+           |(SELECT s2.key FROM $tempView s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+           |""".stripMargin)
+
+      verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+      checkAnswer(queryDf, dfWithHints)
+    }
+  }
+
+  test("Correlated Exists containing join with hint") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE EXISTS
+         |(SELECT s2.key FROM
+         |(SELECT $hintStringified * FROM testData) s2 JOIN testData s3
+         |ON s2.key = s3.key
+         |WHERE s2.key = s1.key)
+         |""".stripMargin)
+    val optimized = queryDf.queryExecution.optimizedPlan
+
+    // the subquery will be turned into a left semi join and should not contain any hints
+    optimized.transform {

Review Comment:
   Since we don't need the result, we can call `optimized.foreach` instead.
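
   Below is a toy sketch of the same check written as a side-effect-only traversal. `Plan`, `Relation`, `JoinOp`, and the `JoinKind` objects are hypothetical simplifications of Catalyst's plan nodes and join types. `Plan.foreach` imitates a pre-order `TreeNode.foreach`. Nothing is rebuilt, so there is no need to return `j` from each case as the `transform` version does.

   ```scala
   // Hypothetical, simplified join types (not Catalyst's JoinType hierarchy).
   sealed trait JoinKind
   case object InnerKind extends JoinKind
   case object LeftSemiKind extends JoinKind
   case object LeftOuterKind extends JoinKind

   sealed trait Plan {
     def children: Seq[Plan]
     // Pre-order traversal for side effects only, like TreeNode.foreach.
     def foreach(f: Plan => Unit): Unit = {
       f(this)
       children.foreach(_.foreach(f))
     }
   }
   final case class Relation(name: String) extends Plan {
     def children: Seq[Plan] = Nil
   }
   final case class JoinOp(left: Plan, right: Plan, kind: JoinKind,
       leftHint: Option[String], rightHint: Option[String]) extends Plan {
     def children: Seq[Plan] = Seq(left, right)
   }

   object SemiJoinCheck {
     // Inner joins must carry the hint on the left. The left semi join
     // produced from the EXISTS subquery must carry no hint at all.
     def check(plan: Plan, expected: String): Unit =
       plan.foreach {
         case JoinOp(_, _, InnerKind, leftHint, _) =>
           assert(leftHint.contains(expected))
         case JoinOp(_, _, LeftSemiKind, leftHint, rightHint) =>
           assert(leftHint.isEmpty && rightHint.isEmpty)
         case j: JoinOp =>
           throw new IllegalArgumentException(s"Unexpected join found: ${j.kind}")
         case _ => () // non-join nodes are ignored
       }
   }
   ```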





[GitHub] [spark] fred-db commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
fred-db commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1018854115


##########
sql/core/src/test/scala/org/apache/spark/sql/SubqueryHintPropagationSuite.scala:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.plans.{InnerLike, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, Join, JoinHint, LogicalPlan}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SubqueryHintPropagationSuite extends QueryTest with SharedSparkSession {
+
+  setupTestData()
+
+  private val expectedHint =
+    Some(HintInfo(strategy = Some(BROADCAST)))
+  private val hints = Seq("BROADCAST")
+  private val hintStringified = hints.map("/*+ " + _ + " */").mkString
+
+  def verifyJoinContainsHint(plan: LogicalPlan): Unit = {
+    val expectedJoinHint = JoinHint(leftHint = None, rightHint = expectedHint)
+    var matchedJoin = false
+    plan.transformUp {
+      case j @ Join(_, _, _, _, foundHint) =>
+        assert(expectedJoinHint == foundHint)
+        matchedJoin = true
+        j
+    }
+    assert(matchedJoin)
+  }
+
+  test("Correlated Exists") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE EXISTS
+         |(SELECT $hintStringified
+         |s2.key FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Correlated Exists with hints in tempView") {
+    val tempView = "tmpView"
+    withTempView(tempView) {
+      val df = spark
+        .range(30)
+        .where("true")
+      val dfWithHints = hints.foldLeft(df)((newDf, hint) => newDf.hint(hint))
+        .selectExpr("id as key", "id as value")
+      dfWithHints.createOrReplaceTempView(tempView)
+
+      val queryDf = sql(
+        s"""SELECT * FROM testData s1 WHERE EXISTS
+           |(SELECT s2.key FROM $tempView s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+           |""".stripMargin)
+
+      verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+    }
+  }
+
+  test("Correlated Exists containing join with hint") {
+    val queryDf = sql(
+      s"""select * from testData s1 WHERE EXISTS
+         |(SELECT s2.key FROM
+         |(SELECT $hintStringified * FROM testData) s2 JOIN testData s3
+         |ON s2.key = s3.key
+         |WHERE s2.key = s1.key)
+         |""".stripMargin)
+    val optimized = queryDf.queryExecution.optimizedPlan
+
+    // the subquery will be turned into a left semi join and should not contain any hints
+    optimized.transform {
+      case j @ Join(_, _, joinType, _, hint) =>
+        joinType match {
+          case _: InnerLike => assert(expectedHint == hint.leftHint)
+          case LeftSemi => assert(hint.leftHint.isEmpty && hint.rightHint.isEmpty)
+          case _ => throw new IllegalArgumentException("Unexpected join found.")
+        }
+        j
+    }
+  }
+
+  test("Negated Exists with hint") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE NOT EXISTS
+         |(SELECT $hintStringified
+         |* FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Exists with complex predicate") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE EXISTS
+         |(SELECT $hintStringified
+         |* FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value) OR s1.key = 5
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Non-correlated IN") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key IN
+         |(SELECT $hintStringified key FROM testData s2)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Correlated IN") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key IN
+         |(SELECT $hintStringified
+         |key FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Negated IN with hint") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key NOT IN
+         |(SELECT $hintStringified
+         |key FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("IN with complex predicate") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key in
+         |(SELECT $hintStringified
+         | key FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value) OR s1.key = 5
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Scalar subquery") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key =
+         |(SELECT $hintStringified MAX(key) FROM
+         |testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Scalar subquery with COUNT") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key =
+         |(SELECT $hintStringified COUNT(key) FROM
+         |testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Scalar subquery nested subquery") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE key =
+         |(SELECT MAX(key) FROM
+         |(SELECT $hintStringified key FROM testData s2 WHERE
+         |s1.key = s2.key AND s1.value = s2.value))
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }
+
+  test("Lateral subquery") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1, LATERAL
+         |(SELECT $hintStringified * FROM testData s2)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+  }

Review Comment:
   Sure. I should point out, though, that the tests I wrote do not exercise any interesting behavior: each query matches either all rows or none. I can still add the result checks.





[GitHub] [spark] fred-db commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
fred-db commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1026144771


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -201,15 +204,17 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
           //     +- Relation[id#80] parquet
           val nullAwareJoinConds = inConditions.map(c => Or(c, IsNull(c)))
           val finalJoinCond = (nullAwareJoinConds ++ conditions).reduceLeft(And)
-          newPlan = Join(newPlan, newSub, ExistenceJoin(exists), Some(finalJoinCond), JoinHint.NONE)
+          val joinHint = JoinHint(None, subHint)
+          newPlan = Join(newPlan, newSub, ExistenceJoin(exists), Some(finalJoinCond), joinHint)
           Not(exists)
-        case InSubquery(values, ListQuery(sub, _, _, _, conditions)) =>
+        case InSubquery(values, ListQuery(sub, _, _, _, conditions, subHint)) =>
           val exists = AttributeReference("exists", BooleanType, nullable = false)()
           // Deduplicate conflicting attributes if any.
           val newSub = dedupSubqueryOnSelfJoin(newPlan, sub, Some(values))
           val inConditions = values.zip(newSub.output).map(EqualTo.tupled)
           val newConditions = (inConditions ++ conditions).reduceLeftOption(And)
-          newPlan = Join(newPlan, newSub, ExistenceJoin(exists), newConditions, JoinHint.NONE)
+          newPlan =
+            Join(newPlan, newSub, ExistenceJoin(exists), newConditions, JoinHint(None, subHint))

Review Comment:
   ```suggestion
             val joinHint = JoinHint(None, subHint)
             newPlan = Join(newPlan, newSub, ExistenceJoin(exists), newConditions, joinHint)
   ```
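
   In the rewrite above, the subquery plan (`newSub`) becomes the right child of the `ExistenceJoin`. That is why `subHint` goes into the right slot of `JoinHint` and the left slot stays `None`. Below is a minimal model of that placement. It uses hypothetical types (`StrategyHint`, `JoinHints`, `Table`, `ExistenceJoinPlan`) in place of Catalyst's and reduces the join condition to an optional string.

   ```scala
   // Simplified, hypothetical model; not Catalyst's actual classes.
   final case class StrategyHint(strategy: String)
   final case class JoinHints(left: Option[StrategyHint], right: Option[StrategyHint])

   sealed trait LPlan
   final case class Table(name: String) extends LPlan
   final case class ExistenceJoinPlan(
       outer: LPlan, sub: LPlan, condition: Option[String], hint: JoinHints) extends LPlan

   object RewriteSketch {
     // The subquery is the right side of the join, so its hint is placed in
     // the right slot; the outer plan (left side) receives no hint from it.
     def rewriteIn(
         outer: LPlan,
         sub: LPlan,
         condition: Option[String],
         subHint: Option[StrategyHint]): ExistenceJoinPlan = {
       val joinHint = JoinHints(None, subHint)
       ExistenceJoinPlan(outer, sub, condition, joinHint)
     }
   }
   ```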





[GitHub] [spark] allisonwang-db commented on pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
allisonwang-db commented on PR #38497:
URL: https://github.com/apache/spark/pull/38497#issuecomment-1307618157

   cc @cloud-fan 




[GitHub] [spark] fred-db commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
fred-db commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1017750256


##########
sql/core/src/test/scala/org/apache/spark/sql/SubqueryHintPropagationSuite.scala:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.plans.{InnerLike, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, Join, JoinHint, LogicalPlan}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SubqueryHintPropagationSuite extends QueryTest with SharedSparkSession {
+
+  setupTestData()
+
+  private val expectedHint =
+    Some(HintInfo(strategy = Some(BROADCAST)))
+  private val hints = Seq("BROADCAST")

Review Comment:
   I added another join strategy to test the behavior with multiple hints, but I think testing with all hint strategies is overkill, as the code does not depend on the specific join strategy.
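   A small property-style check can illustrate the argument that the propagation is strategy-agnostic. The rewrite only moves an opaque hint from the subquery to the join's right side, so the outcome has the same shape for every strategy. This is a simplified model: `propagate` and the case classes are hypothetical stand-ins. As far as I know, the strategy names are Spark's join hint names.

   ```scala
   case class HintInfo(strategy: Option[String] = None)
   case class JoinHint(leftHint: Option[HintInfo], rightHint: Option[HintInfo])

   // Hypothetical stand-in for the propagation step: it never inspects the strategy,
   // it only moves the subquery's hint into the right-hand slot of the join hint.
   def propagate(subqueryHint: Option[HintInfo]): JoinHint = JoinHint(None, subqueryHint)

   val strategies = Seq("BROADCAST", "SHUFFLE_MERGE", "SHUFFLE_HASH", "SHUFFLE_REPLICATE_NL")

   // The same property holds for every strategy, so testing one or two exercises the same path.
   val allPropagated = strategies.forall { s =>
     val hint = Some(HintInfo(Some(s)))
     propagate(hint) == JoinHint(None, hint)
   }
   println(allPropagated)  // true
   ```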



[GitHub] [spark] cloud-fan commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1019082930


##########
sql/core/src/test/scala/org/apache/spark/sql/SubqueryHintPropagationSuite.scala:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.plans.{InnerLike, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, Join, JoinHint, LogicalPlan}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SubqueryHintPropagationSuite extends QueryTest with SharedSparkSession {
+
+  setupTestData()
+
+  private val expectedHint =
+    Some(HintInfo(strategy = Some(BROADCAST)))
+  private val hints = Seq("BROADCAST", "SHUFFLE_MERGE")
+  private val hintStringified = hints.map("/*+ " + _ + " */").mkString
+
+  def verifyJoinContainsHint(plan: LogicalPlan): Unit = {
+    val expectedJoinHint = JoinHint(leftHint = None, rightHint = expectedHint)
+    var matchedJoin = false
+    plan.transformUp {
+      case j @ Join(_, _, _, _, foundHint) =>
+        assert(expectedJoinHint == foundHint)
+        matchedJoin = true
+        j
+    }
+    assert(matchedJoin)
+  }
+
+  test("Correlated Exists") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE EXISTS
+         |(SELECT $hintStringified
+         |s2.key FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+         |""".stripMargin)
+    verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+    checkAnswer(queryDf, testData)
+  }
+
+  test("Correlated Exists with hints in tempView") {
+    val tempView = "tmpView"
+    withTempView(tempView) {
+      val df = spark
+        .range(1, 30)
+        .where("true")
+      val dfWithHints = hints.foldRight(df)((hint, newDf) => newDf.hint(hint))
+        .selectExpr("id as key", "id as value")
+        .withColumn("value", col("value").cast("string"))
+      dfWithHints.createOrReplaceTempView(tempView)
+
+      val queryDf = sql(
+        s"""SELECT * FROM testData s1 WHERE EXISTS
+           |(SELECT s2.key FROM $tempView s2 WHERE s1.key = s2.key AND s1.value = s2.value)
+           |""".stripMargin)
+
+      verifyJoinContainsHint(queryDf.queryExecution.optimizedPlan)
+      checkAnswer(queryDf, dfWithHints)
+    }
+  }
+
+  test("Correlated Exists containing join with hint") {
+    val queryDf = sql(
+      s"""SELECT * FROM testData s1 WHERE EXISTS
+         |(SELECT s2.key FROM
+         |(SELECT $hintStringified * FROM testData) s2 JOIN testData s3
+         |ON s2.key = s3.key
+         |WHERE s2.key = s1.key)
+         |""".stripMargin)
+    val optimized = queryDf.queryExecution.optimizedPlan
+
+    // the subquery will be turned into a left semi join and should not contain any hints
+    optimized.transform {
+      case j @ Join(_, _, joinType, _, hint) =>
+        joinType match {
+          case _: InnerLike => assert(expectedHint == hint.leftHint)
+          case LeftSemi => assert(hint.leftHint.isEmpty && hint.rightHint.isEmpty)
+          case _ => throw new IllegalArgumentException("Unexpected join found.")
+        }
+        j
+    }
+    checkAnswer(queryDf, testData)
+  }
+
+  test("Negated Exists with hint") {

Review Comment:
   ```suggestion
     test("Not Exists with hint") {
   ```
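   The body of the renamed test is not shown in this excerpt. As a hedged sketch, a `NOT EXISTS` counterpart of the `Correlated Exists` query could be built with the same `hintStringified` pattern the suite uses. `RewritePredicateSubquery` would turn such a query into a left anti join. The query text here is an assumption, not the PR's test.

   ```scala
   val hints = Seq("BROADCAST", "SHUFFLE_MERGE")
   // Same construction as in the suite: each hint wrapped in its own /*+ ... */ comment.
   val hintStringified = hints.map("/*+ " + _ + " */").mkString

   // Hypothetical NOT EXISTS variant of the "Correlated Exists" query.
   val notExistsQuery =
     s"""SELECT * FROM testData s1 WHERE NOT EXISTS
        |(SELECT $hintStringified
        |s2.key FROM testData s2 WHERE s1.key = s2.key AND s1.value = s2.value)
        |""".stripMargin

   println(hintStringified)  // /*+ BROADCAST *//*+ SHUFFLE_MERGE */
   println(notExistsQuery)
   ```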



[GitHub] [spark] cloud-fan commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1019077400


##########
sql/core/src/test/scala/org/apache/spark/sql/SubqueryHintPropagationSuite.scala:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.plans.{InnerLike, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, Join, JoinHint, LogicalPlan}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SubqueryHintPropagationSuite extends QueryTest with SharedSparkSession {
+
+  setupTestData()
+
+  private val expectedHint =
+    Some(HintInfo(strategy = Some(BROADCAST)))
+  private val hints = Seq("BROADCAST", "SHUFFLE_MERGE")

Review Comment:
   what are we testing here? make sure broadcast hint has higher priority than shuffle merge hint?
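   One reading of the test is that it checks precedence when several strategy hints stack on the same side. The outermost hint is collected first and wins when the hints are merged, which would explain why `expectedHint` is `BROADCAST` even though `SHUFFLE_MERGE` is also present. The sketch models that rule with hypothetical stand-ins (`merge`, `mergeHints`). It is meant to mirror, as far as I know, the "first defined strategy wins" behavior of Catalyst's `HintInfo.merge`, but it is not Spark code.

   ```scala
   case class HintInfo(strategy: Option[String] = None) {
     // Keep this hint's strategy if present; otherwise fall back to the other one.
     def merge(other: HintInfo): HintInfo = HintInfo(strategy.orElse(other.strategy))
   }

   // Hints are assumed to be collected from the outermost hint node inwards.
   def mergeHints(hints: Seq[HintInfo]): Option[HintInfo] = hints.reduceOption(_ merge _)

   // hints.foldRight(df)((hint, newDf) => newDf.hint(hint)) applies SHUFFLE_MERGE first,
   // then BROADCAST, so BROADCAST ends up outermost and is collected first.
   val collectedOuterFirst = Seq(HintInfo(Some("BROADCAST")), HintInfo(Some("SHUFFLE_MERGE")))
   println(mergeHints(collectedOuterFirst))  // Some(HintInfo(Some(BROADCAST)))
   println(mergeHints(Seq.empty))            // None
   ```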



[GitHub] [spark] cloud-fan closed pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
cloud-fan closed pull request #38497: [SPARK-40999] Hint propagation to subqueries
URL: https://github.com/apache/spark/pull/38497


[GitHub] [spark] fred-db commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
fred-db commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1026142927


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -201,15 +204,17 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
           //     +- Relation[id#80] parquet
           val nullAwareJoinConds = inConditions.map(c => Or(c, IsNull(c)))
           val finalJoinCond = (nullAwareJoinConds ++ conditions).reduceLeft(And)
-          newPlan = Join(newPlan, newSub, ExistenceJoin(exists), Some(finalJoinCond), JoinHint.NONE)
+          val joinHint = JoinHint(None, subHint)
+          newPlan = Join(newPlan, newSub, ExistenceJoin(exists), Some(finalJoinCond), joinHint)
           Not(exists)
-        case InSubquery(values, ListQuery(sub, _, _, _, conditions)) =>
+        case InSubquery(values, ListQuery(sub, _, _, _, conditions, subHint)) =>
           val exists = AttributeReference("exists", BooleanType, nullable = false)()
           // Deduplicate conflicting attributes if any.
           val newSub = dedupSubqueryOnSelfJoin(newPlan, sub, Some(values))
           val inConditions = values.zip(newSub.output).map(EqualTo.tupled)
           val newConditions = (inConditions ++ conditions).reduceLeftOption(And)
-          newPlan = Join(newPlan, newSub, ExistenceJoin(exists), newConditions, JoinHint.NONE)
+          newPlan =
+            Join(newPlan, newSub, ExistenceJoin(exists), newConditions, JoinHint(None, subHint))

Review Comment:
   I think the issue here is that the Scala style guide does not specify how to handle this case. My approach was to first try putting the method invocation on a separate line; if that did not fit, to pull the join hint out into a `val`; and if that was still not enough, to pull the existence join out as well.
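   The three-step fallback can be illustrated on simplified stand-ins. The case classes are hypothetical, and the join type is reduced to a string instead of `ExistenceJoin(exists)`. Each step changes only the layout, so all three produce the same value. The step to pick depends on whether the line fits within Spark's line-length limit, which, as far as I know, scalastyle sets at 100 characters.

   ```scala
   case class HintInfo(strategy: Option[String] = None)
   case class JoinHint(leftHint: Option[HintInfo], rightHint: Option[HintInfo])
   case class Join(
       left: String,
       right: String,
       joinType: String,
       condition: Option[String],
       hint: JoinHint)

   val (newPlan, newSub) = ("outer", "sub")
   val newConditions: Option[String] = Some("a = b")
   val subHint: Option[HintInfo] = Some(HintInfo(Some("BROADCAST")))

   // Step 1: move the whole invocation onto its own continuation line.
   val step1 =
     Join(newPlan, newSub, "ExistenceJoin", newConditions, JoinHint(None, subHint))

   // Step 2: if that still does not fit, pull the join hint into a val.
   val joinHint = JoinHint(None, subHint)
   val step2 = Join(newPlan, newSub, "ExistenceJoin", newConditions, joinHint)

   // Step 3: if it is still too long, pull out the join type as well.
   val existenceJoin = "ExistenceJoin"
   val step3 = Join(newPlan, newSub, existenceJoin, newConditions, joinHint)

   println(step1 == step2 && step2 == step3)  // true
   ```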



[GitHub] [spark] cloud-fan commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1026161789


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala:
##########
@@ -31,20 +34,35 @@ object EliminateResolvedHint extends Rule[LogicalPlan] {
   // This is also called in the beginning of the optimization phase, and as a result
   // is using transformUp rather than resolveOperators.
   def apply(plan: LogicalPlan): LogicalPlan = {
-    val pulledUp = plan transformUp {
+    val joinsWithHints = plan transformUp {
       case j: Join if j.hint == JoinHint.NONE =>
         val (newLeft, leftHints) = extractHintsFromPlan(j.left)
         val (newRight, rightHints) = extractHintsFromPlan(j.right)
         val newJoinHint = JoinHint(mergeHints(leftHints), mergeHints(rightHints))
         j.copy(left = newLeft, right = newRight, hint = newJoinHint)
     }
-    pulledUp.transformUp {
+    val shouldPullHintsIntoSubqueries = SQLConf.get.getConf(SQLConf.PULL_HINTS_INTO_SUBQUERIES)
+    val joinsAndSubqueriesWithHints = if (shouldPullHintsIntoSubqueries) {
+      pullHintsIntoSubqueries(joinsWithHints)
+    } else {
+      joinsWithHints
+    }
+    joinsAndSubqueriesWithHints.transformUp {
       case h: ResolvedHint =>
         hintErrorHandler.joinNotFoundForJoinHint(h.hints)
         h.child
     }
   }
 
+  def pullHintsIntoSubqueries(plan: LogicalPlan): LogicalPlan = {
+    plan.transformAllExpressionsUpWithPruning(_.containsPattern(PLAN_EXPRESSION)) {

Review Comment:
   > Otherwise, hints for the innermost will end up in the outermost subquery.
   
   Sorry, I'm a bit confused. `extractHintsFromPlan` does not recurse into subqueries, so it should only collect hints from the subquery's own logical plan.
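   A toy model of this point follows. Hint extraction walks plan children only. A subquery lives inside an expression (here a filter condition), not as a plan child, so hints inside it are never collected for the outer plan. The node types and `extractHints` are hypothetical simplifications of `extractHintsFromPlan`, not its actual code.

   ```scala
   sealed trait Expr
   case class Subquery(plan: Plan) extends Expr
   case class Literal(value: Boolean) extends Expr

   sealed trait Plan
   case class Relation(name: String) extends Plan
   case class Hint(strategy: String, child: Plan) extends Plan
   case class Filter(condition: Expr, child: Plan) extends Plan

   // Walks plan children only; the filter's condition (and any subquery in it) is not visited.
   def extractHints(plan: Plan): (Plan, Seq[String]) = plan match {
     case Hint(s, child) =>
       val (p, hs) = extractHints(child)
       (p, s +: hs)
     case Filter(cond, child) =>
       val (p, hs) = extractHints(child)
       (Filter(cond, p), hs)
     case r: Relation => (r, Seq.empty)
   }

   val subquery = Subquery(Hint("BROADCAST", Relation("s2")))
   val outer = Hint("SHUFFLE_MERGE", Filter(subquery, Relation("s1")))

   val (stripped, hints) = extractHints(outer)
   println(hints.mkString(", "))  // SHUFFLE_MERGE
   // `stripped` still holds the subquery's own Hint node untouched.
   ```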



[GitHub] [spark] fred-db commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
fred-db commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1026198667


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala:
##########
@@ -31,20 +34,35 @@ object EliminateResolvedHint extends Rule[LogicalPlan] {
   // This is also called in the beginning of the optimization phase, and as a result
   // is using transformUp rather than resolveOperators.
   def apply(plan: LogicalPlan): LogicalPlan = {
-    val pulledUp = plan transformUp {
+    val joinsWithHints = plan transformUp {
       case j: Join if j.hint == JoinHint.NONE =>
         val (newLeft, leftHints) = extractHintsFromPlan(j.left)
         val (newRight, rightHints) = extractHintsFromPlan(j.right)
         val newJoinHint = JoinHint(mergeHints(leftHints), mergeHints(rightHints))
         j.copy(left = newLeft, right = newRight, hint = newJoinHint)
     }
-    pulledUp.transformUp {
+    val shouldPullHintsIntoSubqueries = SQLConf.get.getConf(SQLConf.PULL_HINTS_INTO_SUBQUERIES)
+    val joinsAndSubqueriesWithHints = if (shouldPullHintsIntoSubqueries) {
+      pullHintsIntoSubqueries(joinsWithHints)
+    } else {
+      joinsWithHints
+    }
+    joinsAndSubqueriesWithHints.transformUp {
       case h: ResolvedHint =>
         hintErrorHandler.joinNotFoundForJoinHint(h.hints)
         h.child
     }
   }
 
+  def pullHintsIntoSubqueries(plan: LogicalPlan): LogicalPlan = {
+    plan.transformAllExpressionsUpWithPruning(_.containsPattern(PLAN_EXPRESSION)) {

Review Comment:
   Yes, you are right. I forgot for a second that it's not a logical plan node.



[GitHub] [spark] fred-db commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
fred-db commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1026196114


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala:
##########
@@ -31,20 +34,35 @@ object EliminateResolvedHint extends Rule[LogicalPlan] {
   // This is also called in the beginning of the optimization phase, and as a result
   // is using transformUp rather than resolveOperators.
   def apply(plan: LogicalPlan): LogicalPlan = {
-    val pulledUp = plan transformUp {
+    val joinsWithHints = plan transformUp {
       case j: Join if j.hint == JoinHint.NONE =>
         val (newLeft, leftHints) = extractHintsFromPlan(j.left)
         val (newRight, rightHints) = extractHintsFromPlan(j.right)
         val newJoinHint = JoinHint(mergeHints(leftHints), mergeHints(rightHints))
         j.copy(left = newLeft, right = newRight, hint = newJoinHint)
     }
-    pulledUp.transformUp {
+    val shouldPullHintsIntoSubqueries = SQLConf.get.getConf(SQLConf.PULL_HINTS_INTO_SUBQUERIES)
+    val joinsAndSubqueriesWithHints = if (shouldPullHintsIntoSubqueries) {
+      pullHintsIntoSubqueries(joinsWithHints)
+    } else {
+      joinsWithHints
+    }
+    joinsAndSubqueriesWithHints.transformUp {
       case h: ResolvedHint =>
         hintErrorHandler.joinNotFoundForJoinHint(h.hints)
         h.child
     }
   }
 
+  def pullHintsIntoSubqueries(plan: LogicalPlan): LogicalPlan = {
+    plan.transformAllExpressionsUpWithPruning(_.containsPattern(PLAN_EXPRESSION)) {

Review Comment:
   ```suggestion
       plan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
   ```
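   The `...WithPruning` variants take a predicate on a node's tree patterns, here `_.containsPattern(PLAN_EXPRESSION)`. This lets the traversal skip subtrees that cannot contain a match. The toy version below illustrates the idea. Its types are hypothetical, and a `Set` of string tags stands in for Catalyst's cached tree-pattern bits.

   ```scala
   sealed trait Node {
     def children: Seq[Node]
     def tag: Option[String]
     // Tags present anywhere in this subtree; Catalyst caches a bitset of tree patterns instead.
     lazy val patterns: Set[String] = children.flatMap(_.patterns).toSet ++ tag.toList
     def containsPattern(p: String): Boolean = patterns.contains(p)
   }
   case class Leaf(name: String) extends Node {
     def children: Seq[Node] = Nil
     def tag: Option[String] = None
   }
   case class SubqueryExpr(name: String) extends Node {
     def children: Seq[Node] = Nil
     def tag: Option[String] = Some("PLAN_EXPRESSION")
   }
   case class Op(name: String, children: Seq[Node]) extends Node {
     def tag: Option[String] = None
   }

   var visited = 0

   // Pre-order transform that skips any subtree for which `cond` is false.
   def transformWithPruning(n: Node)(cond: Node => Boolean)(
       rule: PartialFunction[Node, Node]): Node =
     if (!cond(n)) n
     else {
       visited += 1
       rule.applyOrElse(n, identity[Node]) match {
         case Op(name, cs) => Op(name, cs.map(c => transformWithPruning(c)(cond)(rule)))
         case other => other
       }
     }

   val tree = Op("root", Seq(Op("left", Seq(Leaf("a"), Leaf("b"))), Op("right", Seq(SubqueryExpr("sq")))))
   val result = transformWithPruning(tree)(_.containsPattern("PLAN_EXPRESSION")) {
     case SubqueryExpr(n) => SubqueryExpr(n + "_hinted")
   }
   println(visited)  // 3: "left", "a" and "b" are never visited
   ```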



[GitHub] [spark] cloud-fan commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1025470097


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala:
##########
@@ -31,20 +34,35 @@ object EliminateResolvedHint extends Rule[LogicalPlan] {
   // This is also called in the beginning of the optimization phase, and as a result
   // is using transformUp rather than resolveOperators.
   def apply(plan: LogicalPlan): LogicalPlan = {
-    val pulledUp = plan transformUp {
+    val joinsWithHints = plan transformUp {
       case j: Join if j.hint == JoinHint.NONE =>
         val (newLeft, leftHints) = extractHintsFromPlan(j.left)
         val (newRight, rightHints) = extractHintsFromPlan(j.right)
         val newJoinHint = JoinHint(mergeHints(leftHints), mergeHints(rightHints))
         j.copy(left = newLeft, right = newRight, hint = newJoinHint)
     }
-    pulledUp.transformUp {
+    val shouldPullHintsIntoSubqueries = SQLConf.get.getConf(SQLConf.PULL_HINTS_INTO_SUBQUERIES)
+    val joinsAndSubqueriesWithHints = if (shouldPullHintsIntoSubqueries) {
+      pullHintsIntoSubqueries(joinsWithHints)
+    } else {
+      joinsWithHints
+    }
+    joinsAndSubqueriesWithHints.transformUp {
       case h: ResolvedHint =>
         hintErrorHandler.joinNotFoundForJoinHint(h.hints)
         h.child
     }
   }
 
+  def pullHintsIntoSubqueries(plan: LogicalPlan): LogicalPlan = {
+    plan.transformAllExpressionsUpWithPruning(_.containsPattern(PLAN_EXPRESSION)) {

Review Comment:
   Does the transformation order matter here? If not, then we don't need to add `transformAllExpressionsUpWithPruning`, right?
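   The question can be illustrated in a few lines. Suppose a rewrite reads only a node's own fields and never depends on its already-rewritten children or parent. Then pre-order (`transformDown`-style) and post-order (`transformUp`-style) traversals give the same result, and the plain variant would suffice. The tree and both traversals are hypothetical toy versions, not Catalyst's `TreeNode` methods.

   ```scala
   // Toy nested subquery: `hint` is the propagated hint, `ownHint` a hint found in its own plan.
   case class Sub(hint: Option[String], ownHint: Option[String], nested: Option[Sub])

   // Local rule: reads only this node's fields, never its children.
   def rule(s: Sub): Sub = if (s.hint.isEmpty) s.copy(hint = s.ownHint) else s

   // Pre-order: rewrite the node, then its nested subquery.
   def transformDown(s: Sub): Sub = {
     val r = rule(s)
     r.copy(nested = r.nested.map(transformDown))
   }

   // Post-order: rewrite the nested subquery first, then the node.
   def transformUp(s: Sub): Sub = rule(s.copy(nested = s.nested.map(transformUp)))

   val tree = Sub(None, Some("BROADCAST"), Some(Sub(None, Some("SHUFFLE_MERGE"), None)))
   println(transformDown(tree) == transformUp(tree))  // true
   ```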



[GitHub] [spark] cloud-fan commented on pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #38497:
URL: https://github.com/apache/spark/pull/38497#issuecomment-1320017014

   thanks, merging to master!


[GitHub] [spark] fred-db commented on pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
fred-db commented on PR #38497:
URL: https://github.com/apache/spark/pull/38497#issuecomment-1315136114

   @allisonwang-db I incorporated all the requested changes; let me know what you think! :)


[GitHub] [spark] fred-db commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
fred-db commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1018810904


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala:
##########
@@ -31,20 +34,35 @@ object EliminateResolvedHint extends Rule[LogicalPlan] {
   // This is also called in the beginning of the optimization phase, and as a result
   // is using transformUp rather than resolveOperators.
   def apply(plan: LogicalPlan): LogicalPlan = {
-    val pulledUp = plan transformUp {
+    val joinsWithHints = plan transformUp {
       case j: Join if j.hint == JoinHint.NONE =>
         val (newLeft, leftHints) = extractHintsFromPlan(j.left)
         val (newRight, rightHints) = extractHintsFromPlan(j.right)
         val newJoinHint = JoinHint(mergeHints(leftHints), mergeHints(rightHints))
         j.copy(left = newLeft, right = newRight, hint = newJoinHint)
     }
-    pulledUp.transformUp {
+    val shouldPullHintsIntoSubqueries = SQLConf.get.getConf(SQLConf.PULL_HINTS_INTO_SUBQUERIES)
+    val joinsAndSubqueriesWithHints = if (shouldPullHintsIntoSubqueries) {
+      pullHintsIntoSubqueries(joinsWithHints)
+    } else {
+      joinsWithHints
+    }
+    joinsAndSubqueriesWithHints.transformUp {
       case h: ResolvedHint =>
         hintErrorHandler.joinNotFoundForJoinHint(h.hints)
         h.child
     }
   }
 
+  def pullHintsIntoSubqueries(plan: LogicalPlan): LogicalPlan = {
+    plan.transformAllExpressionsUpWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
+      case s: SubqueryExpression if s.hint.isEmpty =>
+        val (newPlan, subqueryHints) = extractHintsFromPlan(s.plan)

Review Comment:
   The `extractHintsFromPlan` method does not extract hints from underneath a binary node, such as a join. So when extracting hints for a subquery, we never pick up hints from joins inside the subquery. I test for this behavior in the test `Correlated Exists containing join with hint`.
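   The stopping behavior can be modeled in a few lines. Hint extraction descends through unary nodes but treats a binary node such as a join as opaque. A hint under one side of a join inside a subquery therefore stays with that join instead of being lifted to the subquery. The types and `extractHints` below are hypothetical simplifications, not Catalyst's code.

   ```scala
   sealed trait Plan
   case class Relation(name: String) extends Plan
   case class Hint(strategy: String, child: Plan) extends Plan
   case class Project(child: Plan) extends Plan           // a unary node
   case class Join(left: Plan, right: Plan) extends Plan  // a binary node

   // Lifts hints through unary nodes only; a binary node ends the search.
   def extractHints(plan: Plan): (Plan, Seq[String]) = plan match {
     case Hint(s, child) =>
       val (p, hs) = extractHints(child)
       (p, s +: hs)
     case Project(child) =>
       val (p, hs) = extractHints(child)
       (Project(p), hs)
     case other => (other, Seq.empty)  // Relation, Join
   }

   // Shaped like "SELECT s2.key FROM (SELECT /*+ BROADCAST */ * ...) s2 JOIN testData s3".
   val subqueryPlan = Project(Join(Hint("BROADCAST", Relation("s2")), Relation("s3")))
   val (rest, hints) = extractHints(subqueryPlan)
   println(hints.isEmpty)  // true: the hint stays below the join
   ```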



[GitHub] [spark] cloud-fan commented on a diff in pull request #38497: [SPARK-40999] Hint propagation to subqueries

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38497:
URL: https://github.com/apache/spark/pull/38497#discussion_r1019064915


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -52,10 +52,12 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
       outerPlan: LogicalPlan,
       subplan: LogicalPlan,
       joinType: JoinType,
-      condition: Option[Expression]): Join = {
+      condition: Option[Expression],
+      subHint: Option[HintInfo]): Join = {
     // Deduplicate conflicting attributes if any.
     val dedupSubplan = dedupSubqueryOnSelfJoin(outerPlan, subplan, None, condition)
-    Join(outerPlan, dedupSubplan, joinType, condition, JoinHint.NONE)
+    // Add sub hint as right hint as the subquery plan is on the right side of the join

Review Comment:
   ```suggestion
       // Add subquery hint as right hint as the subquery plan is on the right side of the join
   ```
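   Putting the subquery hint on the right also matches where it can take effect. To my understanding, Spark's join selection (`canBuildBroadcastLeft` / `canBuildBroadcastRight` in `JoinSelectionHelper`) allows a broadcast build on the right, but not on the left, for left semi, left anti, and existence joins. The sketch re-states that rule with hypothetical simplified types. It is a model to reason with, not Spark's code, and `ExistenceJoin` is reduced to an object here.

   ```scala
   sealed trait JoinType
   case object Inner extends JoinType
   case object LeftOuter extends JoinType
   case object RightOuter extends JoinType
   case object LeftSemi extends JoinType
   case object LeftAnti extends JoinType
   case object ExistenceJoin extends JoinType  // a case class carrying an attribute in Spark

   // Which side may be the broadcast (build) side for a given join type.
   def canBuildBroadcastLeft(t: JoinType): Boolean = t match {
     case Inner | RightOuter => true
     case _ => false
   }
   def canBuildBroadcastRight(t: JoinType): Boolean = t match {
     case Inner | LeftOuter | LeftSemi | LeftAnti | ExistenceJoin => true
     case _ => false
   }

   // Join types produced when predicate subqueries are rewritten; the subquery is the right child.
   val subqueryJoinTypes: Seq[JoinType] = Seq(LeftSemi, LeftAnti, ExistenceJoin)
   println(subqueryJoinTypes.forall(canBuildBroadcastRight))  // true
   println(subqueryJoinTypes.exists(canBuildBroadcastLeft))   // false
   ```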


