Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:16:43 UTC

[jira] [Resolved] (SPARK-22828) Data corruption happens when same RDD being repeatedly used as parent RDD of a custom RDD which reads each parent RDD in concurrent threads

     [ https://issues.apache.org/jira/browse/SPARK-22828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-22828.
----------------------------------
    Resolution: Incomplete

> Data corruption happens when same RDD being repeatedly used as parent RDD of a custom RDD which reads each parent RDD in concurrent threads
> -------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22828
>                 URL: https://issues.apache.org/jira/browse/SPARK-22828
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: Yongqin Xiao
>            Priority: Major
>              Labels: bulk-closed
>
> I have defined a custom RDD that:
> - computes its output from the input data using our traditional data transformation code. As an extreme example, this custom RDD can behave as a union, a joiner, etc.
> - takes one or more parent RDDs as input, where some or all of the parent RDDs can be the same RDD
> - reads the input parent RDDs in concurrent threads (i.e. reader threads)
> - computes the data in one or more transformation threads that run concurrently with the reader threads
> - ...
> In certain cases, we see {color:red}data being corrupted{color} when our reader threads read it in. The corruption happens when all of the following conditions are met:
> - Multiple parent RDDs of the custom RDD are actually the same RDD, e.g. a same-source union.
> {code:java}
> // The Scala code is roughly like this:
> Rdd rdd1 = ...
> Rdd customRdd = new MyRdd(rdd1, rdd1, ...)
> {code}
> - The parent RDD is not the result of repartitioning or sorting within partitions.
> - The shared parent RDD is not persisted.
> - spark.sql.shuffle.partitions is set to 1. We also saw corruption when it was set to a small value such as 2, which is also the source partition count.
> This data corruption happens even when the number of executors and the number of cores are both set to 1, meaning the corruption is not related to multiple partitions running concurrently.
> Data corruption doesn't happen when any one of the following conditions is met:
> 1. Instead of passing the same parent RDD as multiple inputs to my custom RDD, we do a select (of all columns) on that parent RDD and use a different select RDD for each input.
> {code:java}
> // The Scala code is like this:
> Rdd rdd1 = ...
> Rdd customRdd = new MyRdd(rdd1.select($1,$2,...), rdd1.select($1, $2), ...)
> {code}
> 2. We persist the parent RDD.
> {code:java}
> Rdd rdd1 = ...
> rdd1.persist(...)
> Rdd customRdd = new MyRdd(rdd1, rdd1, ...)
> {code}
> 3. We use a single thread to read the parent RDDs in the custom RDD implementation.
> 4. We use our default value (100) for spark.sql.shuffle.partitions.
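
The reader-thread design described in the report (several threads each draining one parent, with a separate step consuming what they read) can be sketched roughly as follows. This is only an illustration in plain Java using the JDK, not Spark code and not the reporter's implementation: the class ReaderThreadsSketch, the method unionConcurrently, and the END sentinel are hypothetical names, each Iterable stands in for one parent partition, and the "transformation" is simply a union. It does not reproduce the corruption; it only shows the shape of the setup in which the same parent is passed in twice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the custom RDD's compute step; not Spark code.
class ReaderThreadsSketch {
    // Sentinel a reader thread enqueues once its parent is exhausted.
    private static final Object END = new Object();

    // Reads every parent in its own thread and unions the records.
    // Iterating an Iterable gives each reader an independent cursor,
    // even when the same parent object appears more than once.
    static List<Integer> unionConcurrently(List<? extends Iterable<Integer>> parents)
            throws InterruptedException {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        List<Thread> readers = new ArrayList<>();
        for (Iterable<Integer> parent : parents) {
            Thread reader = new Thread(() -> {
                // Integer is immutable, so handing it to another thread is safe;
                // a mutable record type would need a copy before enqueueing.
                for (Integer record : parent) {
                    queue.add(record);
                }
                queue.add(END);
            });
            readers.add(reader);
            reader.start();
        }
        // "Transformation" step: drain the queue until every reader has finished.
        List<Integer> out = new ArrayList<>();
        int finished = 0;
        while (finished < parents.size()) {
            Object item = queue.take();
            if (item == END) {
                finished++;
            } else {
                out.add((Integer) item);
            }
        }
        for (Thread reader : readers) {
            reader.join();
        }
        // Sort so the result does not depend on thread scheduling.
        Collections.sort(out);
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> rdd1 = Arrays.asList(1, 2, 3);
        // Same parent passed twice, as in the same-source union case.
        System.out.println(unionConcurrently(Arrays.asList(rdd1, rdd1)));
    }
}
```

In this sketch a correct same-source union contains every input record exactly twice, which is the kind of invariant one could check against the real custom RDD's output when trying to reproduce the issue.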
