You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Saiph Kappa <sa...@gmail.com> on 2015/03/03 20:32:06 UTC

Why different numbers of partitions give different results for the same computation on the same dataset?

Hi,

I have a spark streaming application, running on a single node, consisting
mainly of map operations. I perform repartitioning to control the number of
CPU cores that I want to use. The code goes like this:

    val ssc = new StreamingContext(sparkConf, Seconds(5))
>     val distFile = ssc.textFileStream("/home/myuser/spark-example/dump")
>     val words = distFile.repartition(cores.toInt).flatMap(_.split(" "))
>       .filter(_.length > 3)
>
>       val wordCharValues = words.map(word => {
>         var sum = 0
>         word.toCharArray.foreach(c => {sum += c.toInt})
>         sum.toDouble / word.length.toDouble
>       }).foreachRDD(rdd => {
>         println("MEAN: " + rdd.mean())
>       })
>

I have 2 questions:
1) How can I use coalesce in this code instead of repartition?

2) Why, using the same dataset (which is a small file processed within a
single batch), the result that I obtain for the mean varies with the number
of partitions? If I don't call the repartition method, the result is always
the same for every execution, as it should be. But repartitioning for
instance in 2 partitions gives a different mean value than using 8
partitions. I really don't understand why given that my code is
deterministic. Can someone enlighten me on this?

Thanks.

Re: Why different numbers of partitions give different results for the same computation on the same dataset?

Posted by Tathagata Das <td...@databricks.com>.
You can use DStream.transform() to do any arbitrary RDD transformations on
the RDDs generated by a DStream.

val coalescedDStream = myDStream.transform { _.coalesce(...) }



On Tue, Mar 3, 2015 at 1:47 PM, Saiph Kappa <sa...@gmail.com> wrote:

> Sorry I made a mistake in my code. Please ignore my question number 2.
> Different numbers of partitions give *the same* results!
>
>
> On Tue, Mar 3, 2015 at 7:32 PM, Saiph Kappa <sa...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a spark streaming application, running on a single node,
>> consisting mainly of map operations. I perform repartitioning to control
>> the number of CPU cores that I want to use. The code goes like this:
>>
>>     val ssc = new StreamingContext(sparkConf, Seconds(5))
>>>     val distFile = ssc.textFileStream("/home/myuser/spark-example/dump")
>>>     val words = distFile.repartition(cores.toInt).flatMap(_.split(" "))
>>>       .filter(_.length > 3)
>>>
>>>       val wordCharValues = words.map(word => {
>>>         var sum = 0
>>>         word.toCharArray.foreach(c => {sum += c.toInt})
>>>         sum.toDouble / word.length.toDouble
>>>       }).foreachRDD(rdd => {
>>>         println("MEAN: " + rdd.mean())
>>>       })
>>>
>>
>> I have 2 questions:
>> 1) How can I use coalesce in this code instead of repartition?
>>
>> 2) Why, using the same dataset (which is a small file processed within a
>> single batch), the result that I obtain for the mean varies with the number
>> of partitions? If I don't call the repartition method, the result is always
>> the same for every execution, as it should be. But repartitioning for
>> instance in 2 partitions gives a different mean value than using 8
>> partitions. I really don't understand why given that my code is
>> deterministic. Can someone enlighten me on this?
>>
>> Thanks.
>>
>
>

Re: Why different numbers of partitions give different results for the same computation on the same dataset?

Posted by Saiph Kappa <sa...@gmail.com>.
Sorry I made a mistake in my code. Please ignore my question number 2.
Different numbers of partitions give *the same* results!


On Tue, Mar 3, 2015 at 7:32 PM, Saiph Kappa <sa...@gmail.com> wrote:

> Hi,
>
> I have a spark streaming application, running on a single node, consisting
> mainly of map operations. I perform repartitioning to control the number of
> CPU cores that I want to use. The code goes like this:
>
>     val ssc = new StreamingContext(sparkConf, Seconds(5))
>>     val distFile = ssc.textFileStream("/home/myuser/spark-example/dump")
>>     val words = distFile.repartition(cores.toInt).flatMap(_.split(" "))
>>       .filter(_.length > 3)
>>
>>       val wordCharValues = words.map(word => {
>>         var sum = 0
>>         word.toCharArray.foreach(c => {sum += c.toInt})
>>         sum.toDouble / word.length.toDouble
>>       }).foreachRDD(rdd => {
>>         println("MEAN: " + rdd.mean())
>>       })
>>
>
> I have 2 questions:
> 1) How can I use coalesce in this code instead of repartition?
>
> 2) Why, using the same dataset (which is a small file processed within a
> single batch), the result that I obtain for the mean varies with the number
> of partitions? If I don't call the repartition method, the result is always
> the same for every execution, as it should be. But repartitioning for
> instance in 2 partitions gives a different mean value than using 8
> partitions. I really don't understand why given that my code is
> deterministic. Can someone enlighten me on this?
>
> Thanks.
>