You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Shardul Mahadik (Jira)" <ji...@apache.org> on 2021/09/28 16:42:00 UTC

[jira] [Updated] (SPARK-36877) Calling ds.rdd with AQE enabled leads to being jobs being run, eventually causing reruns

     [ https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shardul Mahadik updated SPARK-36877:
------------------------------------
    Attachment: Screen Shot 2021-09-28 at 09.32.20.png

> Calling ds.rdd with AQE enabled leads to being jobs being run, eventually causing reruns
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-36877
>                 URL: https://issues.apache.org/jira/browse/SPARK-36877
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.2, 3.2.1
>            Reporter: Shardul Mahadik
>            Priority: Major
>         Attachments: Screen Shot 2021-09-28 at 09.32.20.png
>
>
> In one of our jobs we perform the following operation:
> {code:scala}
> val df = /* some expensive multi-table/multi-stage join */
> val numPartitions = df.rdd.getNumPartitions
> df.repartition(x).write.....
> {code}
> With AQE enabled, we found that the expensive stages were being run twice causing significant performance regression after enabling AQE; once when calling {{df.rdd}} and again when calling {{df.write}}.
> A more concrete example:
> {code:scala}
> scala> sql("SET spark.sql.adaptive.enabled=true")
> res0: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
> res1: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> val df1 = spark.range(10).withColumn("id2", $"id")
> df1: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df2 = df1.join(spark.range(10), "id").join(spark.range(10), "id").join(spark.range(10), "id")
> df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df3 = df2.groupBy("id2").count()
> df3: org.apache.spark.sql.DataFrame = [id2: bigint, count: bigint]
> scala> df3.rdd.getNumPartitions
> res2: Int = 10                                                    (0 + 16) / 16]
> scala> df3.repartition(5).write.mode("overwrite").orc("/tmp/orc1")
> {code}
> In the screenshot below, you can see that the first 3 stages (0 to 4) were rerun again (5 to 9).
> I have two questions:
> 1) Should calling df.rdd trigger actual job execution when AQE is enabled?
> 2) Should calling df.write later cause rerun of the stages? If df.rdd has already partially executed the stages, shouldn't it reuse the result from previous stages?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org