You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Michael Chen (Jira)" <ji...@apache.org> on 2021/11/22 19:02:00 UTC

[jira] [Created] (SPARK-37442) In AQE, wrong InMemoryRelation size estimation causes "Cannot broadcast the table that is larger than 8GB: 8 GB" failure

Michael Chen created SPARK-37442:
------------------------------------

             Summary: In AQE, wrong InMemoryRelation size estimation causes "Cannot broadcast the table that is larger than 8GB: 8 GB" failure
                 Key: SPARK-37442
                 URL: https://issues.apache.org/jira/browse/SPARK-37442
             Project: Spark
          Issue Type: Bug
          Components: Optimizer, SQL
    Affects Versions: 3.2.0, 3.1.1
            Reporter: Michael Chen


There is a period in time where an InMemoryRelation will have the cached buffers loaded, but the statistics will be inaccurate (anywhere between 0 -> size in bytes reported by accumulators). When AQE is enabled, it is possible that join planning strategies will happen in this window. In this scenario, join children sizes including InMemoryRelation are greatly underestimated and a broadcast join can be planned when it shouldn't be. We have seen scenarios where a broadcast join is planned with the builder size greater than 8GB because at planning time, the optimizer believes the InMemoryRelation is 0 bytes.

Here is an example test case where the broadcast threshold is being ignored. It can mimic the 8GB error by increasing the size of the tables.
{code:java}
withSQLConf(
  SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
  SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1048584") {
  // Spark estimates a string column as 20 bytes so with 60k rows, these relations should be
  // estimated at ~120m bytes which is greater than the broadcast join threshold
  Seq.fill(60000)("a").toDF("key")
    .createOrReplaceTempView("temp")
  Seq.fill(60000)("b").toDF("key")
    .createOrReplaceTempView("temp2")

  Seq("a").toDF("key").createOrReplaceTempView("smallTemp")
  spark.sql("SELECT key as newKey FROM temp").persist()

  val query =
  s"""
     |SELECT t3.newKey
     |FROM
     |  (SELECT t1.newKey
     |  FROM (SELECT key as newKey FROM temp) as t1
     |        JOIN
     |        (SELECT key FROM smallTemp) as t2
     |        ON t1.newKey = t2.key
     |  ) as t3
     |  JOIN
     |  (SELECT key FROM temp2) as t4
     |  ON t3.newKey = t4.key
     |UNION
     |SELECT t1.newKey
     |FROM
     |    (SELECT key as newKey FROM temp) as t1
     |    JOIN
     |    (SELECT key FROM temp2) as t2
     |    ON t1.newKey = t2.key
     |""".stripMargin
  val df = spark.sql(query)
  df.collect()
  val adaptivePlan = df.queryExecution.executedPlan
  val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
  assert(bhj.length == 1) {code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org