Posted to issues@spark.apache.org by "Yan (JIRA)" <ji...@apache.org> on 2016/04/09 02:45:25 UTC

[jira] [Commented] (SPARK-11368) Spark shouldn't scan all partitions when using Python UDF and filter over partitioned column is given

    [ https://issues.apache.org/jira/browse/SPARK-11368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15233231#comment-15233231 ] 

Yan commented on SPARK-11368:
-----------------------------

The issue seems to be gone with the latest master code (for 2.0):

sqlCtx.sql('select count(*) from df where id >= 990 and multiply2(value) > 200000').explain(True):

WholeStageCodegen
:  +- TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count(1)#14L])
:     +- INPUT
+- Exchange SinglePartition, None
   +- WholeStageCodegen
      :  +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#17L])
      :     +- Project
      :        +- Project [value#0L,id#1]
      :           +- Filter (cast(pythonUDF0#18 as double) > 200000.0)
      :              +- INPUT
      +- !BatchPythonEvaluation [multiply2(value#0L)], [value#0L,id#1,pythonUDF0#18]
         +- WholeStageCodegen
            :  +- BatchedScan HadoopFiles[value#0L,id#1] Format: ParquetFormat, PushedFilters: [], ReadSchema: struct<value:bigint>

while 1.6 still had the issue as reported.
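
For anyone who wants to re-check this on their own build, here is a minimal sketch that puts the reporter's reproduction steps together (the multiply2 UDF, the df temp table and the /mnt/mfs/udf_test path all come from the description below; adjust the path for your environment). Compare the two plans: when pruning works, the partition filter (id >= 990) is handled at the scan instead of forcing a read of every partition.

{code}
# Minimal verification sketch, assuming the partitioned dataset written in the
# description below already exists at /mnt/mfs/udf_test.
sqlCtx.registerFunction('multiply2', lambda x: x * 2)
df = sqlCtx.read.parquet('/mnt/mfs/udf_test')
df.registerTempTable('df')

# Baseline: only column filters, no Python UDF -- partition pruning expected.
sqlCtx.sql('select count(*) from df where id >= 990 and value > 100000').explain(True)

# Same query with the Python UDF in the predicate -- on 1.6 this plan scans all
# partitions, while on the 2.0 master the issue appears to be gone.
sqlCtx.sql('select count(*) from df where id >= 990 and multiply2(value) > 200000').explain(True)
{code}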

> Spark shouldn't scan all partitions when using Python UDF and filter over partitioned column is given
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-11368
>                 URL: https://issues.apache.org/jira/browse/SPARK-11368
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>            Reporter: Maciej BryƄski
>            Priority: Critical
>
> Hi,
> I think this is a huge performance bug.
> I created a Parquet file partitioned by a column.
> Then I ran a query with a filter over the partition column and a filter with a UDF.
> The result is that all partitions are scanned.
> Sample data:
> {code}
> from pyspark.sql import Row
> rdd = sc.parallelize(range(0, 10000000)).map(lambda x: Row(id=x % 1000, value=x)).repartition(1)
> df = sqlCtx.createDataFrame(rdd)
> df.write.mode("overwrite").partitionBy('id').parquet('/mnt/mfs/udf_test')
> df = sqlCtx.read.parquet('/mnt/mfs/udf_test')
> df.registerTempTable('df')
> {code}
> Then queries:
> Without the UDF, Spark reads only 10 partitions:
> {code}
> start = time.time()
> sqlCtx.sql('select * from df where id >= 990 and value > 100000').count()
> print(time.time() - start)
> 0.9993703365325928
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#22L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#25L])
>    Project
>     Filter (value#5L > 100000)
>      Scan ParquetRelation[file:/mnt/mfs/udf_test][value#5L]
> {code}
> With the UDF, Spark reads all the partitions:
> {code}
> sqlCtx.registerFunction('multiply2', lambda x: x * 2)
> start = time.time()
> sqlCtx.sql('select * from df where id >= 990 and multiply2(value) > 200000').count()
> print(time.time() - start)
> 13.0826096534729
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#34L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#37L])
>    TungstenProject
>     Filter ((id#6 > 990) && (cast(pythonUDF#33 as double) > 200000.0))
>      !BatchPythonEvaluation PythonUDF#multiply2(value#5L), [value#5L,id#6,pythonUDF#33]
>       Scan ParquetRelation[file:/mnt/mfs/udf_test][value#5L,id#6]
> {code}


