Posted to issues@spark.apache.org by "Lietong Liu (Jira)" <ji...@apache.org> on 2021/11/15 12:28:00 UTC

[jira] [Updated] (SPARK-37328) SPARK-33832 brings the bug that OptimizeSkewedJoin may not work since it was applied on whole plan instead of new stage plan

     [ https://issues.apache.org/jira/browse/SPARK-37328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lietong Liu updated SPARK-37328:
--------------------------------
    Summary: SPARK-33832 brings the bug that OptimizeSkewedJoin may not work since it was applied on whole plan innstead of new stage plan  (was: SPARK-33832 brings the bug that OptimizeSkewedJoin may not work since it was applied onn whole plan innstead of new stage plan)

> SPARK-33832 brings the bug that OptimizeSkewedJoin may not work since it was applied on whole plan instead of new stage plan
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-37328
>                 URL: https://issues.apache.org/jira/browse/SPARK-37328
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.0
>            Reporter: Lietong Liu
>            Priority: Major
>
> Since SPARK-33832 moved OptimizeSkewedJoin from queryStageOptimizerRules to queryStagePreparationRules, the rule is applied in reOptimize() instead of newQueryStage(). As a result, it now runs on the whole Spark plan rather than on the plan of the new stage that is about to be submitted.
> When the skewed join is not in the last stage, OptimizeSkewedJoin may not take effect, because the number of shuffle stages it collects from the whole plan is more than 2.
> The following test reproduces the problem:
>
> {code:scala}
> test("OptimizeSkewJoin may not work") {
>   withSQLConf(
>     SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
>     SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
>     SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100",
>     SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
>     SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
>     SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
>     withTempView("skewData1", "skewData2", "skewData3") {
>       spark
>         .range(0, 1000, 1, 10)
>         .selectExpr("id % 3 as key1", "id % 3 as value1")
>         .createOrReplaceTempView("skewData1")
>       spark
>         .range(0, 1000, 1, 10)
>         .selectExpr("id % 1 as key2", "id as value2")
>         .createOrReplaceTempView("skewData2")
>       spark
>         .range(0, 1000, 1, 10)
>         .selectExpr("id % 1 as key3", "id as value3")
>         .createOrReplaceTempView("skewData3")
>       // The query has two skewed joins in two consecutive stages.
>       val (_, adaptive1) =
>         runAdaptiveAndVerifyResult(
>           """
>             |SELECT key1 FROM skewData1 s1
>             |JOIN skewData2 s2
>             |ON s1.key1 = s2.key2
>             |JOIN skewData3
>             |ON s1.value1 = value3
>             |""".stripMargin)
>       val shuffles1 = collect(adaptive1) {
>         case s: ShuffleExchangeExec => s
>       }
>       assert(shuffles1.size == 4)
>       val smj1 = findTopLevelSortMergeJoin(adaptive1)
>       assert(smj1.size == 2 && smj1.forall(_.isSkewJoin))
>     }
>   }
> }
> {code}
> I'll open a PR shortly to fix this issue.
>  
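The following toy model in Scala illustrates the failure mode. It uses Scala because the test above is written in Scala. Everything in it is hypothetical: `Plan`, `ShuffleStage`, `Join`, `ToySkewRule` and `ToySkewJoinDemo` are not Spark classes. The rule only mimics the behavior described in the report, namely that it gives up when it collects more than two shuffle stages from the plan it is given. The sketch applies the rule to the plan of a single new stage and then to the whole plan. The comparison shows why running the rule in reOptimize() can prevent the optimization.

{code:scala}
// Hypothetical toy model of a physical plan; NOT Spark's SparkPlan API.
sealed trait Plan { def children: Seq[Plan] }

// A materialized shuffle stage (a leaf of the plan being optimized).
final case class ShuffleStage(id: Int) extends Plan {
  def children: Seq[Plan] = Nil
}

// A join; skewOptimized records whether the toy rule rewrote it.
final case class Join(left: Plan, right: Plan, skewOptimized: Boolean = false) extends Plan {
  def children: Seq[Plan] = Seq(left, right)
}

object ToySkewRule {
  // Collect the shuffle stages reachable from `plan`, stopping at each stage.
  def collectShuffleStages(plan: Plan): Seq[ShuffleStage] = plan match {
    case s: ShuffleStage => Seq(s)
    case other           => other.children.flatMap(collectShuffleStages)
  }

  // Assumption taken from the report: the rule gives up when it sees more than 2 shuffle stages.
  def apply(plan: Plan): Plan =
    if (collectShuffleStages(plan).length > 2) plan
    else markJoins(plan)

  // Mark every join in the plan as skew-optimized.
  private def markJoins(plan: Plan): Plan = plan match {
    case Join(l, r, _)   => Join(markJoins(l), markJoins(r), skewOptimized = true)
    case s: ShuffleStage => s
  }
}

object ToySkewJoinDemo {
  def main(args: Array[String]): Unit = {
    // Plan of the new stage only: two shuffle stages, so the toy rule applies.
    val stagePlan = Join(ShuffleStage(1), ShuffleStage(2))
    // Whole plan: the same join plus a third shuffle stage, so the toy rule gives up.
    val wholePlan = Join(stagePlan, ShuffleStage(3))
    println(ToySkewRule(stagePlan))
    println(ToySkewRule(wholePlan))
  }
}
{code}

In this toy, the join in stagePlan is marked as skew-optimized. wholePlan is returned unchanged because three shuffle stages are collected from it. This mirrors the report's point that the rule now sees the whole plan rather than only the new stage plan.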



