You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Reynold Xin (JIRA)" <ji...@apache.org> on 2016/11/11 05:47:58 UTC

[jira] [Assigned] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

     [ https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Reynold Xin reassigned SPARK-18367:
-----------------------------------

    Assignee: Reynold Xin

> DataFrame join spawns unreasonably high number of open files
> ------------------------------------------------------------
>
>                 Key: SPARK-18367
>                 URL: https://issues.apache.org/jira/browse/SPARK-18367
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.1, 2.1.0
>         Environment: Python 3.5, Java 8
>            Reporter: Nicholas Chammas
>            Assignee: Reynold Xin
>         Attachments: spark-lsof.txt
>
>
> I have a moderately complex DataFrame query that spawns north of 10K open files, causing my job to crash. 10K is the macOS limit on how many files a single process can have open at once. It seems unreasonable that Spark should hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
> if __name__ == '__main__':
>     spark = pyspark.sql.SparkSession.builder.getOrCreate()
>     df = spark.createDataFrame([
>         Row(a=n)
>         for n in range(500000)
>     ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
>     df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
>     print('partitions:', df.rdd.getNumPartitions())
>     df.explain()
>     df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the problem by adding a {{coalesce(1)}} in the right place, as indicated in the comments above. When I do, Spark spawns no more than 600 open files. The number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(a#0L, 200)
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#3L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(a#3L, 200)
>          +- *Filter isnotnull(a#3L)
>             +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Coalesce 1
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#4L ASC NULLS FIRST], false, 0
>       +- Coalesce 1
>          +- *Filter isnotnull(a#4L)
>             +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large number of distinct keys? If so, how would one mitigate that issue? If not, is this a bug in Spark?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org