Posted to user@spark.apache.org by Tao Xiao <xi...@gmail.com> on 2014/11/19 08:06:53 UTC

A partitionBy problem

Hi all,

     I tested the *partitionBy* feature in a wordcount application, and I'm
puzzled by a phenomenon. In this application, I created an RDD from some
text files in HDFS (about 100GB in size), each of which has lines of words
separated by the character "#". I wanted to count the occurrences of each
distinct word. *All lines have the same contents, so the final result
should be very small in bytes*. The code is as follows:

      val text = sc.textFile(inputDir)
      val tuples = text.flatMap(line => line.split("#"))
                       .map((_, 1))
                       .reduceByKey(_ + _)
      tuples.collect.foreach { case (word, count) =>
        println(word + " -> " + count)
      }

I submitted the application to a Spark cluster of 5 nodes and ran it in
standalone mode. From the application UI
<http://imgbin.org/index.php?page=image&id=20976>, we can see that the
shuffle for *collect* and *reduceByKey* moved very little data (766.4KB
of shuffle read for *collect* and 961KB of shuffle write for
*reduceByKey*).

*However, the shuffle moved far more data when I added partitionBy like
this:*

      import org.apache.spark.HashPartitioner

      val text = sc.textFile(inputDir)
      val tuples = text.flatMap(line => line.split("#"))
                       .map((_, 1))
                       .partitionBy(new HashPartitioner(100))
                       .reduceByKey(_ + _)
      tuples.collect.foreach { case (word, count) =>
        println(word + " -> " + count)
      }

From the application UI <http://imgbin.org/index.php?page=image&id=20977>,
we can see that the shuffle read for *collect* is 2.8GB and the shuffle
write for *map* is 3.5GB.

The *map* transformations run on all 5 nodes of the cluster because the
HDFS blocks are distributed among those nodes. They are applied to each
element of the RDD locally and should not need to shuffle the resulting
RDD. *So my first question is: why did the map stage produce such a large
shuffle write (3.5GB) when I added partitionBy to the code?*
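
For reference, one way to look at where the extra shuffle comes from is
to compare the lineage of the two variants with *toDebugString* (a quick
sketch reusing the definitions above; the exact output depends on the
Spark version):

      val withoutPartitioner = text.flatMap(_.split("#"))
                                   .map((_, 1))
                                   .reduceByKey(_ + _)
      // should show a single ShuffledRDD, introduced by reduceByKey
      println(withoutPartitioner.toDebugString)

      val withPartitioner = text.flatMap(_.split("#"))
                                .map((_, 1))
                                .partitionBy(new HashPartitioner(100))
                                .reduceByKey(_ + _)
      // here the ShuffledRDD should appear at partitionBy instead,
      // i.e. the raw (word, 1) pairs are shuffled before any reduction
      println(withPartitioner.toDebugString)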

When *collect* is applied, it needs to gather the results, namely the
(*word*, *totalCount*) tuples, from the 5 nodes to the driver. That should
move very little data because all lines have the same contents, like
"AAA#BBB#CCC#DDD", which means the final result that *collect* retrieves
should be very small in bytes (for example, hundreds of KB). *So my second
question is: why did the collect action involve such a large shuffle read
(2.8GB) when I added partitionBy to the code?*
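
As an aside, I noticed *reduceByKey* also has an overload that takes a
partitioner directly. A sketch of what I mean (assuming 100 output
partitions is the goal; whether this changes the numbers above is exactly
what I am unsure about):

      val tuples = text.flatMap(line => line.split("#"))
                       .map((_, 1))
                       .reduceByKey(new HashPartitioner(100), _ + _)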

*And the third question: when I add partitionBy to an RDD, it returns a
new RDD. Does that mean the RDD is immediately shuffled across nodes to
satisfy the supplied partitioner, or is the supplied partitioner merely a
hint indicating how to partition the RDD later?*
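
To make that question concrete, here is a minimal sketch of the experiment
I have in mind (reusing *tuples* from the code above; the question is
whether the shuffle runs at the partitionBy call or only at the action):

      val partitioned = tuples.partitionBy(new HashPartitioner(100))
      // the partitioner seems to be recorded right away:
      println(partitioned.partitioner)  // Some(org.apache.spark.HashPartitioner@...)
      partitioned.count()               // or does the shuffle only happen here?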

Thanks.