Posted to issues@spark.apache.org by "Xiao Li (JIRA)" <ji...@apache.org> on 2018/07/23 16:54:00 UTC

[jira] [Resolved] (SPARK-24850) Query plan string representation grows exponentially on queries with recursive cached datasets

     [ https://issues.apache.org/jira/browse/SPARK-24850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiao Li resolved SPARK-24850.
-----------------------------
       Resolution: Fixed
         Assignee: Onur Satici
    Fix Version/s: 2.4.0

> Query plan string representation grows exponentially on queries with recursive cached datasets
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24850
>                 URL: https://issues.apache.org/jira/browse/SPARK-24850
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Onur Satici
>            Assignee: Onur Satici
>            Priority: Major
>             Fix For: 2.4.0
>
>
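> As of [https://github.com/apache/spark/pull/21018], InMemoryRelation includes its cacheBuilder when logging query plans. The CachedRDDBuilder in turn includes the cachedPlan, so calling treeString on an InMemoryRelation prints the cachedPlan inside the cacheBuilder in addition to printing it as the relation's child. When cached datasets are nested, each level therefore renders the plan beneath it twice.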
> Given the sample dataset:
> {code:java}
> $ cat test.csv
> A,B
> 0,0{code}
> Building a query plan with multiple cached datasets that depend on each other:
> {code:java}
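> // Start from a cached read of test.csv.
> var df_cached = spark.read.format("csv").option("header", "true").load("test.csv").cache()
> // Join twice against a fresh read of the same file, caching each intermediate result.
> (0 to 1).foreach { _ =>
>   df_cached = df_cached.join(spark.read.format("csv").option("header", "true").load("test.csv"), "A").cache()
> }
> // Print the physical plan.
> df_cached.explain()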
> {code}
> results in:
> {code:java}
> == Physical Plan ==
> InMemoryTableScan [A#10, B#11, B#35, B#87]
> +- InMemoryRelation [A#10, B#11, B#35, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35, B#87]
> +- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
> :- *(2) Filter isnotnull(A#10)
> : +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
> : +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35]
> +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
> :- *(2) Filter isnotnull(A#10)
> : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
> : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> ,None)
> : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
> +- *(1) Filter isnotnull(A#34)
> +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
> +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> ,None)
> +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> ,None)
> : +- *(2) Project [A#10, B#11, B#35]
> : +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
> : :- *(2) Filter isnotnull(A#10)
> : : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
> : : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> ,None)
> : : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
> : +- *(1) Filter isnotnull(A#34)
> : +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
> : +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> ,None)
> : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
> +- *(1) Filter isnotnull(A#86)
> +- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
> +- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> ,None)
> +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> ,None)
> +- *(2) Project [A#10, B#11, B#35, B#87]
> +- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
> :- *(2) Filter isnotnull(A#10)
> : +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
> : +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35]
> +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
> :- *(2) Filter isnotnull(A#10)
> : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
> : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> ,None)
> : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
> +- *(1) Filter isnotnull(A#34)
> +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
> +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> ,None)
> +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> ,None)
> : +- *(2) Project [A#10, B#11, B#35]
> : +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
> : :- *(2) Filter isnotnull(A#10)
> : : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
> : : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> ,None)
> : : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
> : +- *(1) Filter isnotnull(A#34)
> : +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
> : +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> ,None)
> : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
> +- *(1) Filter isnotnull(A#86)
> +- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
> +- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> ,None)
> +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> {code}
> Previously, the same query yielded:
> {code:java}
> == Physical Plan ==
> InMemoryTableScan [A#10, B#11, B#37, B#89]
> +- InMemoryRelation [A#10, B#11, B#37, B#89], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
> +- *(2) Project [A#10, B#11, B#37, B#89]
> +- *(2) BroadcastHashJoin [A#10], [A#88], Inner, BuildRight
> :- *(2) Filter isnotnull(A#10)
> : +- InMemoryTableScan [A#10, B#11, B#37], [isnotnull(A#10)]
> : +- InMemoryRelation [A#10, B#11, B#37], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
> : +- *(2) Project [A#10, B#11, B#37]
> : +- *(2) BroadcastHashJoin [A#10], [A#36], Inner, BuildRight
> : :- *(2) Filter isnotnull(A#10)
> : : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
> : : +- InMemoryRelation [A#10, B#11], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
> : : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
> : +- *(1) Filter isnotnull(A#36)
> : +- InMemoryTableScan [A#36, B#37], [isnotnull(A#36)]
> : +- InMemoryRelation [A#36, B#37], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
> : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
> +- *(1) Filter isnotnull(A#88)
> +- InMemoryTableScan [A#88, B#89], [isnotnull(A#88)]
> +- InMemoryRelation [A#88, B#89], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
> +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
> {code}
> This exponential growth can cause the driver to run out of memory (OOM) on large query plans with cached datasets.
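
The growth pattern described above can be illustrated with a small toy model. This is only a sketch under stated assumptions: `Scan`, `Join`, `Cached`, `linesOnce`, `linesTwice` and `nested` are hypothetical names invented for this illustration and are not Spark classes or APIs. The model counts how many node descriptions get rendered when a cached node prints its plan once (as a child only) versus twice (as a child and again inside its own description), which is the behaviour the plans above suggest. The counts are not meant to match the line counts of the real output.

```scala
object PlanStringGrowth {
  // Hypothetical stand-ins for plan nodes; not Spark's real classes.
  sealed trait Node { def children: Seq[Node] }
  case class Scan(name: String) extends Node { def children: Seq[Node] = Nil }
  case class Join(left: Node, right: Node) extends Node {
    def children: Seq[Node] = Seq(left, right)
  }
  case class Cached(plan: Node) extends Node { def children: Seq[Node] = Seq(plan) }

  // Each node is rendered exactly once: a cached plan appears only as a child.
  def linesOnce(n: Node): Long = 1 + n.children.map(linesOnce).sum

  // A cached node renders its plan twice: once embedded in its own
  // description and once more as its child.
  def linesTwice(n: Node): Long = n match {
    case Cached(plan) => 1 + 2 * linesTwice(plan)
    case other        => 1 + other.children.map(linesTwice).sum
  }

  // Mirrors the shape of the reproduction: a cached scan, then `depth`
  // rounds of joining against another cached scan and caching the result.
  def nested(depth: Int): Node =
    (1 to depth).foldLeft[Node](Cached(Scan("test.csv"))) { (acc, _) =>
      Cached(Join(acc, Cached(Scan("test.csv"))))
    }

  def main(args: Array[String]): Unit =
    for (d <- 0 to 10) {
      val plan = nested(d)
      println(s"depth=$d once=${linesOnce(plan)} twice=${linesTwice(plan)}")
    }
}
```

In this model the single-rendering count grows linearly with nesting depth (2 + 4d), while the double-rendering count roughly doubles per level (12 * 2^d - 9). At depth 10 that is 42 versus 12279 rendered nodes, which is the kind of blow-up that can exhaust driver memory on deep plans.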


