You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by kriskalish <kr...@kalish.net> on 2014/09/15 22:58:27 UTC

Weird aggregation results when reusing objects inside reduceByKey

I have a pretty simple scala spark aggregation job that is summing up number
of occurrences of two types of events. I have run into situations where it
seems to generate bad values that are clearly incorrect after reviewing the
raw data. 

First I have a Record object which I use to do my aggregation: 

class Record (val PrimaryId: Int,
              val SubId: Int,
              var Event1Count: Int,
              var Event2Count: Int) extends Serializable  {
}

Then once I have an RDD I do a reduce by key:

    val allAgr = all.map(x => (s"${x.PrimaryId}-${x.SubId}", x)).reduceByKey
{ (l, r) =>
      l.Event1Count= l.Event1Count+ r.Event1Count
      l.Event2Count= l.Event2Count+ r.Event2Count
      l
    }.map(x => x._2)

The problem is that for some scenarios I get about 16 billion back for
Event1Count, but the value of Event2Count looks fine. If I refactor my
reduce by key function to actually produce a new object, it seems to work:

    val allAgr = all.map(x => (s"${x.PrimaryId}-${x.SubId}", x)).reduceByKey
{ (l, r) =>
      val n = new Record(l.PrimaryId, l.SubId, 0, 0 )
      n.Event1Count= l.Event1Count+ r.Event1Count
      n.Event2Count= l.Event2Count+ r.Event2Count
      n
    }.map(x => x._2)


This second option is clearly the safer way to go since there is no chance
for changing values via reference. However, it doesn't make sense to me that
this should fix it as in map reduce a once a object is reduced, it should
never be reduced again (otherwise double-counting would happen).

I dug into the source a little:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Aggregator.scala


I didn't really see any obvious redflags and admittedly it is beyond my
comprehension.

Any ideas?




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Weird-aggregation-results-when-reusing-objects-inside-reduceByKey-tp14287.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Weird aggregation results when reusing objects inside reduceByKey

Posted by kriskalish <kr...@kalish.net>.
Thanks for the insight, I didn't realize there was internal object reuse
going on. Is this a mechanism of Scala/Java or is this a mechanism of Spark?

I actually just converted the code to use immutable case classes everywhere,
so it will be a little tricky to test foldByKey(). I'll try to get to it
after I upgrade to Spark 1.1.0. 

-Kris



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Weird-aggregation-results-when-reusing-objects-inside-reduceByKey-tp14287p14835.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Weird aggregation results when reusing objects inside reduceByKey

Posted by Sean Owen <so...@cloudera.com>.
It isn't a question of an item being reduced twice, but of when
objects may be reused to represent other items.

I don't think you have a guarantee that you can safely reuse the
objects in this argument, but I'd also be interested if there was a
case where this is guaranteed.

For example I'm guessing this does work if you foldByKey() and supply
your own starting value?

On Mon, Sep 15, 2014 at 9:58 PM, kriskalish <kr...@kalish.net> wrote:
> I have a pretty simple scala spark aggregation job that is summing up number
> of occurrences of two types of events. I have run into situations where it
> seems to generate bad values that are clearly incorrect after reviewing the
> raw data.
>
> First I have a Record object which I use to do my aggregation:
>
> class Record (val PrimaryId: Int,
>               val SubId: Int,
>               var Event1Count: Int,
>               var Event2Count: Int) extends Serializable  {
> }
>
> Then once I have an RDD I do a reduce by key:
>
>     val allAgr = all.map(x => (s"${x.PrimaryId}-${x.SubId}", x)).reduceByKey
> { (l, r) =>
>       l.Event1Count= l.Event1Count+ r.Event1Count
>       l.Event2Count= l.Event2Count+ r.Event2Count
>       l
>     }.map(x => x._2)
>
> The problem is that for some scenarios I get about 16 billion back for
> Event1Count, but the value of Event2Count looks fine. If I refactor my
> reduce by key function to actually produce a new object, it seems to work:
>
>     val allAgr = all.map(x => (s"${x.PrimaryId}-${x.SubId}", x)).reduceByKey
> { (l, r) =>
>       val n = new Record(l.PrimaryId, l.SubId, 0, 0 )
>       n.Event1Count= l.Event1Count+ r.Event1Count
>       n.Event2Count= l.Event2Count+ r.Event2Count
>       n
>     }.map(x => x._2)
>
>
> This second option is clearly the safer way to go since there is no chance
> for changing values via reference. However, it doesn't make sense to me that
> this should fix it as in map reduce a once a object is reduced, it should
> never be reduced again (otherwise double-counting would happen).
>
> I dug into the source a little:
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Aggregator.scala
>
>
> I didn't really see any obvious redflags and admittedly it is beyond my
> comprehension.
>
> Any ideas?
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Weird-aggregation-results-when-reusing-objects-inside-reduceByKey-tp14287.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org