Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2021/04/25 15:12:00 UTC

[jira] [Assigned] (SPARK-35213) Corrupt DataFrame for certain withField patterns

     [ https://issues.apache.org/jira/browse/SPARK-35213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-35213:
------------------------------------

    Assignee:     (was: Apache Spark)

> Corrupt DataFrame for certain withField patterns
> ------------------------------------------------
>
>                 Key: SPARK-35213
>                 URL: https://issues.apache.org/jira/browse/SPARK-35213
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 3.1.1
>            Reporter: Adam Binford
>            Priority: Major
>
> We encountered a very strange bug in production while using withField heavily with Spark 3.1.1. Jobs were dying with JVM crashes (such as jshort_disjoint_arraycopy during a copyMemory call) and occasional NegativeArraySizeException errors. We eventually found a workaround by ordering our withField calls in a certain way, and I was able to create some minimal examples that reproduce similarly broken behavior.
> It seems to stem from the optimizations added in [https://github.com/apache/spark/pull/29812]. Because the same optimization was added both as an analyzer rule and as an optimizer rule, this issue can crop up in two different ways: once at analysis time and once at runtime.
> While these examples might seem odd, they reflect a helper function we've written that can create columns at arbitrary nested paths even if the intermediate fields don't exist yet.
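> For intuition, the intended semantics of such a helper are the same as writing a value at a dot-separated path in nested dicts, creating missing intermediate levels on the way down. This is a hypothetical plain-Python analogy, not the reporter's actual PySpark helper:

```python
def set_nested(d, path, value):
    """Write `value` at dot-separated `path` in nested dicts, creating
    intermediate dicts that don't exist yet (analogous to creating empty
    structs with withField before writing leaf fields)."""
    keys = path.split('.')
    for k in keys[:-1]:
        d = d.setdefault(k, {})  # create the intermediate level if absent
    d[keys[-1]] = value

data = {}
for p, v in [('a.aa', 'aa'), ('b.ba', 'ba'), ('a.ab', 'ab'),
             ('b.bb', 'bb'), ('a.ac', 'ac')]:
    set_nested(data, p, v)
# data == {'a': {'aa': 'aa', 'ab': 'ab', 'ac': 'ac'},
#          'b': {'ba': 'ba', 'bb': 'bb'}}
```

> The examples below interleave updates to the `a` and `b` sub-structs in exactly this way, which is what triggers the bug.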
> Example of what I assume is an issue during analysis:
> {code:java}
> import pyspark.sql.functions as F 
> df = spark.range(1).withColumn('data', F.struct()
>     .withField('a', F.struct())
>     .withField('b', F.struct())
>     .withField('a.aa', F.lit('aa'))
>     .withField('b.ba', F.lit('ba'))
>     .withField('a.ab', F.lit('ab'))
>     .withField('b.bb', F.lit('bb'))
>     .withField('a.ac', F.lit('ac'))
> )
> df.printSchema(){code}
> Output schema:
> {code:java}
> root
>  |-- id: long (nullable = false)
>  |-- data: struct (nullable = false)
>  | |-- b: struct (nullable = false)
>  | | |-- aa: string (nullable = false)
>  | | |-- ab: string (nullable = false)
>  | | |-- bb: string (nullable = false)
>  | |-- a: struct (nullable = false)
>  | | |-- aa: string (nullable = false)
>  | | |-- ab: string (nullable = false)
>  | | |-- ac: string (nullable = false){code}
> And an example of a runtime data issue:
> {code:java}
> df = (spark.range(1)
>  .withColumn('data', F.struct()
>    .withField('a', F.struct().withField('aa', F.lit('aa')))
>    .withField('b', F.struct().withField('ba', F.lit('ba')))
>  )
>  .withColumn('data', F.col('data').withField('b.bb', F.lit('bb')))
>  .withColumn('data', F.col('data').withField('a.ab', F.lit('ab')))
> )
> df.printSchema()
> df.groupBy('data.a.aa', 'data.a.ab', 'data.b.ba', 'data.b.bb').count().show()
> {code}
>  Output:
> {code:java}
> root
>  |-- id: long (nullable = false)
>  |-- data: struct (nullable = false)
>  | |-- a: struct (nullable = false)
>  | | |-- aa: string (nullable = false)
>  | | |-- ab: string (nullable = false)
>  | |-- b: struct (nullable = false)
>  | | |-- ba: string (nullable = false)
>  | | |-- bb: string (nullable = false)
> +---+---+---+---+-----+
> | aa| ab| ba| bb|count|
> +---+---+---+---+-----+
> | ba| bb| aa| ab|    1|
> +---+---+---+---+-----+
> {code}
> The columns have the wrong data in them, even though the schema is correct. Additionally, if you add another nested field, you get an exception:
> {code:java}
> df = (spark.range(1)
>  .withColumn('data', F.struct()
>    .withField('a', F.struct().withField('aa', F.lit('aa')))
>    .withField('b', F.struct().withField('ba', F.lit('ba')))
>  )
>  .withColumn('data', F.col('data').withField('a.ab', F.lit('ab')))
>  .withColumn('data', F.col('data').withField('b.bb', F.lit('bb')))
>  .withColumn('data', F.col('data').withField('a.ac', F.lit('ac')))
> )
> df.groupBy('data.a.aa', 'data.a.ab', 'data.a.ac', 'data.b.ba', 'data.b.bb').count().show()
> java.lang.ArrayIndexOutOfBoundsException: 2
>   at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.genericGet(rows.scala:201)
>   at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getAs(rows.scala:35)
>   at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46)
>   at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46)
>   at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doConsume_0$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> {code}
>  But if you reorder the withField expressions, you get correct behavior:
> {code:java}
> df = (spark.range(1)
>  .withColumn('data', F.struct()
>    .withField('a', F.struct().withField('aa', F.lit('aa')))
>    .withField('b', F.struct().withField('ba', F.lit('ba')))
>  )
>  .withColumn('data', F.col('data').withField('a.ab', F.lit('ab')))
>  .withColumn('data', F.col('data').withField('a.ac', F.lit('ac')))
>  .withColumn('data', F.col('data').withField('b.bb', F.lit('bb')))
> )
> df.groupBy('data.a.aa', 'data.a.ab', 'data.a.ac', 'data.b.ba', 'data.b.bb').count().show()
> +---+---+---+---+---+-----+
> | aa| ab| ac| ba| bb|count|
> +---+---+---+---+---+-----+
> | aa| ab| ac| ba| bb|    1|
> +---+---+---+---+---+-----+
> {code}
> I think this has to do with the double ".reverse" used to dedupe expressions in OptimizeUpdateFields. I'm working on a PR to fix these issues.
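> For intuition only, here is a plain-Python sketch of a last-wins dedupe done with two reversals (this is not the actual Scala code in OptimizeUpdateFields). Note how it moves a re-assigned field relative to fields it originally preceded, which mirrors the swapped field order seen in the corrupted schema above:

```python
def dedupe_last_wins(assignments):
    """Keep only the last assignment per field name: reverse the list,
    take the first occurrence of each name, then reverse back."""
    seen = set()
    kept = []
    for name, value in reversed(assignments):
        if name not in seen:
            seen.add(name)
            kept.append((name, value))
    return list(reversed(kept))

# 'a' is assigned first but also re-assigned last, so after deduping it
# ends up *after* 'b' -- field order no longer matches first appearance.
print(dedupe_last_wins([('a', 1), ('b', 2), ('a', 3)]))
# -> [('b', 2), ('a', 3)]
```

> If the surrounding code assumes field order is preserved while the deduped expressions are not, the schema and the underlying data can disagree, which would explain both the mis-assigned values and the ArrayIndexOutOfBoundsException.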



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org