You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Liang-Chi Hsieh (JIRA)" <ji...@apache.org> on 2017/11/17 04:07:00 UTC

[jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

    [ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16256441#comment-16256441 ] 

Liang-Chi Hsieh commented on SPARK-22541:
-----------------------------------------

Due to query optimization, two filters are combined and actually only one filter operation is performed. The filtered results are correct. So I don't think this is a bug.

> Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22541
>                 URL: https://issues.apache.org/jira/browse/SPARK-22541
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.2.0
>         Environment: pyspark 2.2.0, ubuntu
>            Reporter: Janne K. Olesen
>
> I'm using udf filters and accumulators to keep track of filtered rows in dataframes.
> If I'm applying multiple filters one after the other, they seem to be executed in parallel, not in sequence, which messes with the accumulators i'm using to keep track of filtered data. 
> {code:title=example.py|borderStyle=solid}
> from pyspark.sql.functions import udf, col
> from pyspark.sql.types import BooleanType
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
> df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", "val1", "val2"])
> def __myfilter(val, acc):
>     if val < 2:
>         return True
>     else:
>         acc.add(1)
>     return False
> acc1 = sc.accumulator(0)
> acc2 = sc.accumulator(0)
> def myfilter1(val):
>     return __myfilter(val, acc1)
> def myfilter2(val):
>     return __myfilter(val, acc2)
> my_udf1 = udf(myfilter1, BooleanType())
> my_udf2 = udf(myfilter2, BooleanType())
> df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # |  b|   2|   2|
> # |  c|   3|   3|
> # +---+----+----+
> df = df.filter(my_udf1(col("val1")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df = df.filter(my_udf2(col("val2")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df.show()
> print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
> print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org