Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/04/29 04:45:11 UTC

[GitHub] [spark] ulysses-you opened a new pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

ulysses-you opened a new pull request #32391:
URL: https://github.com/apache/spark/pull/32391


   
   ### What changes were proposed in this pull request?
   This PR aims to add a new AQE optimizer rule, `DynamicJoinSelection`. Similar to the existing AQE-specific partition number configs, this rule adds a new broadcast threshold config, `spark.sql.adaptive.autoBroadcastJoinThreshold`.
   
   
   ### Why are the changes needed?
   The main idea is to isolate the join configs of the normal planner from those of the AQE planner, since the two share the same code path.
   
   Actually, we don't fully trust static stats when deciding whether to build a broadcast hash join. In our experience, it's very common for Spark to throw a broadcast timeout or a driver-side OOM exception when executing a moderately large plan. And since a broadcast join is not reversible (once a join is converted to a broadcast hash join up front, AQE cannot optimize it again), it makes sense to decide whether to broadcast on the AQE side, using a different SQL config.
   
   To achieve this, the AQE framework inserts a specific join hint in advance, and `JoinSelection` then picks up and follows the inserted hint.
   
   For now, we only support selecting the strategy for equi-joins, in this order:
   1. mark join as broadcast hash join if possible
   2. mark join as shuffled hash join if possible
   
   Note that we don't override the join strategy if the user specifies a join hint.
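A rough sketch of the selection order just described, written as plain Scala rather than against Spark's Catalyst classes. All names here (`JoinStrategy`, `DynamicJoinSelectionSketch`, `selectStrategy` and its parameters) are hypothetical; the `>= 0 && <= threshold` check is assumed to mirror Spark's size-based broadcast check, and `None` stands for "insert no hint".

```scala
// Hypothetical model of the selection order above. None of these names are
// Spark APIs; they only illustrate the decision logic.
sealed trait JoinStrategy
case object BroadcastHash extends JoinStrategy
case object ShuffledHash extends JoinStrategy
case object SortMerge extends JoinStrategy

object DynamicJoinSelectionSketch {
  /** Returns the strategy hint to insert, or None to leave the join untouched.
    *
    * @param userHint          strategy requested by a user SQL hint, if any
    * @param isEquiJoin        whether the join condition has equi-join keys
    * @param buildSideBytes    runtime size estimate of the candidate build side
    * @param adaptiveThreshold AQE-side broadcast threshold; -1 disables broadcast
    * @param canShuffleHash    whether a per-partition hash map is considered safe
    */
  def selectStrategy(
      userHint: Option[JoinStrategy],
      isEquiJoin: Boolean,
      buildSideBytes: Long,
      adaptiveThreshold: Long,
      canShuffleHash: Boolean): Option[JoinStrategy] =
    if (userHint.isDefined || !isEquiJoin) {
      None // never override a user hint; only equi-joins are handled
    } else if (buildSideBytes >= 0 && buildSideBytes <= adaptiveThreshold) {
      Some(BroadcastHash) // 1. prefer broadcast hash join
    } else if (canShuffleHash) {
      Some(ShuffledHash) // 2. otherwise shuffled hash join
    } else {
      None // keep whatever the normal planner picks (e.g. sort merge join)
    }
}
```

Returning `None` rather than forcing sort merge join keeps the rule additive: it only narrows choices when it has runtime evidence, and otherwise defers to the existing planner.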
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, a new config, `spark.sql.adaptive.autoBroadcastJoinThreshold`, is added.
   
   ### How was this patch tested?
   Added a new test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #32391:
URL: https://github.com/apache/spark/pull/32391#discussion_r623611013



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -1606,4 +1606,52 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-35264: Support AQE side broadcastJoin threshold") {
+    withTempView("t1", "t2") {
+      def checkJoinStrategy(adaptiveJoinStrategy: String): Unit = {

Review comment:
       Now we can use a boolean flag `shouldBroadcast` instead of the strategy string.






[GitHub] [spark] ulysses-you commented on a change in pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on a change in pull request #32391:
URL: https://github.com/apache/spark/pull/32391#discussion_r623528281



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, JoinSelectionHelper}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, JoinHint, LogicalPlan, NO_BROADCAST_HASH, SHUFFLE_HASH}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+/**
+ * The main idea here is that make join config isolation between normal planner and aqe planner
+ * which shared the same code path.
+ * In order to achieve this we use a specific join hint in advance during AQE framework and then
+ * at JoinSelection side it will take and follow the inserted hint.
+ *
+ * For now we only support select strategy for equi join, and follow this order:
+ *   1. mark join as broadcast hash join if possible
+ *   2. mark join as shuffled hash join if possible
+ *
+ * Note that, we don't override join strategy if user specifies a join hint.
+ */
+object DynamicJoinSelection extends Rule[LogicalPlan] with JoinSelectionHelper {
+  final override val isAdaptive: Boolean = true
+
+  private def insertJoinHint(

Review comment:
       @cloud-fan Updated. As you suggested, I added an `isAdaptive` flag to `Statistics` to simplify the logic.
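With an `isAdaptive` flag on the statistics, the threshold choice reduces to a small helper. The sketch below models it in plain Scala; `Stats`, `JoinConf`, and the fallback from the AQE threshold to the normal one when the former is unset are assumptions made for illustration. The helper name matches the call visible in the `joins.scala` diff in this thread, but its body here is not taken from the patch.

```scala
// Hypothetical stand-ins for Spark's `Statistics` and `SQLConf`; only the
// fields needed for this illustration are modeled.
final case class Stats(sizeInBytes: BigInt, isAdaptive: Boolean = false)

final case class JoinConf(
    autoBroadcastJoinThreshold: Long,
    // Assumption: when the AQE threshold is unset, it falls back to the normal one.
    adaptiveAutoBroadcastJoinThreshold: Option[Long])

object AdaptiveThresholdSketch {
  // Choose the threshold based on where the statistics came from.
  def autoBroadcastJoinThreshold(isAdaptive: Boolean, conf: JoinConf): Long =
    if (isAdaptive) {
      conf.adaptiveAutoBroadcastJoinThreshold.getOrElse(conf.autoBroadcastJoinThreshold)
    } else {
      conf.autoBroadcastJoinThreshold
    }

  // Size-based broadcast check: a known, non-negative size within the threshold.
  def canBroadcastBySize(stats: Stats, conf: JoinConf): Boolean =
    stats.sizeInBytes >= BigInt(0) &&
      stats.sizeInBytes <= BigInt(autoBroadcastJoinThreshold(stats.isAdaptive, conf))
}
```

With the normal threshold set to -1, as the test in this PR does, only statistics marked as adaptive can still qualify for broadcast.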






[GitHub] [spark] AmplabJenkins removed a comment on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829121985


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138068/
   




[GitHub] [spark] cloud-fan commented on a change in pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #32391:
URL: https://github.com/apache/spark/pull/32391#discussion_r623611602



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -1606,4 +1606,52 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-35264: Support AQE side broadcastJoin threshold") {
+    withTempView("t1", "t2") {
+      def checkJoinStrategy(adaptiveJoinStrategy: String): Unit = {
+        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+          val (origin1, adaptive1) = runAdaptiveAndVerifyResult(
+            "SELECT t1.c1, t2.c1 FROM t1 JOIN t2 ON t1.c1 = t2.c1")
+          assert(findTopLevelSortMergeJoin(origin1).size == 1)
+          adaptiveJoinStrategy match {
+            case "BHJ" =>
+              assert(findTopLevelBroadcastHashJoin(adaptive1).size == 1)
+            case "SHJ" =>
+              assert(findTopLevelShuffledHashJoin(adaptive1).size == 1)
+            case "SMJ" =>
+              assert(findTopLevelSortMergeJoin(adaptive1).size == 1)
+            case _ =>
+              throw new IllegalArgumentException(s"Not support strategy: $adaptiveJoinStrategy")
+          }
+
+          // respect user specified join hint
+          val (origin2, adaptive2) = runAdaptiveAndVerifyResult(
+            "SELECT /*+ MERGE(t1) */ t1.c1, t2.c1 FROM t1 JOIN t2 ON t1.c1 = t2.c1")
+          assert(findTopLevelSortMergeJoin(origin2).size == 1)
+          assert(findTopLevelSortMergeJoin(adaptive2).size == 1)
+        }
+      }
+
+      // t1: 1600 bytes
+      // t2: 160 bytes
+      spark.sparkContext.parallelize(
+        (1 to 100).map(i => TestData(i, i.toString)), 10)
+        .toDF("c1", "c2").createOrReplaceTempView("t1")
+      spark.sparkContext.parallelize(
+        (1 to 10).map(i => TestData(i, i.toString)), 5)
+        .toDF("c1", "c2").createOrReplaceTempView("t2")
+
+      Seq("" -> "", SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1")

Review comment:
       it's simpler to write
   ```
   checkJoinStrategy("SMJ")
   withSQLConf(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD ...) {
     checkJoinStrategy("SMJ")
   } 
   ```






[GitHub] [spark] SparkQA commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829871699








[GitHub] [spark] Gabriel39 commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
Gabriel39 commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-845594216


   @ulysses-you Well, I think you want to isolate the broadcast threshold between AQE and normal planning because the current broadcast can lead to OOM. However, when a join is converted to a BHJ during normal planning using static stats, it is definitely a BHJ, and AQE should not optimize it to another join type, since static stats (e.g. sizeInBytes) are always larger than or equal to the actual value. So driver-side OOM occurs only if the broadcast threshold is too large.
   
   So I'm not sure this PR makes sense, since OOM is commonly caused by an unreasonable broadcast threshold.
   
   If I misunderstand your point, feel free to point out my mistake. Thanks.





[GitHub] [spark] SparkQA commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-828936481


   **[Test build #138068 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138068/testReport)** for PR 32391 at commit [`21591b8`](https://github.com/apache/spark/commit/21591b81c084f8cfc48a9736fe266aa954c58bbb).




[GitHub] [spark] SparkQA commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829994403


   **[Test build #138099 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138099/testReport)** for PR 32391 at commit [`225bde3`](https://github.com/apache/spark/commit/225bde3815c06c4857a28b2ab6496e8a1199d123).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829750262








[GitHub] [spark] ulysses-you commented on a change in pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on a change in pull request #32391:
URL: https://github.com/apache/spark/pull/32391#discussion_r623573928



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
##########
@@ -47,11 +47,13 @@ object Statistics {
  *                    defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
  * @param attributeStats Statistics for Attributes.
+ * @param isAdaptive Statistics from AQE.

Review comment:
       Sounds better.






[GitHub] [spark] cloud-fan commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-845616002


   To add a bit more color: the static size estimate in Spark is usually an underestimate, due to things like file compression. We can set the AQE broadcast threshold a bit higher, as AQE size estimation is more precise.
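To make the underestimation point concrete, here is a back-of-the-envelope sketch in plain Scala. It assumes a hypothetical 5x expansion from the compressed on-disk size seen by static planning to the size of the data actually broadcast; the real ratio depends heavily on file format, codec, and data. An 8 MiB static estimate passes the 10 MiB default of `spark.sql.autoBroadcastJoinThreshold`, while the expanded data is about 40 MiB.

```scala
// Illustrative arithmetic only: the expansion factor is hypothetical and
// varies widely with file format, codec, and data.
object CompressionSketch {
  private val MiB: Long = 1024L * 1024

  // Size of the data once decompressed, given its on-disk size.
  def expandedSize(onDiskBytes: Long, expansionFactor: Double): Long =
    math.round(onDiskBytes * expansionFactor)

  // Same shape as a size-based broadcast check.
  def fitsThreshold(bytes: Long, threshold: Long): Boolean =
    bytes >= 0 && bytes <= threshold

  // Default value of spark.sql.autoBroadcastJoinThreshold (10 MiB).
  val defaultThreshold: Long = 10 * MiB
}
```

The static check passes on the 8 MiB figure, but the expanded 40 MiB would not; a higher AQE-side threshold is only safe to the extent that runtime statistics track the real size more closely than the file-based estimate.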




[GitHub] [spark] AmplabJenkins commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-828951848


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42587/
   




[GitHub] [spark] ulysses-you commented on a change in pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on a change in pull request #32391:
URL: https://github.com/apache/spark/pull/32391#discussion_r623574701



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
##########
@@ -376,7 +377,8 @@ trait JoinSelectionHelper {
    * dynamic.
    */
   private def canBuildLocalHashMapBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {
-    plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
+    plan.stats.sizeInBytes <
+      autoBroadcastJoinThreshold(plan.stats.isAdaptive, conf) * conf.numShufflePartitions

Review comment:
       Yeah, I have a similar thought. The current condition for converting a join to a shuffled hash join is a bit rough: it assumes the data is not skewed across partitions.
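The condition in the diff compares the total size with `threshold * numShufflePartitions`, so it effectively bounds only the average partition size. A small plain-Scala sketch (names hypothetical, not Spark code) with one 60 MiB partition among 200 shows the gap: the average-based check passes under a 10 MiB threshold, while that one partition is six times over it.

```scala
// Hypothetical illustration of why `size < threshold * numPartitions` is rough:
// it only bounds the *average* partition size.
object LocalHashMapSketch {
  // Same shape as the condition in the diff above.
  def canBuildLocalHashMapBySize(totalBytes: BigInt, threshold: Long, numPartitions: Int): Boolean =
    totalBytes < BigInt(threshold) * BigInt(numPartitions)

  // A skew-aware alternative (hypothetical): every partition must fit on its own.
  def allPartitionsFit(partitionBytes: Seq[Long], threshold: Long): Boolean =
    partitionBytes.forall(_ < threshold)
}
```

A per-partition check needs per-partition sizes, which AQE has from completed shuffle stages and static planning does not.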






[GitHub] [spark] ulysses-you commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829849295


   @cloud-fan Thank you for the review. I've addressed all comments:
   * inlined the config check
   * simplified the test




[GitHub] [spark] cloud-fan commented on a change in pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #32391:
URL: https://github.com/apache/spark/pull/32391#discussion_r623088100



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, JoinSelectionHelper}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, JoinHint, LogicalPlan, NO_BROADCAST_HASH, SHUFFLE_HASH}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+/**
+ * The main idea here is that make join config isolation between normal planner and aqe planner
+ * which shared the same code path.
+ * In order to achieve this we use a specific join hint in advance during AQE framework and then
+ * at JoinSelection side it will take and follow the inserted hint.
+ *
+ * For now we only support select strategy for equi join, and follow this order:
+ *   1. mark join as broadcast hash join if possible
+ *   2. mark join as shuffled hash join if possible
+ *
+ * Note that, we don't override join strategy if user specifies a join hint.
+ */
+object DynamicJoinSelection extends Rule[LogicalPlan] with JoinSelectionHelper {
+  final override val isAdaptive: Boolean = true
+
+  private def insertJoinHint(

Review comment:
       It seems we are not on the same page. I'm fine with a new config; what I'm asking is to move the logic into the existing planner rule and use different configs for AQE and normal stats.






[GitHub] [spark] AmplabJenkins commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829755947


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42613/
   




[GitHub] [spark] SparkQA removed a comment on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829853174


   **[Test build #138099 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138099/testReport)** for PR 32391 at commit [`225bde3`](https://github.com/apache/spark/commit/225bde3815c06c4857a28b2ab6496e8a1199d123).




[GitHub] [spark] SparkQA removed a comment on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829737968


   **[Test build #138093 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138093/testReport)** for PR 32391 at commit [`2923933`](https://github.com/apache/spark/commit/2923933c8ae2146b667b5a9621271a41f21e65be).




[GitHub] [spark] AmplabJenkins commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-830006550


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138099/
   





[GitHub] [spark] AmplabJenkins commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829973747


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138098/
   




[GitHub] [spark] SparkQA commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-828951820








[GitHub] [spark] SparkQA commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829866817


   **[Test build #138093 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138093/testReport)** for PR 32391 at commit [`2923933`](https://github.com/apache/spark/commit/2923933c8ae2146b667b5a9621271a41f21e65be).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829853174


   **[Test build #138099 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138099/testReport)** for PR 32391 at commit [`225bde3`](https://github.com/apache/spark/commit/225bde3815c06c4857a28b2ab6496e8a1199d123).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829755947


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42613/
   




[GitHub] [spark] Gabriel39 commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
Gabriel39 commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-844840808


   Hi @ulysses-you, I have some questions about this PR. In the current version, I think the stats estimate is always larger than or equal to the actual value, so it seems that if a join is chosen to be converted to a BHJ, it also meets the broadcast conditions in AQE.
   




[GitHub] [spark] ulysses-you commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-828972048


   cc @maropu @cloud-fan @maryannxue do you have any thoughts about this feature?




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-830006550


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138099/
   




[GitHub] [spark] cloud-fan commented on a change in pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #32391:
URL: https://github.com/apache/spark/pull/32391#discussion_r623610625



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
##########
@@ -278,7 +278,8 @@ trait JoinSelectionHelper {
    * Matches a plan whose output should be small enough to be used in broadcast join.
    */
   def canBroadcastBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {
-    plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
+    plan.stats.sizeInBytes >= 0 &&
+      plan.stats.sizeInBytes <= autoBroadcastJoinThreshold(plan.stats.isRuntime, conf)

Review comment:
       We don't need the method now, just put if-else here.
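A minimal sketch of what "just put if-else here" could look like, assuming simplified stand-ins for Catalyst's types. `Stats` and `JoinConf` are hypothetical, and so is `adaptiveAutoBroadcastJoinThreshold`. The fallback from the AQE-side threshold to `autoBroadcastJoinThreshold` when it is unset is also an assumption, not something stated in this thread.

```scala
// Simplified, hypothetical stand-ins for Catalyst's Statistics and SQLConf;
// only the fields needed for the size check are modeled.
final case class Stats(sizeInBytes: BigInt, isRuntime: Boolean = false)

final case class JoinConf(
    autoBroadcastJoinThreshold: Long,
    // Assumed AQE-side threshold; None means "fall back to the normal one".
    adaptiveAutoBroadcastJoinThreshold: Option[Long] = None)

object BroadcastBySize {
  // Mirrors canBroadcastBySize, with the threshold picked by an inline if-else
  // instead of a separate helper method.
  def canBroadcastBySize(stats: Stats, conf: JoinConf): Boolean = {
    val threshold =
      if (stats.isRuntime) {
        conf.adaptiveAutoBroadcastJoinThreshold.getOrElse(conf.autoBroadcastJoinThreshold)
      } else {
        conf.autoBroadcastJoinThreshold
      }
    // A negative threshold (e.g. -1) can never be satisfied, which disables broadcasting.
    stats.sizeInBytes >= 0 && stats.sizeInBytes <= threshold
  }
}
```

With the normal threshold set to -1 and an AQE-side threshold of 10 MB, a 1 KB plan is rejected when its stats are static estimates. The same plan is accepted when its stats come from a query stage at runtime.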






[GitHub] [spark] ulysses-you commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-845581748


   @Gabriel39 Not sure I see your point. Do you mean that a plan which can be broadcast in AQE is always smaller than the config we specified? If so, what's the issue with that?
   




[GitHub] [spark] cloud-fan commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829960917


   thanks, merging to master!




[GitHub] [spark] ulysses-you commented on a change in pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on a change in pull request #32391:
URL: https://github.com/apache/spark/pull/32391#discussion_r623574242



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -1606,4 +1606,64 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-35264: Support AQE side broadcastJoin threshold") {
+    withTable("t1", "t2") {

Review comment:
       updated






[GitHub] [spark] AmplabJenkins removed a comment on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829852271


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42618/
   




[GitHub] [spark] AmplabJenkins commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829121985


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138068/
   




[GitHub] [spark] cloud-fan commented on a change in pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #32391:
URL: https://github.com/apache/spark/pull/32391#discussion_r623566610



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
##########
@@ -47,11 +47,13 @@ object Statistics {
  *                    defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
  * @param attributeStats Statistics for Attributes.
+ * @param isAdaptive Statistics from AQE.

Review comment:
       "adaptive" sounds weird for statistics. How about
   ```
   @param isRuntime Whether the statistics is inferred from query stage runtime statistics during
                    adaptive query execution.
   ```
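To show the suggested `isRuntime` parameter in context, here is a hypothetical, heavily simplified mirror of `Statistics`. It is not Catalyst's real class. The helper `fromMaterializedStage` is an illustrative name, not a Spark API. The point of the sketch is that defaulting the flag to `false` leaves existing callers that build compile-time estimates unchanged, while the AQE path opts in explicitly.

```scala
// Hypothetical, simplified mirror of Catalyst's Statistics with the proposed flag.
final case class Statistics(
    sizeInBytes: BigInt,
    rowCount: Option[BigInt] = None,
    // Whether the statistics is inferred from query stage runtime statistics
    // during adaptive query execution. Defaults to false, so existing callers
    // that build compile-time estimates do not need to change.
    isRuntime: Boolean = false)

object RuntimeStatsExample {
  // Hypothetical helper: wraps the byte and row counts observed after a
  // query stage has materialized, marking them as runtime statistics.
  def fromMaterializedStage(bytesWritten: Long, rowsWritten: Long): Statistics =
    Statistics(
      sizeInBytes = BigInt(bytesWritten),
      rowCount = Some(BigInt(rowsWritten)),
      isRuntime = true)
}
```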






[GitHub] [spark] cloud-fan commented on a change in pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #32391:
URL: https://github.com/apache/spark/pull/32391#discussion_r622804271



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, JoinSelectionHelper}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, JoinHint, LogicalPlan, NO_BROADCAST_HASH, SHUFFLE_HASH}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+/**
+ * The main idea here is that make join config isolation between normal planner and aqe planner
+ * which shared the same code path.
+ * In order to achieve this we use a specific join hint in advance during AQE framework and then
+ * at JoinSelection side it will take and follow the inserted hint.
+ *
+ * For now we only support select strategy for equi join, and follow this order:
+ *   1. mark join as broadcast hash join if possible
+ *   2. mark join as shuffled hash join if possible
+ *
+ * Note that, we don't override join strategy if user specifies a join hint.
+ */
+object DynamicJoinSelection extends Rule[LogicalPlan] with JoinSelectionHelper {
+  final override val isAdaptive: Boolean = true
+
+  private def insertJoinHint(

Review comment:
       My worry is that we need to keep the join selection logic in sync with the normal planner, which can be hard to maintain.
   
   Can we put the logic in `JoinSelectionHelper` directly? We can distinguish AQE stats and normal stats by adding a boolean flag to `Statistics`.






[GitHub] [spark] Gabriel39 commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
Gabriel39 commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-845712527


   @ulysses-you @cloud-fan Thank you for your patience! I think I really misunderstood this PR and stats estimation before. 




[GitHub] [spark] ulysses-you commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-845603097


   @Gabriel39 I guess you misunderstand the logic of AQE.
   
   > AQE should not optimize it to other join type since static stats (e.g sizeInBytes) is always larger or equal the actual value
   
   That's wrong: AQE can never change a BHJ that was already chosen by the normal planner into another join strategy. It's not about the stats; see the key code in `LogicalQueryStageStrategy`.
   
   This new config assumes that a join is not a BHJ before AQE, so that AQE can use the new config and the runtime stats to turn a join (usually an SMJ) into a BHJ.
   
   So the usual way to use this new config is to 1) disable the normal auto broadcast or reduce its threshold, and 2) tune the new config value.
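The two-phase behavior described above can be modeled in plain Scala. This is a simplified, hypothetical model, not Spark's planner. Step 1 corresponds to `spark.sql.autoBroadcastJoinThreshold`. Step 2 corresponds to the AQE-side threshold, which released Spark 3.2 exposes as `spark.sql.adaptive.autoBroadcastJoinThreshold` (treat that name as an assumption relative to this thread). The model also ignores other conditions, such as whether the join type allows a given build side.

```scala
// Hypothetical model of the two planning phases; not Spark's real planner.
sealed trait JoinStrategy
case object BroadcastHash extends JoinStrategy
case object SortMerge extends JoinStrategy

object TwoPhaseJoinPlanning {
  // Same shape as canBroadcastBySize: a negative threshold disables broadcasting.
  private def fits(size: Long, threshold: Long): Boolean =
    size >= 0 && size <= threshold

  // Static planning: estimated size vs. the normal auto-broadcast threshold.
  def staticPlan(estimatedSize: Long, normalThreshold: Long): JoinStrategy =
    if (fits(estimatedSize, normalThreshold)) BroadcastHash else SortMerge

  // AQE re-planning: a BHJ chosen statically is never changed; otherwise the
  // runtime size is checked against the AQE-side threshold.
  def adaptivePlan(staticChoice: JoinStrategy, runtimeSize: Long, aqeThreshold: Long): JoinStrategy =
    staticChoice match {
      case BroadcastHash => BroadcastHash
      case other => if (fits(runtimeSize, aqeThreshold)) BroadcastHash else other
    }
}
```

For example, with the normal threshold set to -1, a join estimated at 1 GB is planned as SMJ. If its build side turns out to be 5 MB at runtime and the AQE-side threshold is 10 MB, AQE converts it to a BHJ.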




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829877146








[GitHub] [spark] SparkQA commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829846355


   Kubernetes integration test unable to build dist.
   
   exiting with code: 1
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42618/
   




[GitHub] [spark] ulysses-you commented on a change in pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on a change in pull request #32391:
URL: https://github.com/apache/spark/pull/32391#discussion_r622892555



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, JoinSelectionHelper}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, JoinHint, LogicalPlan, NO_BROADCAST_HASH, SHUFFLE_HASH}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+/**
+ * The main idea here is that make join config isolation between normal planner and aqe planner
+ * which shared the same code path.
+ * In order to achieve this we use a specific join hint in advance during AQE framework and then
+ * at JoinSelection side it will take and follow the inserted hint.
+ *
+ * For now we only support select strategy for equi join, and follow this order:
+ *   1. mark join as broadcast hash join if possible
+ *   2. mark join as shuffled hash join if possible
+ *
+ * Note that, we don't override join strategy if user specifies a join hint.
+ */
+object DynamicJoinSelection extends Rule[LogicalPlan] with JoinSelectionHelper {
+  final override val isAdaptive: Boolean = true
+
+  private def insertJoinHint(

Review comment:
       @cloud-fan That might work, but let me explain why I added this rule on the AQE optimizer side. Currently the rule appears to just follow the `JoinSelection` logic, but that may not stay true; with a separate rule we can easily add other join-selection logic in the future.
   
   For example, if we had another config like `spark.sql.adaptive.shuffledHashJoinBuildSideThreshold`, we could add a check in this rule to pick SMJ instead of SHJ when the partition size exceeds the threshold.
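The decision order of the `DynamicJoinSelection` doc comment, extended with the hypothetical per-partition SHJ check from this comment, can be sketched as a pure function. Assumptions: `shjBuildSideThreshold` stands in for the hypothetical `spark.sql.adaptive.shuffledHashJoinBuildSideThreshold`, and the input sizes are runtime build-side sizes. The real rule works on a `LogicalPlan` and inserts `JoinHint`s rather than returning an enum.

```scala
// Hypothetical outcomes of the rule; the real rule inserts JoinHints instead.
sealed trait JoinChoice
case object KeepUserHint extends JoinChoice        // a user hint exists: never override it
case object PreferBroadcastHash extends JoinChoice // would mark the join as broadcast hash join
case object PreferShuffledHash extends JoinChoice  // would mark the join as shuffled hash join
case object PreferSortMerge extends JoinChoice     // in this sketch, fall back to SMJ

object DynamicJoinSelectionSketch {
  def choose(
      hasUserHint: Boolean,
      buildSideSize: Long,
      buildSidePartitionSizes: Seq[Long],
      broadcastThreshold: Long,
      shjBuildSideThreshold: Long): JoinChoice = {
    if (hasUserHint) KeepUserHint
    // 1. Broadcast hash join if the whole build side is small enough.
    else if (buildSideSize >= 0 && buildSideSize <= broadcastThreshold) PreferBroadcastHash
    // 2. Shuffled hash join only if every build-side partition fits the threshold.
    else if (buildSidePartitionSizes.nonEmpty &&
      buildSidePartitionSizes.forall(_ <= shjBuildSideThreshold)) PreferShuffledHash
    // Otherwise some partition is too large for a local hash map: prefer SMJ.
    else PreferSortMerge
  }
}
```

Keeping this order in one AQE-only rule is the design point being argued: a new AQE-specific check such as step 2 can be added here without touching the normal planner's `JoinSelection`.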






[GitHub] [spark] SparkQA commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829737968


   **[Test build #138093 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138093/testReport)** for PR 32391 at commit [`2923933`](https://github.com/apache/spark/commit/2923933c8ae2146b667b5a9621271a41f21e65be).




[GitHub] [spark] ulysses-you commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-830004523


   thanks for merging!




[GitHub] [spark] cloud-fan commented on a change in pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #32391:
URL: https://github.com/apache/spark/pull/32391#discussion_r623567498



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -1606,4 +1606,64 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-35264: Support AQE side broadcastJoin threshold") {
+    withTable("t1", "t2") {

Review comment:
       `withTempView`






[GitHub] [spark] SparkQA commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829802697


   **[Test build #138098 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138098/testReport)** for PR 32391 at commit [`7d383cf`](https://github.com/apache/spark/commit/7d383cfa070952d9b2e90e7537b168abbb39c9f3).




[GitHub] [spark] cloud-fan closed pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
cloud-fan closed pull request #32391:
URL: https://github.com/apache/spark/pull/32391


   




[GitHub] [spark] AmplabJenkins commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829852271


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42618/
   




[GitHub] [spark] SparkQA removed a comment on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829802697


   **[Test build #138098 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138098/testReport)** for PR 32391 at commit [`7d383cf`](https://github.com/apache/spark/commit/7d383cfa070952d9b2e90e7537b168abbb39c9f3).




[GitHub] [spark] SparkQA commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829955046


   **[Test build #138098 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138098/testReport)** for PR 32391 at commit [`7d383cf`](https://github.com/apache/spark/commit/7d383cfa070952d9b2e90e7537b168abbb39c9f3).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.



[GitHub] [spark] cloud-fan commented on a change in pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #32391:
URL: https://github.com/apache/spark/pull/32391#discussion_r623568566



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
##########
@@ -376,7 +377,8 @@ trait JoinSelectionHelper {
    * dynamic.
    */
   private def canBuildLocalHashMapBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {
-    plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
+    plan.stats.sizeInBytes <
+      autoBroadcastJoinThreshold(plan.stats.isAdaptive, conf) * conf.numShufflePartitions

Review comment:
       Let's not touch shuffle hash join for now. I think in AQE we should check per-partition size to decide if we want to go SHJ or not, instead of using the old formula.
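The sketch below is illustrative and hypothetical. It is not Spark's API and not the PR's implementation. It contrasts the formula in the diff (`sizeInBytes < autoBroadcastJoinThreshold * numShufflePartitions`) with the per-partition check the reviewer suggests for AQE. The object and method names are invented for illustration. The 10 MiB threshold and 200 partitions match Spark's usual defaults but are assumed values here. With one skewed 150 MiB partition, the whole-plan formula still allows a shuffle hash join, while a per-partition check over the shuffle statistics rejects it.

```scala
// Hypothetical sketch; names do not correspond to Spark internals.
object ShjSizeCheck {
  // Old static formula: the total build-side size must be below
  // threshold * numShufflePartitions, i.e. partitions fit "on average".
  def canBuildLocalHashMapBySize(
      sizeInBytes: BigInt,
      threshold: Long,
      numShufflePartitions: Int): Boolean =
    sizeInBytes < BigInt(threshold) * BigInt(numShufflePartitions)

  // Per-partition alternative (assumption: AQE exposes actual shuffle
  // partition sizes): every build-side partition must fit under the threshold.
  def canBuildLocalHashMapPerPartition(partitionSizes: Seq[Long], threshold: Long): Boolean =
    partitionSizes.forall(_ < threshold)

  def main(args: Array[String]): Unit = {
    val mib = 1024L * 1024
    val threshold = 10L * mib // assumed 10 MiB threshold
    // 199 partitions of 1 MiB plus one skewed 150 MiB partition (200 in total)
    val sizes: Seq[Long] = Seq.fill(199)(1L * mib) :+ (150L * mib)
    // Total 349 MiB < 10 MiB * 200, so the old formula accepts the plan.
    println(canBuildLocalHashMapBySize(BigInt(sizes.sum), threshold, sizes.length)) // true
    // The 150 MiB partition exceeds 10 MiB, so the per-partition check rejects it.
    println(canBuildLocalHashMapPerPartition(sizes, threshold)) // false
  }
}
```

The per-partition form relies only on sizes that are known after a shuffle stage materializes, which is why it fits AQE better than an estimate based on the total size of the whole plan.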





[GitHub] [spark] SparkQA removed a comment on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-828936481


   **[Test build #138068 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138068/testReport)** for PR 32391 at commit [`21591b8`](https://github.com/apache/spark/commit/21591b81c084f8cfc48a9736fe266aa954c58bbb).



[GitHub] [spark] SparkQA commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829109713


   **[Test build #138068 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138068/testReport)** for PR 32391 at commit [`21591b8`](https://github.com/apache/spark/commit/21591b81c084f8cfc48a9736fe266aa954c58bbb).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.



[GitHub] [spark] AmplabJenkins commented on pull request #32391: [SPARK-35264][SQL] Support AQE side broadcastJoin threshold

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32391:
URL: https://github.com/apache/spark/pull/32391#issuecomment-829877146





