Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2022/12/12 13:20:00 UTC

[jira] [Commented] (SPARK-41214) SQL metrics are missing from Spark UI when AQE for Cached DataFrame is enabled

    [ https://issues.apache.org/jira/browse/SPARK-41214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17646093#comment-17646093 ] 

Apache Spark commented on SPARK-41214:
--------------------------------------

User 'ulysses-you' has created a pull request for this issue:
https://github.com/apache/spark/pull/39037

> SQL metrics are missing from Spark UI when AQE for Cached DataFrame is enabled
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-41214
>                 URL: https://issues.apache.org/jira/browse/SPARK-41214
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Eren Avsarogullari
>            Priority: Major
>         Attachments: DAG when AQE=ON and AQECachedDFSupport=ON with fix.png, DAG when AQE=ON and AQECachedDFSupport=ON without fix.png
>
>
> *spark.sql.optimizer.canChangeCachedPlanOutputPartitioning* enables AQE optimizations under InMemoryRelation (IMR) nodes. The following sample query has an IMR node on both BroadcastHashJoin (BHJ) legs. However, 
> when spark.sql.optimizer.canChangeCachedPlanOutputPartitioning = true, the following information is missing because the final sub-plans (under IMR) are not submitted to the UI:
> {code:java}
> - Physical operators' SQLMetrics (before AdaptiveSparkPlan) are missing, such as Exchange and HashAggregate on the left BHJ leg and HashAggregate on the right BHJ leg,
> - WSCG (WholeStageCodegen) blocks are missing on the left BHJ leg,
> - The AQEShuffleRead node is missing on the left BHJ leg. {code}
> *Sample to reproduce:*
> {code:java}
> val spark = SparkSession
>     .builder()
>     .config("spark.sql.adaptive.enabled", "true")
>     .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")
>     .master("local[*]")
>     .getOrCreate()
> import spark.implicits._
> // Create 1st DF
> val arr = Seq(
>   (1, "Employee_1", "Department_1"),
>   (2, "Employee_2", "Department_2"))
> val df = arr.toDF("id", "name", "department")
>   .filter('id < 3)
>   .groupBy('name)
>   .count()
> df.cache()
> // Create 2nd DF
> val arr2 = Seq((1, "Employee_1", "Department_1"))
> val df2 = arr2.toDF("id", "name", "department")
>   .filter('id > 0)
>   .groupBy('name)
>   .count()
> df2.cache()
> // Trigger query execution
> val df3 = df.join(df2, "name")
> df3.show() {code}
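> For reference, a minimal sketch (assuming the same session and DataFrames as above) that prints the final physical plan using standard Dataset APIs; the cached legs appear as InMemoryTableScan nodes, and with the flag enabled they carry the adaptive sub-plans whose metrics are described above:
> {code:java}
> // Print the final physical plan in formatted mode (Spark 3.0+).
> df3.explain("formatted")
> // Equivalent lower-level view of the executed plan tree:
> println(df3.queryExecution.executedPlan.treeString)
> {code}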
> *DAG before fix:*
> *!DAG when AQE=ON and AQECachedDFSupport=ON without fix.png|width=33,height=86!*
> *DAG after fix:*
> *!DAG when AQE=ON and AQECachedDFSupport=ON with fix.png|width=33,height=82!*


