Posted to issues@spark.apache.org by "Dongjoon Hyun (Jira)" <ji...@apache.org> on 2022/01/27 19:56:00 UTC

[jira] [Commented] (SPARK-37442) In AQE, wrong InMemoryRelation size estimation causes "Cannot broadcast the table that is larger than 8GB: 8 GB" failure

    [ https://issues.apache.org/jira/browse/SPARK-37442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17483405#comment-17483405 ] 

Dongjoon Hyun commented on SPARK-37442:
---------------------------------------

I converted this into SPARK-37063's subtask.

> In AQE, wrong InMemoryRelation size estimation causes "Cannot broadcast the table that is larger than 8GB: 8 GB" failure
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-37442
>                 URL: https://issues.apache.org/jira/browse/SPARK-37442
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Optimizer, SQL
>    Affects Versions: 3.1.1, 3.2.0
>            Reporter: Michael Chen
>            Assignee: Michael Chen
>            Priority: Major
>             Fix For: 3.2.1, 3.3.0
>
>
> There is a window in which an InMemoryRelation has its cached buffers loaded but its statistics are inaccurate (anywhere from 0 up to the size in bytes reported by the accumulators). When AQE is enabled, join planning can happen inside this window. In that case, the sizes of join children that include an InMemoryRelation are greatly underestimated, and a broadcast join can be planned when it shouldn't be. We have seen cases where a broadcast join was planned with a build side larger than 8GB because, at planning time, the optimizer believed the InMemoryRelation was 0 bytes.
> Here is an example test case in which the broadcast threshold is ignored; the 8GB error can be reproduced by increasing the size of the tables.
> {code:java}
> withSQLConf(
>   SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
>   SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1048584") {
>   // Spark estimates a string column at 20 bytes per value, so with 60k rows these
>   // relations should be estimated at ~1.2M bytes, above the broadcast join threshold
>   Seq.fill(60000)("a").toDF("key")
>     .createOrReplaceTempView("temp")
>   Seq.fill(60000)("b").toDF("key")
>     .createOrReplaceTempView("temp2")
>   Seq("a").toDF("key").createOrReplaceTempView("smallTemp")
>   spark.sql("SELECT key as newKey FROM temp").persist()
>   val query =
>   s"""
>      |SELECT t3.newKey
>      |FROM
>      |  (SELECT t1.newKey
>      |  FROM (SELECT key as newKey FROM temp) as t1
>      |        JOIN
>      |        (SELECT key FROM smallTemp) as t2
>      |        ON t1.newKey = t2.key
>      |  ) as t3
>      |  JOIN
>      |  (SELECT key FROM temp2) as t4
>      |  ON t3.newKey = t4.key
>      |UNION
>      |SELECT t1.newKey
>      |FROM
>      |    (SELECT key as newKey FROM temp) as t1
>      |    JOIN
>      |    (SELECT key FROM temp2) as t2
>      |    ON t1.newKey = t2.key
>      |""".stripMargin
>   val df = spark.sql(query)
>   df.collect()
>   val adaptivePlan = df.queryExecution.executedPlan
>   val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
>   assert(bhj.length == 1)
> }
> {code}
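> As an aside (not part of the original report), the estimate in the comment above can be checked with plain arithmetic: assuming Spark's default estimate of 20 bytes per string value, 60,000 rows come to 1,200,000 bytes, which exceeds the 1048584-byte threshold configured in the test. A minimal self-contained sketch:
> {code:java}
> // Back-of-the-envelope check of the size estimate used in the test above.
> // Assumes Spark's default estimate of 20 bytes per string value; the object
> // name and value names here are illustrative, not from the original test.
> object SizeEstimateCheck {
>   val bytesPerString = 20L          // Spark's default string-column estimate
>   val rows = 60000L                 // rows in "temp" / "temp2"
>   val broadcastThreshold = 1048584L // AUTO_BROADCASTJOIN_THRESHOLD above
>
>   def estimatedSize: Long = bytesPerString * rows
>
>   def main(args: Array[String]): Unit = {
>     println(s"estimated size: $estimatedSize bytes")                     // 1200000
>     println(s"exceeds threshold: ${estimatedSize > broadcastThreshold}") // true
>   }
> }
> {code}
> If the true table size at plan time were reported as 0 (the stale-stats window described above), this comparison would go the other way and the broadcast would be planned.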



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org