You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Liang-Chi Hsieh (JIRA)" <ji...@apache.org> on 2017/11/17 04:07:00 UTC
[jira] [Commented] (SPARK-22541) Dataframes: applying multiple
filters one after another using udfs and accumulators results in faulty
accumulators
[ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16256441#comment-16256441 ]
Liang-Chi Hsieh commented on SPARK-22541:
-----------------------------------------
Due to query optimization, two filters are combined and actually only one filter operation is performed. The filtered results are correct. So I don't think this is a bug.
> Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators
> ------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-22541
> URL: https://issues.apache.org/jira/browse/SPARK-22541
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.2.0
> Environment: pyspark 2.2.0, ubuntu
> Reporter: Janne K. Olesen
>
> I'm using udf filters and accumulators to keep track of filtered rows in dataframes.
> If I'm applying multiple filters one after the other, they seem to be executed in parallel, not in sequence, which messes with the accumulators i'm using to keep track of filtered data.
> {code:title=example.py|borderStyle=solid}
> from pyspark.sql.functions import udf, col
> from pyspark.sql.types import BooleanType
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
> df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", "val1", "val2"])
> def __myfilter(val, acc):
> if val < 2:
> return True
> else:
> acc.add(1)
> return False
> acc1 = sc.accumulator(0)
> acc2 = sc.accumulator(0)
> def myfilter1(val):
> return __myfilter(val, acc1)
> def myfilter2(val):
> return __myfilter(val, acc2)
> my_udf1 = udf(myfilter1, BooleanType())
> my_udf2 = udf(myfilter2, BooleanType())
> df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # | a| 1| 1|
> # | b| 2| 2|
> # | c| 3| 3|
> # +---+----+----+
> df = df.filter(my_udf1(col("val1")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # | a| 1| 1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df = df.filter(my_udf2(col("val2")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # | a| 1| 1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df.show()
> print("acc1: %s" % acc1.value) # expected 2, is 2 OK
> print("acc2: %s" % acc2.value) # expected 0, is 2 !!!
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org