You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Yan Yang <ya...@wealthfront.com> on 2016/02/27 04:41:30 UTC

.cache() changes contents of RDD

Hi

I am pretty new to Spark, and after experimentation on our pipelines. I ran
into this weird issue.

The Scala code is as below:

val input = sc.newAPIHadoopRDD(...)
val rdd = input.map(...)
rdd.cache()
rdd.saveAsTextFile(...)

I found rdd to consist of 80+K identical rows. To be more precise, the
number of rows is right, but all are identical.

The truly weird part is if I remove rdd.cache(), everything works just
fine. I have encountered this issue on a few occasions.

Thanks
Yan

Re: .cache() changes contents of RDD

Posted by Ted Yu <yu...@gmail.com>.
Can you reveal what is done inside the map() ?

Which Spark release are you using ?

Cheers

On Fri, Feb 26, 2016 at 7:41 PM, Yan Yang <ya...@wealthfront.com> wrote:

> Hi
>
> I am pretty new to Spark, and after experimentation on our pipelines. I
> ran into this weird issue.
>
> The Scala code is as below:
>
> val input = sc.newAPIHadoopRDD(...)
> val rdd = input.map(...)
> rdd.cache()
> rdd.saveAsTextFile(...)
>
> I found rdd to consist of 80+K identical rows. To be more precise, the
> number of rows is right, but all are identical.
>
> The truly weird part is if I remove rdd.cache(), everything works just
> fine. I have encountered this issue on a few occasions.
>
> Thanks
> Yan
>
>
>
>
>

Re: .cache() changes contents of RDD

Posted by Igor Berman <ig...@gmail.com>.
are you using avro format by any chance?
there is some formats that need to be "deep"-copy before caching or
aggregating
try something like
val input = sc.newAPIHadoopRDD(...)
val rdd = input.map(deepCopyTransformation).map(...)
rdd.cache()
rdd.saveAsTextFile(...)

where deepCopyTransformation is function that deep copies every object

On 26 February 2016 at 19:41, Yan Yang <ya...@wealthfront.com> wrote:

> Hi
>
> I am pretty new to Spark, and after experimentation on our pipelines. I
> ran into this weird issue.
>
> The Scala code is as below:
>
> val input = sc.newAPIHadoopRDD(...)
> val rdd = input.map(...)
> rdd.cache()
> rdd.saveAsTextFile(...)
>
> I found rdd to consist of 80+K identical rows. To be more precise, the
> number of rows is right, but all are identical.
>
> The truly weird part is if I remove rdd.cache(), everything works just
> fine. I have encountered this issue on a few occasions.
>
> Thanks
> Yan
>
>
>
>
>

Re: .cache() changes contents of RDD

Posted by Sabarish Sasidharan <sa...@manthan.com>.
This is because Hadoop writables are being reused. Just map it to some
custom type and then do further operations including cache() on it.

Regards
Sab
On 27-Feb-2016 9:11 am, "Yan Yang" <ya...@wealthfront.com> wrote:

> Hi
>
> I am pretty new to Spark, and after experimentation on our pipelines. I
> ran into this weird issue.
>
> The Scala code is as below:
>
> val input = sc.newAPIHadoopRDD(...)
> val rdd = input.map(...)
> rdd.cache()
> rdd.saveAsTextFile(...)
>
> I found rdd to consist of 80+K identical rows. To be more precise, the
> number of rows is right, but all are identical.
>
> The truly weird part is if I remove rdd.cache(), everything works just
> fine. I have encountered this issue on a few occasions.
>
> Thanks
> Yan
>
>
>
>
>