Posted to issues@spark.apache.org by "Yan (JIRA)" <ji...@apache.org> on 2016/04/09 02:45:25 UTC
[jira] [Commented] (SPARK-11368) Spark shouldn't scan all partitions when using Python UDF and filter over partitioned column is given
[ https://issues.apache.org/jira/browse/SPARK-11368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15233231#comment-15233231 ]
Yan commented on SPARK-11368:
-----------------------------
The issue seems to be gone with the latest master code (for 2.0):
sqlCtx.sql('select count(*) from df where id >= 990 and multiply2(value) > 200000').explain(True):
WholeStageCodegen
: +- TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count(1)#14L])
: +- INPUT
+- Exchange SinglePartition, None
+- WholeStageCodegen
: +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#17L])
: +- Project
: +- Project [value#0L,id#1]
: +- Filter (cast(pythonUDF0#18 as double) > 200000.0)
: +- INPUT
+- !BatchPythonEvaluation [multiply2(value#0L)], [value#0L,id#1,pythonUDF0#18]
+- WholeStageCodegen
: +- BatchedScan HadoopFiles[value#0L,id#1] Format: ParquetFormat, PushedFilters: [], ReadSchema: struct<value:bigint>
In contrast, 1.6 still has the issue as reported.
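In the 2.0 plan above, the Filter node keeps only the UDF predicate and the scan's ReadSchema contains only `value`. This suggests that `id >= 990` is consumed by partition pruning instead of being evaluated per row. The sketch below is a toy pure-Python model of that idea, under simplifying assumptions. Each conjunct of the WHERE clause records the columns it references. Conjuncts that touch only partition columns select `id=<n>` directories. Everything else, including the Python UDF, runs on the rows that survive. `Predicate`, `split_conjuncts` and `prune_partitions` are hypothetical names, and this is not how Catalyst actually implements it.

```python
# Toy model of partition pruning (hypothetical names; not Spark/Catalyst code).
from dataclasses import dataclass
from typing import Callable, Dict, List, Set, Tuple

Record = Dict[str, int]


@dataclass
class Predicate:
    text: str                        # readable form of one conjunct
    columns: Set[str]                # columns the conjunct references
    fn: Callable[[Record], bool]     # evaluates the conjunct on a record


def split_conjuncts(preds: List[Predicate],
                    partition_cols: Set[str]) -> Tuple[List[Predicate], List[Predicate]]:
    """Separate conjuncts that reference only partition columns from the rest."""
    partition_filters = [p for p in preds if p.columns <= partition_cols]
    data_filters = [p for p in preds if not p.columns <= partition_cols]
    return partition_filters, data_filters


def prune_partitions(partitions: List[Record],
                     partition_filters: List[Predicate]) -> List[Record]:
    """Keep only partition directories whose values satisfy every partition filter."""
    return [part for part in partitions
            if all(p.fn(part) for p in partition_filters)]


def multiply2(x: int) -> int:
    return x * 2


# WHERE id >= 990 AND multiply2(value) > 200000, as a list of conjuncts
where = [
    Predicate("id >= 990", {"id"}, lambda r: r["id"] >= 990),
    Predicate("multiply2(value) > 200000", {"value"},
              lambda r: multiply2(r["value"]) > 200000),
]

partition_filters, data_filters = split_conjuncts(where, {"id"})
partitions = [{"id": i} for i in range(1000)]   # models id=0 ... id=999 directories
kept = prune_partitions(partitions, partition_filters)

print(len(kept))                         # 10
print([p.text for p in data_filters])    # ['multiply2(value) > 200000']
```

In this model, only the UDF conjunct is left for per-row evaluation, which matches the single `Filter (cast(pythonUDF0#18 as double) > 200000.0)` in the 2.0 plan.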
> Spark shouldn't scan all partitions when using Python UDF and filter over partitioned column is given
> -----------------------------------------------------------------------------------------------------
>
> Key: SPARK-11368
> URL: https://issues.apache.org/jira/browse/SPARK-11368
> Project: Spark
> Issue Type: Bug
> Components: PySpark, SQL
> Reporter: Maciej Bryński
> Priority: Critical
>
> Hi,
> I think this is a huge performance bug.
> I created a Parquet file partitioned by a column.
> Then I made a query with a filter over the partition column and a filter with a UDF.
> The result is that all partitions are scanned.
> Sample data:
> {code}
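> from pyspark.sql import Row
> # assumes sc and sqlCtx already exist, e.g. in the PySpark shell
> # 10M rows; id = x % 1000 gives 1000 distinct partition values
> rdd = sc.parallelize(range(0, 10000000)).map(lambda x: Row(id=x % 1000, value=x)).repartition(1)
> df = sqlCtx.createDataFrame(rdd)
> # partitionBy('id') writes one id=<value> subdirectory per distinct id
> df.write.mode("overwrite").partitionBy('id').parquet('/mnt/mfs/udf_test')
> df = sqlCtx.read.parquet('/mnt/mfs/udf_test')
> df.registerTempTable('df')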
> {code}
> Then the queries.
> Without the UDF, Spark reads only 10 partitions:
> {code}
> import time
> start = time.time()
> sqlCtx.sql('select * from df where id >= 990 and value > 100000').count()
> print(time.time() - start)
> 0.9993703365325928
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#22L])
> TungstenExchange SinglePartition
> TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#25L])
> Project
> Filter (value#5L > 100000)
> Scan ParquetRelation[file:/mnt/mfs/udf_test][value#5L]
> {code}
> With the UDF, Spark reads all the partitions:
> {code}
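> # registerFunction defaults to returnType=StringType(), hence the cast to double in the plan below
> sqlCtx.registerFunction('multiply2', lambda x: x * 2)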
> start = time.time()
> sqlCtx.sql('select * from df where id >= 990 and multiply2(value) > 200000').count()
> print(time.time() - start)
> 13.0826096534729
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#34L])
> TungstenExchange SinglePartition
> TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#37L])
> TungstenProject
> Filter ((id#6 > 990) && (cast(pythonUDF#33 as double) > 200000.0))
> !BatchPythonEvaluation PythonUDF#multiply2(value#5L), [value#5L,id#6,pythonUDF#33]
> Scan ParquetRelation[file:/mnt/mfs/udf_test][value#5L,id#6]
> {code}
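The 1.6 plan puts `!BatchPythonEvaluation` directly on top of the full `Scan`, with the combined Filter above it. As a result, every row of every partition is passed to the Python UDF before `id` is ever checked. A back-of-the-envelope count from the sample data (`x` in `range(0, 10000000)`, `id = x % 1000`) shows the scale of the difference. The helper names are hypothetical, and the model ignores Parquet decoding and Python serialization costs. It only counts rows reaching the UDF, so it does not predict the reported 0.999 s vs 13.08 s timings.

```python
# Back-of-the-envelope count of rows sent to the Python UDF (hypothetical helpers).
N_ROWS = 10_000_000      # sc.parallelize(range(0, 10000000))
N_PARTITIONS = 1000      # id = x % 1000


def rows_in_partition(pid: int) -> int:
    """Number of x in range(N_ROWS) with x % N_PARTITIONS == pid."""
    return len(range(pid, N_ROWS, N_PARTITIONS))


def udf_input_rows(pruned: bool) -> int:
    """Rows reaching BatchPythonEvaluation, with or without pruning on id >= 990."""
    partitions = range(990, N_PARTITIONS) if pruned else range(N_PARTITIONS)
    return sum(rows_in_partition(pid) for pid in partitions)


print(rows_in_partition(990))         # 10000
print(udf_input_rows(pruned=False))   # 10000000 (1.6 plan: full scan)
print(udf_input_rows(pruned=True))    # 100000 (only id=990 ... id=999)
```

So without pruning, the UDF sees 100 times as many rows as it would if only the 10 matching partitions were read.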
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)