You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Richard Tsai <ri...@gmail.com> on 2017/04/01 07:29:21 UTC

[Spark Core]: flatMap/reduceByKey seems to be quite slow with Long keys on some distributions

Hi all,

I'm using Spark to process some corpora and I need to count the occurrence
of each 2-gram. I started with counting tuples (wordID1, wordID2) and it
worked fine except the large memory usage and gc overhead due to the
substantial number of small tuple objects. Then I tried to pack a pair of
Int into a Long, and the gc overhead did reduce greatly, but the run time
also increased several times.

I ran some small experiments with random data on different distributions.
It seems that the performance issue only occurs on exponential distributed
data. The example code is attached.

The job is split into two stages, flatMap() and count(). When counting
Tuples, flatMap() takes about 6s and count() takes about 2s, while when
counting Longs, flatMap() takes 18s and count() takes 10s.

I haven't look into Spark's implementation of flatMap/reduceByKey, but I
guess Spark has some specializations for Long keys which happen to perform
not very well on some specific distributions. Does anyone have ideas about
this?

Best wishes,
Richard

// lines of word IDs
val data = (1 to 5000).par.map({ _ =>
  (1 to 1000) map { _ => (-1000 * Math.log(Random.nextDouble)).toInt }
}).seq

// count Tuples, fast
sc parallelize(data) flatMap { line =>
  val first = line.iterator
  val second = line.iterator.drop(1)
  for (pair <- first zip(second))
    yield (pair, 1L)
} reduceByKey { _ + _ } count()

// count Long, slow
sc parallelize(data) flatMap { line =>
  val first = line.iterator
  val second = line.iterator.drop(1)
  for ((a, b) <- first zip(second))
    yield ((a.toLong << 32) | b, 1L)
} reduceByKey { _ + _ } count()