Posted to issues@spark.apache.org by "Eren Avsarogullari (Jira)" <ji...@apache.org> on 2023/10/06 22:01:00 UTC

[jira] [Created] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated IMR materialization

Eren Avsarogullari created SPARK-45443:
------------------------------------------

             Summary: Revisit TableCacheQueryStage to avoid replicated IMR materialization
                 Key: SPARK-45443
                 URL: https://issues.apache.org/jira/browse/SPARK-45443
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.5.0
            Reporter: Eren Avsarogullari


AdaptiveSparkPlanExec creates one TableCacheQueryStage per InMemoryTableScanExec. The stage materializes the InMemoryTableScanExec output (the cached RDD) so that runtime stats are available for applying AQE optimizations to the remaining physical plan stages. Materialization is eager: each TableCacheQueryStage instance submits its own job.
For example, if 2 TableCacheQueryStage instances reference the same IMR (InMemoryRelation) instance, i.e. the same cached RDD, and the first InMemoryTableScanExec's materialization takes a long time, the following check still returns false for the second stage (inMemoryTableScan.isMaterialized => false), and this may cause the same IMR to be materialized more than once.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]
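
The check-then-submit pattern described above can be illustrated with a minimal, Spark-free sketch. Everything in it is hypothetical and exists only for illustration: FakeCachedRelation, FakeCacheStage and submitMaterialization are not Spark classes or APIs, and the "job" is modelled as a callback that marks the cache as materialized when it is invoked. It only shows why a second stage that checks isMaterialized while the first job is still running ends up submitting a second job.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a cached relation (NOT Spark's InMemoryRelation).
final class FakeCachedRelation {
  @volatile private var materialized = false
  val jobsSubmitted = new AtomicInteger(0)

  def isMaterialized: Boolean = materialized

  // Models submitting a materialization job. The returned callback
  // represents the job finishing at some later point in time.
  def submitMaterialization(): () => Unit = {
    jobsSubmitted.incrementAndGet()
    () => { materialized = true }
  }
}

// Hypothetical stand-in for a TableCacheQueryStage (assumption, not Spark code).
final class FakeCacheStage(relation: FakeCachedRelation) {
  // Check-then-act: submit a job only if the cache is not yet materialized.
  def materialize(): Option[() => Unit] =
    if (relation.isMaterialized) None
    else Some(relation.submitMaterialization())
}

object ReplicatedMaterializationDemo {
  // Returns the number of materialization jobs submitted for one shared relation.
  def run(): Int = {
    val relation = new FakeCachedRelation
    val stage1 = new FakeCacheStage(relation)
    val stage2 = new FakeCacheStage(relation)

    val finish1 = stage1.materialize() // job 1 submitted, still "running"
    val finish2 = stage2.materialize() // isMaterialized is still false => job 2 submitted

    // Both jobs complete only after both submissions have happened.
    finish1.foreach(_.apply())
    finish2.foreach(_.apply())
    relation.jobsSubmitted.get()
  }

  def main(args: Array[String]): Unit =
    println(s"jobs submitted: ${run()}")
}
```

In this sketch both stages submit a job because the second check happens before the first job completes. If the first job had finished before stage2.materialize() was called, only one job would have been submitted.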

*Sample Query to simulate the problem:*
// Both join legs use the same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i => {
    val index = i % 5
    (index, s"Employee_$index", s"Department_$index")
  }
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- LocalTableScan (3)
         +- * Sort (18)
            +- * Filter (17)
               +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, rowCount=5)
                  +- InMemoryTableScan (11)
                        +- InMemoryRelation (12)
                              +- AdaptiveSparkPlan (15)
                                 +- HashAggregate (14)
                                    +- Exchange (13)
                                       +- HashAggregate (4)
                                          +- LocalTableScan (3) {code}


