You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Liang-Chi Hsieh (JIRA)" <ji...@apache.org> on 2017/11/20 07:02:00 UTC

[jira] [Comment Edited] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

    [ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16258868#comment-16258868 ] 

Liang-Chi Hsieh edited comment on SPARK-22541 at 11/20/17 7:01 AM:
-------------------------------------------------------------------

Sorry, my previous reply is not completely correct.

This behavior is also related to how PySpark runs python udfs. We can try to show the query plan of the {{df}}:

{code}
...
== Physical Plan ==
*Project [key#0, val1#1L, val2#2L]
+- *Filter (pythonUDF0#21 && pythonUDF1#22)
   +- BatchEvalPython [myfilter1(val1#1L), myfilter2(val2#2L)], [key#0, val1#1L, val2#2L, pythonUDF0#21, pythonUDF1#22]
      +- Scan ExistingRDD[key#0,val1#1L,val2#2L]
{code}

The python udfs are pushed down to a special physical operator {{BatchEvalPython}} to execute. Due to the implementation details, the pushed down python udfs are not conditional. That's said they are evaluated on all rows, even logically in the original query they are only evaluated on part of rows by using some conditional expressions such as when or if. The issue you found here is also the same reason.






was (Author: viirya):
Sorry, my previous reply is not completely correct.

This behavior is related to how PySpark runs python udfs. We can try to show the query plan of the {{df}}:

{code}
...
== Physical Plan ==
*Project [key#0, val1#1L, val2#2L]
+- *Filter (pythonUDF0#21 && pythonUDF1#22)
   +- BatchEvalPython [myfilter1(val1#1L), myfilter2(val2#2L)], [key#0, val1#1L, val2#2L, pythonUDF0#21, pythonUDF1#22]
      +- Scan ExistingRDD[key#0,val1#1L,val2#2L]
{code}

The python udfs are pushed down to a special physical operator {{BatchEvalPython}} to execute. Due to the implementation details, the pushed down python udfs are not conditional. That's said they are evaluated on all rows, even logically in the original query they are only evaluated on part of rows by using some conditional expressions such as when or if. The issue you found here is also the same reason.





> Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22541
>                 URL: https://issues.apache.org/jira/browse/SPARK-22541
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.2.0
>         Environment: pyspark 2.2.0, ubuntu
>            Reporter: Janne K. Olesen
>
> I'm using udf filters and accumulators to keep track of filtered rows in dataframes.
> If I'm applying multiple filters one after the other, they seem to be executed in parallel, not in sequence, which messes with the accumulators i'm using to keep track of filtered data. 
> {code:title=example.py|borderStyle=solid}
> from pyspark.sql.functions import udf, col
> from pyspark.sql.types import BooleanType
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
> df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", "val1", "val2"])
> def __myfilter(val, acc):
>     if val < 2:
>         return True
>     else:
>         acc.add(1)
>     return False
> acc1 = sc.accumulator(0)
> acc2 = sc.accumulator(0)
> def myfilter1(val):
>     return __myfilter(val, acc1)
> def myfilter2(val):
>     return __myfilter(val, acc2)
> my_udf1 = udf(myfilter1, BooleanType())
> my_udf2 = udf(myfilter2, BooleanType())
> df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # |  b|   2|   2|
> # |  c|   3|   3|
> # +---+----+----+
> df = df.filter(my_udf1(col("val1")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df = df.filter(my_udf2(col("val2")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df.show()
> print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
> print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org