Posted to issues@spark.apache.org by "Peter Toth (Jira)" <ji...@apache.org> on 2021/03/16 08:50:00 UTC

[jira] [Updated] (SPARK-33482) V2 Datasources that extend FileScan preclude exchange reuse

     [ https://issues.apache.org/jira/browse/SPARK-33482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Peter Toth updated SPARK-33482:
-------------------------------
    Affects Version/s: 3.0.0
                       3.0.1
                       3.0.2
                       3.1.1

> V2 Datasources that extend FileScan preclude exchange reuse
> -----------------------------------------------------------
>
>                 Key: SPARK-33482
>                 URL: https://issues.apache.org/jira/browse/SPARK-33482
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1
>            Reporter: Bruce Robbins
>            Priority: Major
>
> Sample query:
> {noformat}
> spark.read.parquet("tbl").createOrReplaceTempView("tbl")
> spark.read.parquet("lookup").createOrReplaceTempView("lookup")
> sql("""
>    select tbl.col1, fk1, fk2
>    from tbl, lookup l1, lookup l2
>    where fk1 = l1.key
>    and fk2 = l2.key
> """).explain
> {noformat}
> Test files can be created as follows:
> {noformat}
> import scala.util.Random
> val rand = Random
> val tbl = spark.range(1, 10000).map { x =>
>   (rand.nextLong.abs % 20,
>    rand.nextLong.abs % 20,
>    x)
> }.toDF("fk1", "fk2", "col1")
> tbl.write.mode("overwrite").parquet("tbl")
> val lookup = spark.range(0, 20).map { x =>
>   (x + 1, x * 10000, (x + 1) * 10000)
> }.toDF("key", "col1", "col2")
> lookup.write.mode("overwrite").parquet("lookup")
> {noformat}
> Output with V1 Parquet reader:
> {noformat}
>  == Physical Plan ==
> *(3) Project [col1#2L, fk1#0L, fk2#1L]
> +- *(3) BroadcastHashJoin [fk2#1L], [key#12L], Inner, BuildRight, false
>    :- *(3) Project [fk1#0L, fk2#1L, col1#2L]
>    :  +- *(3) BroadcastHashJoin [fk1#0L], [key#6L], Inner, BuildRight, false
>    :     :- *(3) Filter (isnotnull(fk1#0L) AND isnotnull(fk2#1L))
>    :     :  +- *(3) ColumnarToRow
>    :     :     +- FileScan parquet [fk1#0L,fk2#1L,col1#2L] Batched: true, DataFilters: [isnotnull(fk1#0L), isnotnull(fk2#1L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/tbl], PartitionFilters: [], PushedFilters: [IsNotNull(fk1), IsNotNull(fk2)], ReadSchema: struct<fk1:bigint,fk2:bigint,col1:bigint>
>    :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75]
>    :        +- *(1) Filter isnotnull(key#6L)
>    :           +- *(1) ColumnarToRow
>    :              +- FileScan parquet [key#6L] Batched: true, DataFilters: [isnotnull(key#6L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint>
>    +- ReusedExchange [key#12L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75]
> {noformat}
> With the V1 Parquet reader, the exchange built for lookup is reused (see the last line).
> Output with V2 Parquet reader (spark.sql.sources.useV1SourceList=""):
> {noformat}
>  == Physical Plan ==
> *(3) Project [col1#2L, fk1#0L, fk2#1L]
> +- *(3) BroadcastHashJoin [fk2#1L], [key#12L], Inner, BuildRight, false
>    :- *(3) Project [fk1#0L, fk2#1L, col1#2L]
>    :  +- *(3) BroadcastHashJoin [fk1#0L], [key#6L], Inner, BuildRight, false
>    :     :- *(3) Filter (isnotnull(fk1#0L) AND isnotnull(fk2#1L))
>    :     :  +- *(3) ColumnarToRow
>    :     :     +- BatchScan[fk1#0L, fk2#1L, col1#2L] ParquetScan DataFilters: [isnotnull(fk1#0L), isnotnull(fk2#1L)], Format: parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/tbl], PartitionFilters: [], PushedFilers: [IsNotNull(fk1), IsNotNull(fk2)], ReadSchema: struct<fk1:bigint,fk2:bigint,col1:bigint>, PushedFilters: [IsNotNull(fk1), IsNotNull(fk2)]
>    :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75]
>    :        +- *(1) Filter isnotnull(key#6L)
>    :           +- *(1) ColumnarToRow
>    :              +- BatchScan[key#6L] ParquetScan DataFilters: [isnotnull(key#6L)], Format: parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], PartitionFilters: [], PushedFilers: [IsNotNull(key)], ReadSchema: struct<key:bigint>, PushedFilters: [IsNotNull(key)]
>    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#83]
>       +- *(2) Filter isnotnull(key#12L)
>          +- *(2) ColumnarToRow
>             +- BatchScan[key#12L] ParquetScan DataFilters: [isnotnull(key#12L)], Format: parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], PartitionFilters: [], PushedFilers: [IsNotNull(key)], ReadSchema: struct<key:bigint>, PushedFilters: [IsNotNull(key)]
> {noformat}
> With the V2 Parquet reader, the exchange for lookup is not reused; the broadcast is built a second time instead (see the last 4 lines).
> You can see the same issue with the Orc reader (and presumably with any other datasource that extends FileScan).
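> For a quick programmatic check, something along these lines should work in the same spark-shell session (a sketch, not part of the original report; it assumes the test files above exist and that re-registering the views after changing spark.sql.sources.useV1SourceList is enough to switch readers):
> {code:scala}
> import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
>
> // Count ReusedExchange nodes in the executed plan of the sample query,
> // re-reading the parquet files so the reader choice picks up the config.
> def countReusedExchanges(useV1List: String): Int = {
>   spark.conf.set("spark.sql.sources.useV1SourceList", useV1List)
>   spark.read.parquet("tbl").createOrReplaceTempView("tbl")
>   spark.read.parquet("lookup").createOrReplaceTempView("lookup")
>   val df = sql("""
>      select tbl.col1, fk1, fk2
>      from tbl, lookup l1, lookup l2
>      where fk1 = l1.key
>      and fk2 = l2.key
>   """)
>   df.queryExecution.executedPlan.collect { case r: ReusedExchangeExec => r }.size
> }
>
> countReusedExchanges("parquet")  // expected 1 with the V1 reader
> countReusedExchanges("")         // expected 0 with the V2 reader
> {code}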
> The issue appears to be this check in FileScan#equals:
> {code:scala}
> ExpressionSet(partitionFilters) == ExpressionSet(f.partitionFilters) &&
> ExpressionSet(dataFilters) == ExpressionSet(f.dataFilters)
> {code}
> partitionFilters and dataFilters are not normalized, so their exprIds differ from one plan instance to the next. As a result, two FileScan objects that describe the same scan never compare equal, and the exchange cannot be reused.
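> A possible direction (just a sketch of the idea, not an actual patch) would be to normalize each filter against the scan's own output before comparing, e.g. by replacing the exprId of every referenced attribute with its ordinal in the output. The toy example below (plain Scala, no Spark classes) shows why comparing by id fails and why normalizing by ordinal works:
> {code:scala}
> // Toy model of the problem: attributes carry a globally unique id,
> // similar in spirit to Catalyst's AttributeReference#exprId.
> case class Attr(name: String, id: Long)
> case class IsNotNull(attr: Attr)
>
> // Two scans of the same "lookup" table get different ids for "key".
> val output1  = Seq(Attr("key", 6L))
> val output2  = Seq(Attr("key", 12L))
> val filters1 = Set(IsNotNull(Attr("key", 6L)))
> val filters2 = Set(IsNotNull(Attr("key", 12L)))
>
> filters1 == filters2  // false: the ids differ, so the scans look different
>
> // Replacing each id with the attribute's ordinal in its own scan output
> // makes semantically identical filters compare equal.
> def normalize(filters: Set[IsNotNull], output: Seq[Attr]): Set[IsNotNull] =
>   filters.map(f => f.copy(attr =
>     f.attr.copy(id = output.indexWhere(_.name == f.attr.name).toLong)))
>
> normalize(filters1, output1) == normalize(filters2, output2)  // true
> {code}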
> As a side note, FileScan#equals has a dangling boolean expression:
> {code:scala}
> fileIndex == f.fileIndex && readSchema == f.readSchema
> {code}
> The result of that expression is not used anywhere. It would make sense to include it in the final decision as well, although that is not the cause of this issue.
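> For illustration only (simplified, and not a proposed patch), folding that expression into the result would just mean chaining all of the conditions:
> {code:scala}
> // Inside FileScan#equals: combine both comparisons into a single result
> // instead of letting the value of the first line be silently discarded.
> fileIndex == f.fileIndex && readSchema == f.readSchema &&
>   ExpressionSet(partitionFilters) == ExpressionSet(f.partitionFilters) &&
>   ExpressionSet(dataFilters) == ExpressionSet(f.dataFilters)
> {code}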



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org