You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2022/12/12 13:20:00 UTC
[jira] [Commented] (SPARK-41214) SQL metrics are missing from Spark UI when AQE for Cached DataFrame is enabled
[ https://issues.apache.org/jira/browse/SPARK-41214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17646093#comment-17646093 ]
Apache Spark commented on SPARK-41214:
--------------------------------------
User 'ulysses-you' has created a pull request for this issue:
https://github.com/apache/spark/pull/39037
> SQL metrics are missing from Spark UI when AQE for Cached DataFrame is enabled
> ------------------------------------------------------------------------------
>
> Key: SPARK-41214
> URL: https://issues.apache.org/jira/browse/SPARK-41214
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: Eren Avsarogullari
> Priority: Major
> Attachments: DAG when AQE=ON and AQECachedDFSupport=ON with fix.png, DAG when AQE=ON and AQECachedDFSupport=ON without fix.png
>
>
> *spark.sql.optimizer.canChangeCachedPlanOutputPartitioning* enables AQE optimizations under InMemoryRelation(IMR) nodes. Following sample query has IMR node on both BroadcastHashJoin legs. However,
> when spark.sql.optimizer.canChangeCachedPlanOutputPartitioning = true, following datas are missed due to lack of final sub-plans (under IMR) submissions (into UI).
> {code:java}
> - Physical operators' SQLMetrics (before AdaptiveSparkPlan) are missed such as Exchange and HashAggregate on left BHJ leg and HashAggregate on right BHJ leg,
> - WSCG blocks are missed on left BHJ leg,
> - AQEShuffleRead node is missed on left BHJ leg. {code}
> *Sample to reproduce:*
> {code:java}
> val spark = SparkSession
> .builder()
> .config("spark.sql.adaptive.enabled", "true")
> .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")
> .master("local[*]")
> .getOrCreate()
> import spark.implicits._
> // Create 1th DF
> val arr = Seq(
> (1, "Employee_1", "Department_1"),
> (2, "Employee_2", "Department_2"))
> val df = arr.toDF("id", "name", "department")
> .filter('id < 3)
> .groupBy('name)
> .count()
> df.cache()
> // Create 2th DF
> val arr2 = Seq((1, "Employee_1", "Department_1"))
> val df2 = arr2.toDF("id", "name", "department")
> .filter('id > 0)
> .groupBy('name)
> .count()
> df2.cache()
> // Trigger query execution
> val df3 = df.join(df2, "name")
> df3.show() {code}
> *DAG before fix:*
> *!DAG when AQE=ON and AQECachedDFSupport=ON without fix.png|width=33,height=86!*
> *DAG after fix:*
> *!DAG when AQE=ON and AQECachedDFSupport=ON with fix.png|width=33,height=82!*
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org