Posted to user@spark.apache.org by Saiph Kappa <sa...@gmail.com> on 2015/03/03 20:32:06 UTC
Why do different numbers of partitions give different results for the
same computation on the same dataset?
Hi,
I have a spark streaming application, running on a single node, consisting
mainly of map operations. I perform repartitioning to control the number of
CPU cores that I want to use. The code goes like this:
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val distFile = ssc.textFileStream("/home/myuser/spark-example/dump")
    val words = distFile.repartition(cores.toInt)
      .flatMap(_.split(" "))
      .filter(_.length > 3)

    val wordCharValues = words.map { word =>
      var sum = 0
      word.toCharArray.foreach(c => sum += c.toInt)
      sum.toDouble / word.length.toDouble
    }
    wordCharValues.foreachRDD(rdd => println("MEAN: " + rdd.mean()))
I have two questions:
1) How can I use coalesce in this code instead of repartition?
2) Why, for the same dataset (a small file processed within a single
batch), does the mean I obtain vary with the number of partitions? If I
don't call repartition, the result is the same on every execution, as it
should be. But repartitioning into, for instance, 2 partitions gives a
different mean value than using 8 partitions. I really don't understand
why, given that my code is deterministic. Can someone enlighten me on this?
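[A side note on question 2: even with deterministic per-record code, a floating-point mean can legitimately vary in its low-order digits with partitioning, because rdd.mean() sums values within each partition and then merges the partial results, and floating-point addition is not associative. A minimal plain-Scala sketch (no Spark; made-up values) of the grouping effect:]

```scala
// Floating-point addition is not associative: summing the same values in a
// different grouping (as different partitionings do) can change the result.
val xs = Seq(1e16, 1.0, -1e16, 1.0)

val leftToRight = xs.foldLeft(0.0)(_ + _)            // (((1e16 + 1) - 1e16) + 1)
val regrouped   = (xs(0) + xs(2)) + (xs(1) + xs(3))  // (1e16 - 1e16) + (1 + 1)

println(s"left-to-right: $leftToRight") // 1.0 -- the intermediate +1 was absorbed
println(s"regrouped:     $regrouped")   // 2.0
```

[Differences from this effect are confined to the last few significant digits, though; a visibly different mean points to a bug in the program, as it turned out here.]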
Thanks.
Re: Why do different numbers of partitions give different results for
the same computation on the same dataset?
Posted by Tathagata Das <td...@databricks.com>.
You can use DStream.transform() to apply arbitrary RDD-to-RDD
transformations to the RDDs generated by a DStream:
val coalescedDStream = myDStream.transform { _.coalesce(...) }
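[Applied to the snippet from the original post, the change might look like the sketch below; `cores` and `sparkConf` are assumed to be defined as in that post:]

    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val distFile = ssc.textFileStream("/home/myuser/spark-example/dump")

    // DStream has no coalesce() of its own, so wrap the RDD-level call in
    // transform(). Unlike repartition(), coalesce() only merges existing
    // partitions and avoids a full shuffle.
    val words = distFile
      .transform(_.coalesce(cores.toInt))
      .flatMap(_.split(" "))
      .filter(_.length > 3)

[Note that coalesce() without shuffle = true can only reduce the partition count: if a batch arrives with fewer partitions than `cores`, it will not spread it out further, which is what repartition() is for.]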
On Tue, Mar 3, 2015 at 1:47 PM, Saiph Kappa <sa...@gmail.com> wrote:
Re: Why do different numbers of partitions give different results for
the same computation on the same dataset?
Posted by Saiph Kappa <sa...@gmail.com>.
Sorry, I made a mistake in my code. Please ignore my question number 2.
Different numbers of partitions give *the same* results!
On Tue, Mar 3, 2015 at 7:32 PM, Saiph Kappa <sa...@gmail.com> wrote: