You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Kazuaki Ishizaki (JIRA)" <ji...@apache.org> on 2018/02/25 17:54:00 UTC

[jira] [Commented] (SPARK-23512) Complex operations on Dataframe corrupts data

    [ https://issues.apache.org/jira/browse/SPARK-23512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16376171#comment-16376171 ] 

Kazuaki Ishizaki commented on SPARK-23512:
------------------------------------------

This problem can be reproduced with master using the following scala code.

{code:java}
  test("SPARK-23512") {
    val diffs = Seq(
      ("1", Array("1"), Array("2"), "2"),
      ("2", Array("2"), Array("1"), "1"),
      ("3", Array("1"), Array("4", "3"), "3"),
      ("3", Array("1"), Array("4", "3"), "4"),
      ("4", Array("1"), Array("4", "3"), "3"),
      ("4", Array("1"), Array("4", "3"), "4")
    ).toDF("id", "a", "b", "t")

    val a = diffs.select($"id", explode($"a").alias("l"), $"t").withColumn("problem", lit("a"))
    val b = diffs.select($"id", explode($"b").alias("l"), $"t").withColumn("problem", lit("b"))
      .filter("t != l")

    val all = a.union(b)

    val grouped = all
      .groupBy("l", "t", "problem").agg(count("id").alias("count"))
      .withColumn("rn", row_number().over(
        org.apache.spark.sql.expressions.Window.partitionBy($"l", $"problem")
          .orderBy(col("count").desc)))
      .withColumn("f", (col("rn") < 2) and (col("count") > 1))

    val keep = grouped.filter("f").select("l", "t", "problem", "count")

    val agg = all.join(grouped.filter("f == false"), Seq("l", "t", "problem"))
      .withColumn("t", lit(null))
      .groupBy("l", "t", "problem").agg(countDistinct("id").alias("count"))

    keep.union(agg).show() // as expected

    val groupedCache = grouped.cache
    val keepc = groupedCache.filter("f").select("l", "t", "problem", "count")
    keepc.union(agg).show() // corrupts column "problem"

    //    agg.union(keep).show() // as expected
  }
{code}

{code}
+---+----+-------+-----+
|  l|   t|problem|count|
+---+----+-------+-----+
|  1|   4|      a|    2|
|  3|   4|      b|    2|
|  4|   3|      b|    2|
|  1|null|      a|    3|
|  2|null|      a|    1|
+---+----+-------+-----+

+---+----+-------+-----+
|  l|   t|problem|count|
+---+----+-------+-----+
|  1|   4|      a|    2|
|  3|   4|      a|    2|
|  4|   3|      a|    2|
|  1|null|      a|    3|
|  2|null|      a|    1|
+---+----+-------+-----+
{code}
 

> Complex operations on Dataframe corrupts data
> ---------------------------------------------
>
>                 Key: SPARK-23512
>                 URL: https://issues.apache.org/jira/browse/SPARK-23512
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.2.1
>            Reporter: Nazarii Bardiuk
>            Priority: Major
>
> Next code demonstrates sequence of transformations for a DataFrame that corrupts data
> {code}
> from pyspark import SparkContext, SQLContext, Row
> from pyspark.sql import Window
> from pyspark.sql.functions import explode, lit, count, row_number, col, countDistinct
> ss = SQLContext(SparkContext('local', 'pyspark'))
> diffs = ss.createDataFrame([
>     Row(id="1", a=["1"], b=["2"], t="2"),
>     Row(id="2", a=["2"], b=["1"], t="1"),
>     Row(id="3", a=["1"], b=["4", "3"], t="3"),
>     Row(id="3", a=["1"], b=["4", "3"], t="4"),
>     Row(id="4", a=["1"], b=["4", "3"], t="3"),
>     Row(id="4", a=["1"], b=["4", "3"], t="4")
> ])
> a = diffs.select("id", explode("a").alias("l"), "t").withColumn("problem", lit("a"))
> b = diffs.select("id", explode("b").alias("l"), "t").withColumn("problem", lit("b")) \
>     .filter(col("t") != col("l"))
> all = a.union(b)
> grouped = all \
>     .groupBy("l", "t", "problem").agg(count("id").alias("count")) \
>     .withColumn("rn", row_number().over(Window.partitionBy("l", "problem").orderBy(col("count").desc()))) \
>     .withColumn("f", (col("rn") < 2) & (col("count") > 1)) \
>     .cache()  # the change that broke test
> keep = grouped.filter("f").select("l", "t", "problem", "count")
> agg = all.join(grouped.filter(~col("f")), ["l", "t", "problem"]) \
>     .withColumn("t", lit(None)) \
>     .groupBy("l", "t", "problem").agg(countDistinct("id").alias("count"))
> keep.union(agg).show() # corrupts column "problem"
> agg.union(keep).show() # as expected
> {code}
>  
> Expected: data in "problem" column of both unions is the same
>  Actual: "problem" column looses data
> {code}
> keep.union(agg).show() # corrupts column "problem"
> +---+----+-------+-----+                                                        
> |  l|   t|problem|count|
> +---+----+-------+-----+
> |  3|   4|      a|    2|
> |  4|   3|      a|    2|
> |  1|   4|      a|    2|
> |  1|null|      a|    3|
> |  2|null|      a|    1|
> +---+----+-------+-----+
> agg.union(keep).show() # as expected
> +---+----+-------+-----+                                                        
> |  l|   t|problem|count|
> +---+----+-------+-----+
> |  1|null|      a|    3|
> |  2|null|      a|    1|
> |  3|   4|      b|    2|
> |  4|   3|      b|    2|
> |  1|   4|      a|    2|
> +---+----+-------+-----+
> {code}
> Note a cache() statement that was a tipping point that broke our code, without it works as expected



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org