Posted to issues@spark.apache.org by "Matthew Anthony (JIRA)" <ji...@apache.org> on 2018/04/12 15:00:00 UTC

[jira] [Updated] (SPARK-23970) pyspark - simple filter/select doesn't use all tasks when coalesce is set

     [ https://issues.apache.org/jira/browse/SPARK-23970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthew Anthony updated SPARK-23970:
------------------------------------
    Description: 
Running in (py)spark 2.2. 

Marking this as PySpark, but have not confirmed whether this is Spark-wide; I've observed it in pyspark which is my preferred API.
{code:java}
df = spark.sql(
"""
select <redacted>
from <inputtbl>
where <conditions>
"""
)
df.coalesce(32).write.parquet(...){code}
The above code uses only 32 tasks to read and process all of the original input data. Compare this with
{code:java}
df = spark.sql(
"""
select <redacted>
from <inputtbl>
where <conditions>
"""
).cache()
df.count()
df.coalesce(32).write.parquet(...){code}
where this will use the full complement of tasks available to the cluster to do the initial filter, with a subsequent shuffle to coalesce and write. The latter execution path is far more efficient, particularly at large volumes where filtering removes most records, and it should be the default. Note that in the real setting where I am running this, I'm operating a 20-node cluster with 16 cores and 56 GB of RAM per machine, and processing well over a TB of raw data in <inputtbl>.
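
For comparison, a minimal sketch of an alternative that avoids the cache()/count() step, assuming an extra shuffle is acceptable. coalesce() is a narrow transformation, so it is folded into the upstream stage and limits the scan and filter to the same number of tasks; repartition() inserts a shuffle boundary instead. The helper name write_filtered and its parameters (path, num_files, full_parallelism) are hypothetical and not part of the original report; df is assumed to be a pyspark.sql.DataFrame.

```python
def write_filtered(df, path, num_files=32, full_parallelism=True):
    """Write df as Parquet, targeting num_files output partitions.

    Hypothetical helper for illustration; df is assumed to be a
    pyspark.sql.DataFrame, and path / num_files are placeholder values.
    """
    if full_parallelism:
        # repartition() adds a shuffle, so the upstream scan and filter keep
        # their own partitioning; only the stage after the shuffle runs with
        # num_files tasks.
        out = df.repartition(num_files)
    else:
        # coalesce() is narrow: it is merged into the upstream stage, so the
        # scan and filter also run with at most num_files tasks.
        out = df.coalesce(num_files)
    out.write.parquet(path)
    return out
```

Calling write_filtered(df, "<output path>") on the DataFrame from the first snippet would take the repartition() branch. Whether the extra shuffle is cheaper than cache() plus count() depends on how much data survives the filter, so this is a trade-off to measure rather than a guaranteed improvement.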

 

  was:
 

Marking this as PySpark, but have not confirmed whether this is Spark-wide; I've observed it in pyspark which is my preferred API.
{code:java}
df = spark.sql(
"""
select <redacted>
from <inputtbl>
where <conditions
"""
)
df.coalesce(32).write.parquet(...){code}
The above code will only attempt to use 32 tasks to read and process all of the original input data. This compares to 
{code:java}
df = spark.sql(
"""
select <redacted>
from <inputtbl>
where <conditions
"""
).cache()
df.count()
df.coalesce(32).write.parquet(...){code}
where this will use the full complement of tasks available to the cluster to do the initial filter, with a subsequent shuffle to coalesce and write. The latter execution path is way more efficient, particularly at large volumes where filtering will remove most records and should be the default. Note that in the real setting in which I am running this, I'm operating a 20 node cluster with 16 cores and 56gb RAM per machine, and processing well over a TB of raw data in <inputtbl>.

 


> pyspark - simple filter/select doesn't use all tasks when coalesce is set
> -------------------------------------------------------------------------
>
>                 Key: SPARK-23970
>                 URL: https://issues.apache.org/jira/browse/SPARK-23970
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.2.0, 2.2.1
>            Reporter: Matthew Anthony
>            Priority: Major
>
> Running in (py)spark 2.2. 
> Marking this as PySpark, but have not confirmed whether this is Spark-wide; I've observed it in pyspark which is my preferred API.
> {code:java}
> df = spark.sql(
> """
> select <redacted>
> from <inputtbl>
> where <conditions>
> """
> )
> df.coalesce(32).write.parquet(...){code}
> The above code uses only 32 tasks to read and process all of the original input data. Compare this with
> {code:java}
> df = spark.sql(
> """
> select <redacted>
> from <inputtbl>
> where <conditions>
> """
> ).cache()
> df.count()
> df.coalesce(32).write.parquet(...){code}
> where this will use the full complement of tasks available to the cluster to do the initial filter, with a subsequent shuffle to coalesce and write. The latter execution path is far more efficient, particularly at large volumes where filtering removes most records, and it should be the default. Note that in the real setting where I am running this, I'm operating a 20-node cluster with 16 cores and 56 GB of RAM per machine, and processing well over a TB of raw data in <inputtbl>.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
