Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:02:25 UTC
[jira] [Updated] (SPARK-23512) Complex operations on Dataframe corrupts data
[ https://issues.apache.org/jira/browse/SPARK-23512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-23512:
---------------------------------
Labels: bulk-closed (was: )
> Complex operations on Dataframe corrupts data
> ---------------------------------------------
>
> Key: SPARK-23512
> URL: https://issues.apache.org/jira/browse/SPARK-23512
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.2.1
> Reporter: Nazarii Bardiuk
> Priority: Major
> Labels: bulk-closed
>
> The following code demonstrates a sequence of DataFrame transformations that corrupts data:
> {code}
> from pyspark import SparkContext, SQLContext, Row
> from pyspark.sql import Window
> from pyspark.sql.functions import explode, lit, count, row_number, col, countDistinct
> ss = SQLContext(SparkContext('local', 'pyspark'))
> diffs = ss.createDataFrame([
> Row(id="1", a=["1"], b=["2"], t="2"),
> Row(id="2", a=["2"], b=["1"], t="1"),
> Row(id="3", a=["1"], b=["4", "3"], t="3"),
> Row(id="3", a=["1"], b=["4", "3"], t="4"),
> Row(id="4", a=["1"], b=["4", "3"], t="3"),
> Row(id="4", a=["1"], b=["4", "3"], t="4")
> ])
> a = diffs.select("id", explode("a").alias("l"), "t").withColumn("problem", lit("a"))
> b = diffs.select("id", explode("b").alias("l"), "t").withColumn("problem", lit("b")) \
> .filter(col("t") != col("l"))
> all = a.union(b)
> grouped = all \
> .groupBy("l", "t", "problem").agg(count("id").alias("count")) \
> .withColumn("rn", row_number().over(Window.partitionBy("l", "problem").orderBy(col("count").desc()))) \
> .withColumn("f", (col("rn") < 2) & (col("count") > 1)) \
> .cache() # adding this cache() is the change that broke our test
> keep = grouped.filter("f").select("l", "t", "problem", "count")
> agg = all.join(grouped.filter(~col("f")), ["l", "t", "problem"]) \
> .withColumn("t", lit(None)) \
> .groupBy("l", "t", "problem").agg(countDistinct("id").alias("count"))
> keep.union(agg).show() # corrupts column "problem"
> agg.union(keep).show() # as expected
> {code}
>
> Expected: the "problem" column contains the same data in both unions
> Actual: the "problem" column loses data in keep.union(agg): the rows with l=3, t=4 and l=4, t=3 show "a" instead of "b"
> {code}
> keep.union(agg).show() # corrupts column "problem"
> +---+----+-------+-----+
> | l| t|problem|count|
> +---+----+-------+-----+
> | 3| 4| a| 2|
> | 4| 3| a| 2|
> | 1| 4| a| 2|
> | 1|null| a| 3|
> | 2|null| a| 1|
> +---+----+-------+-----+
> agg.union(keep).show() # as expected
> +---+----+-------+-----+
> | l| t|problem|count|
> +---+----+-------+-----+
> | 1|null| a| 3|
> | 2|null| a| 1|
> | 3| 4| b| 2|
> | 4| 3| b| 2|
> | 1| 4| a| 2|
> +---+----+-------+-----+
> {code}
> Note the cache() call: adding it was the tipping point that broke our code. Without it, the code works as expected.
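A minimal pure-Python sketch (no Spark needed) for cross-checking the report: it re-derives, from the input rows above, what keep.union(agg) should contain, then compares both printed outputs against it as multisets, since the two unions legitimately differ in row order. The helper name `expected_union` and the tie-breaking rule (the larger `t` wins among equal counts, chosen only to reproduce the output shown above, because the query does not specify an order for ties in `row_number()`) are assumptions, not part of the original report.

```python
from collections import Counter, defaultdict

# Input rows from the reproduction, as (id, a, b, t)
DIFFS = [
    ("1", ["1"], ["2"], "2"),
    ("2", ["2"], ["1"], "1"),
    ("3", ["1"], ["4", "3"], "3"),
    ("3", ["1"], ["4", "3"], "4"),
    ("4", ["1"], ["4", "3"], "3"),
    ("4", ["1"], ["4", "3"], "4"),
]

# Rows printed in the report as (l, t, problem, count); "null" becomes None
REPORTED_KEEP_UNION_AGG = [
    ("3", "4", "a", 2), ("4", "3", "a", 2), ("1", "4", "a", 2),
    ("1", None, "a", 3), ("2", None, "a", 1),
]
REPORTED_AGG_UNION_KEEP = [
    ("1", None, "a", 3), ("2", None, "a", 1), ("3", "4", "b", 2),
    ("4", "3", "b", 2), ("1", "4", "a", 2),
]


def expected_union(diffs):
    """Hypothetical helper: rows keep.union(agg) should contain.

    Assumption: ties in row_number() are broken by preferring the larger t,
    only to match the output in the report.
    """
    # explode("a"), and explode("b") followed by the filter t != l
    all_rows = [(i, l, t, "a") for i, a, _, t in diffs for l in a]
    all_rows += [(i, l, t, "b") for i, _, b, t in diffs for l in b if l != t]

    # groupBy("l", "t", "problem").agg(count("id"))
    counts = Counter((l, t, p) for _, l, t, p in all_rows)

    # row_number() over partitionBy("l", "problem").orderBy(count desc),
    # then f = (rn < 2) & (count > 1)
    partitions = defaultdict(list)
    for (l, t, p), c in counts.items():
        partitions[(l, p)].append((t, c))
    flagged = {}
    for (l, p), rows in partitions.items():
        rows.sort(key=lambda tc: (-tc[1], -int(tc[0])))
        for rn, (t, c) in enumerate(rows, start=1):
            flagged[(l, t, p)] = (c, rn < 2 and c > 1)

    keep = [(l, t, p, c) for (l, t, p), (c, f) in flagged.items() if f]

    # inner join with the groups where f is false, set t to null,
    # then groupBy("l", "t", "problem").agg(countDistinct("id"))
    ids = defaultdict(set)
    for i, l, t, p in all_rows:
        if not flagged[(l, t, p)][1]:
            ids[(l, None, p)].add(i)
    agg = [(l, t, p, len(s)) for (l, t, p), s in ids.items()]
    return keep + agg


expected = Counter(expected_union(DIFFS))
print("agg.union(keep) matches:", Counter(REPORTED_AGG_UNION_KEEP) == expected)  # True
print("keep.union(agg) matches:", Counter(REPORTED_KEEP_UNION_AGG) == expected)  # False
print("missing from keep.union(agg):", expected - Counter(REPORTED_KEEP_UNION_AGG))
```

Against a live session the same check could be run on real results with `Counter(tuple(r) for r in keep.union(agg).collect())`, since `pyspark.sql.Row` is a tuple subclass.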
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)