Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2022/08/05 21:59:00 UTC

[jira] [Commented] (SPARK-39991) AQE should use available column statistics from completed query stages

    [ https://issues.apache.org/jira/browse/SPARK-39991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17576062#comment-17576062 ] 

Apache Spark commented on SPARK-39991:
--------------------------------------

User 'andygrove' has created a pull request for this issue:
https://github.com/apache/spark/pull/37424

> AQE should use available column statistics from completed query stages
> ----------------------------------------------------------------------
>
>                 Key: SPARK-39991
>                 URL: https://issues.apache.org/jira/browse/SPARK-39991
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.3.0
>            Reporter: Andy Grove
>            Priority: Major
>
> In QueryStageExec.computeStats, we copy partial statistics from materialized query stages by calling QueryStageExec#getRuntimeStatistics, which in turn calls ShuffleExchangeLike#runtimeStatistics or BroadcastExchangeLike#runtimeStatistics.
> Only dataSize and numOutputRows are copied into the new Statistics object:
> {code:scala}
>   def computeStats(): Option[Statistics] = if (isMaterialized) {
>     val runtimeStats = getRuntimeStatistics
>     val dataSize = runtimeStats.sizeInBytes.max(0)
>     val numOutputRows = runtimeStats.rowCount.map(_.max(0))
>     Some(Statistics(dataSize, numOutputRows, isRuntime = true))
>   } else {
>     None
>   }
> {code}
> I would also like to copy over the column statistics stored in Statistics.attributeStats so that they can be fed back into the logical plan optimization phase. This is a small change, as shown below:
> {code:scala}
>   def computeStats(): Option[Statistics] = if (isMaterialized) {
>     val runtimeStats = getRuntimeStatistics
>     val dataSize = runtimeStats.sizeInBytes.max(0)
>     val numOutputRows = runtimeStats.rowCount.map(_.max(0))
>     val attributeStats = runtimeStats.attributeStats
>     Some(Statistics(dataSize, numOutputRows, attributeStats, isRuntime = true))
>   } else {
>     None
>   }
> {code}
> The Spark implementations of ShuffleExchangeLike and BroadcastExchangeLike do not currently provide such column statistics, but other custom implementations can.
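>
> To illustrate the effect of the change, here is a minimal, self-contained sketch. It does not use Spark's real classes: ColumnStat, Statistics, and Stage are simplified, hypothetical stand-ins (for example, attributeStats is a plain Map keyed by column name rather than Spark's AttributeMap), and computeStatsCurrent / computeStatsProposed are illustrative names for the two versions of computeStats shown above.
> {code:scala}
>   // Simplified stand-ins for illustration only; these are NOT Spark's real classes.
>   final case class ColumnStat(
>       distinctCount: Option[BigInt] = None,
>       nullCount: Option[BigInt] = None)
>
>   final case class Statistics(
>       sizeInBytes: BigInt,
>       rowCount: Option[BigInt] = None,
>       attributeStats: Map[String, ColumnStat] = Map.empty,
>       isRuntime: Boolean = false)
>
>   // Hypothetical query stage holding the statistics reported by its exchange.
>   final case class Stage(isMaterialized: Boolean, runtimeStats: Statistics) {
>
>     // Mirrors the current behavior: only size and row count are copied,
>     // so attributeStats falls back to its empty default.
>     def computeStatsCurrent(): Option[Statistics] = if (isMaterialized) {
>       val dataSize = runtimeStats.sizeInBytes.max(0)
>       val numOutputRows = runtimeStats.rowCount.map(_.max(0))
>       Some(Statistics(dataSize, numOutputRows, isRuntime = true))
>     } else {
>       None
>     }
>
>     // Mirrors the proposed behavior: column statistics are carried over too.
>     def computeStatsProposed(): Option[Statistics] = if (isMaterialized) {
>       val dataSize = runtimeStats.sizeInBytes.max(0)
>       val numOutputRows = runtimeStats.rowCount.map(_.max(0))
>       val attributeStats = runtimeStats.attributeStats
>       Some(Statistics(dataSize, numOutputRows, attributeStats, isRuntime = true))
>     } else {
>       None
>     }
>   }
> {code}
> Under these assumptions, a materialized Stage whose runtimeStats carries a non-empty attributeStats map returns an empty map from computeStatsCurrent and the original map from computeStatsProposed. A stage that is not materialized returns None in both cases.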
