Posted to user@spark.apache.org by Daniel Imberman <da...@gmail.com> on 2016/02/26 21:29:02 UTC

Attempting to aggregate multiple values

Hi all,

So over the past few days I've been attempting to create a function that
takes an RDD[U], and creates three MMaps. I've been attempting to aggregate
these values but I'm running into a major issue.

When I initially tried to use a separate aggregator for each map, I noticed
a significant slowdown because I was running three aggregates.
To get around this, I created one aggregator that takes all three
values as a tuple and works something like this:

val zeroValue: (A, B, C) = ??? // (accum1.zero, accum2.zero, accum3.zero)
def seqOp(r: (A, B, C), t: T): (A, B, C) = r match {
  // (accum1.addAccumulator(a, t), ..., accum3.addAccumulator(c, t))
  case (a, b, c) => ???
}
def combOp(r1: (A, B, C), r2: (A, B, C)): (A, B, C) = (r1, r2) match {
  // (accum1.addInPlace(a1, a2), ..., accum3.addInPlace(c1, c2))
  case ((a1, b1, c1), (a2, b2, c2)) => ???
}
val rdd: RDD[T] = ???
val accums: (A, B, C) = rdd.aggregate(zeroValue)(seqOp, combOp)


However, after building this joint aggregator I've noticed an enormous amount
of garbage collection, which is grinding my progress to a halt. My current
theory is that because I'm using a tuple of maps rather than individual
mutable maps, the system is creating far too many objects. Has anyone
run into a problem like this before? Does anyone have any suggestions for
aggregating multiple values without creating a new object eve
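
A minimal sketch of one possible way to avoid the per-element tuple, assuming
the three maps can live in a single mutable container that seqOp and combOp
update in place and then return. The Counts class, its three fields, and the
use of String elements are hypothetical stand-ins for A, B, C and T above.
RDD.aggregate is documented as allowing both functions to modify and return
their first argument, so the same seqOp and combOp should be usable as
rdd.aggregate(new Counts)(seqOp, combOp). The aggregateLocally helper below
only imitates that call on plain Scala collections so the example runs
without Spark.

```scala
import scala.collection.mutable

// Hypothetical container: one object holding all three mutable maps.
// Serializable so that Spark could ship it to executors as the zero value.
final class Counts extends Serializable {
  val byWord   = mutable.Map.empty[String, Int]
  val byLength = mutable.Map.empty[Int, Int]
  val byFirst  = mutable.Map.empty[Char, Int]
}

object JointAggregate {
  // Updates the maps in place and returns the same accumulator,
  // so no tuple or new map is allocated per element.
  def seqOp(acc: Counts, t: String): Counts = {
    acc.byWord(t) = acc.byWord.getOrElse(t, 0) + 1
    acc.byLength(t.length) = acc.byLength.getOrElse(t.length, 0) + 1
    if (t.nonEmpty) {
      acc.byFirst(t.head) = acc.byFirst.getOrElse(t.head, 0) + 1
    }
    acc
  }

  // Adds every count from src into dst, mutating dst.
  private def mergeInto[K](dst: mutable.Map[K, Int], src: mutable.Map[K, Int]): Unit =
    src.foreach { case (k, v) => dst(k) = dst.getOrElse(k, 0) + v }

  // Merges the right accumulator into the left one and returns the left one.
  def combOp(a: Counts, b: Counts): Counts = {
    mergeInto(a.byWord, b.byWord)
    mergeInto(a.byLength, b.byLength)
    mergeInto(a.byFirst, b.byFirst)
    a
  }

  // Local stand-in for rdd.aggregate: fold each "partition" with seqOp
  // starting from a fresh accumulator, then merge the partials with combOp.
  def aggregateLocally(partitions: Seq[Seq[String]]): Counts =
    partitions
      .map(_.foldLeft(new Counts)(seqOp))
      .foldLeft(new Counts)(combOp)
}
```

Whether this actually reduces GC pressure depends on how large the maps get
and how many partitions there are, so it would need to be measured on the
real job.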