You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Aniket Bhatnagar <an...@gmail.com> on 2014/11/18 14:56:29 UTC

Is sorting persisted after pair rdd transformations?

I am trying to figure out if sorting is persisted after applying Pair RDD
transformations and I am not able to decisively tell after reading the
documentation.

For example:
val numbers = .. // RDD of numbers
val pairedNumbers = numbers.map(number => (number % 100, number))
val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber =>
pairedNumber._2) // Sort by values in the pair
val aggregates = sortedPairedNumbers.combineByKey(..)

In this example, will the combine functions see values in sorted order?
What if I had done groupByKey and then combineByKey? What transformations
can unsort an already sorted data?

Re: Is sorting persisted after pair rdd transformations?

Posted by Aniket Bhatnagar <an...@gmail.com>.
Thanks Daniel :-). It seems to make sense and something I was hoping for. I
will proceed with this assumption and report back if I see any anomalies.

On Wed Nov 19 2014 at 19:30:02 Daniel Darabos <
daniel.darabos@lynxanalytics.com> wrote:

> Ah, so I misunderstood you too :).
>
> My reading of org/ apache/spark/Aggregator.scala is that your function
> will always see the items in the order that they are in the input RDD. An
> RDD partition is always accessed as an iterator, so it will not be read out
> of order.
>
> On Wed, Nov 19, 2014 at 2:28 PM, Aniket Bhatnagar <
> aniket.bhatnagar@gmail.com> wrote:
>
>> Thanks Daniel. I can understand that the keys will not be in sorted order
>> but what I am trying to understanding is whether the functions are passed
>> values in sorted order in a given partition.
>>
>> For example:
>>
>> sc.parallelize(1 to 8).map(i => (1, i)).sortBy(t =>
>> t._2).foldByKey(0)((a, b) => b).collect
>> res0: Array[(Int, Int)] = Array((1,8))
>>
>> The fold always given me last value as 8 which suggests values preserve
>> sorting earlier defined in stage in DAG?
>>
>> On Wed Nov 19 2014 at 18:10:11 Daniel Darabos <
>> daniel.darabos@lynxanalytics.com> wrote:
>>
>>> Akhil, I think Aniket uses the word "persisted" in a different way than
>>> what you mean. I.e. not in the RDD.persist() way. Aniket asks if running
>>> combineByKey on a sorted RDD will result in a sorted RDD. (I.e. the sorting
>>> is preserved.)
>>>
>>> I think the answer is no. combineByKey uses AppendOnlyMap, which is a
>>> hashmap. That will shuffle your keys. You can quickly verify it in
>>> spark-shell:
>>>
>>> scala> sc.parallelize(7 to 8).map(_ -> 1).reduceByKey(_ + _).collect
>>> res0: Array[(Int, Int)] = Array((8,1), (7,1))
>>>
>>> (The initial size of the AppendOnlyMap seems to be 8, so 8 is the first
>>> number that demonstrates this.)
>>>
>>> On Wed, Nov 19, 2014 at 9:05 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>
>>>> If something is persisted you can easily see them under the Storage tab
>>>> in the web ui.
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Tue, Nov 18, 2014 at 7:26 PM, Aniket Bhatnagar <
>>>> aniket.bhatnagar@gmail.com> wrote:
>>>>
>>>>> I am trying to figure out if sorting is persisted after applying Pair
>>>>> RDD transformations and I am not able to decisively tell after reading the
>>>>> documentation.
>>>>>
>>>>> For example:
>>>>> val numbers = .. // RDD of numbers
>>>>> val pairedNumbers = numbers.map(number => (number % 100, number))
>>>>> val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber =>
>>>>> pairedNumber._2) // Sort by values in the pair
>>>>> val aggregates = sortedPairedNumbers.combineByKey(..)
>>>>>
>>>>> In this example, will the combine functions see values in sorted
>>>>> order? What if I had done groupByKey and then combineByKey? What
>>>>> transformations can unsort an already sorted data?
>>>>>
>>>>
>>>>
>>>
>

Re: Is sorting persisted after pair rdd transformations?

Posted by Daniel Darabos <da...@lynxanalytics.com>.
Ah, so I misunderstood you too :).

My reading of org/ apache/spark/Aggregator.scala is that your function will
always see the items in the order that they are in the input RDD. An RDD
partition is always accessed as an iterator, so it will not be read out of
order.

On Wed, Nov 19, 2014 at 2:28 PM, Aniket Bhatnagar <
aniket.bhatnagar@gmail.com> wrote:

> Thanks Daniel. I can understand that the keys will not be in sorted order
> but what I am trying to understanding is whether the functions are passed
> values in sorted order in a given partition.
>
> For example:
>
> sc.parallelize(1 to 8).map(i => (1, i)).sortBy(t => t._2).foldByKey(0)((a,
> b) => b).collect
> res0: Array[(Int, Int)] = Array((1,8))
>
> The fold always given me last value as 8 which suggests values preserve
> sorting earlier defined in stage in DAG?
>
> On Wed Nov 19 2014 at 18:10:11 Daniel Darabos <
> daniel.darabos@lynxanalytics.com> wrote:
>
>> Akhil, I think Aniket uses the word "persisted" in a different way than
>> what you mean. I.e. not in the RDD.persist() way. Aniket asks if running
>> combineByKey on a sorted RDD will result in a sorted RDD. (I.e. the sorting
>> is preserved.)
>>
>> I think the answer is no. combineByKey uses AppendOnlyMap, which is a
>> hashmap. That will shuffle your keys. You can quickly verify it in
>> spark-shell:
>>
>> scala> sc.parallelize(7 to 8).map(_ -> 1).reduceByKey(_ + _).collect
>> res0: Array[(Int, Int)] = Array((8,1), (7,1))
>>
>> (The initial size of the AppendOnlyMap seems to be 8, so 8 is the first
>> number that demonstrates this.)
>>
>> On Wed, Nov 19, 2014 at 9:05 AM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> If something is persisted you can easily see them under the Storage tab
>>> in the web ui.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Tue, Nov 18, 2014 at 7:26 PM, Aniket Bhatnagar <
>>> aniket.bhatnagar@gmail.com> wrote:
>>>
>>>> I am trying to figure out if sorting is persisted after applying Pair
>>>> RDD transformations and I am not able to decisively tell after reading the
>>>> documentation.
>>>>
>>>> For example:
>>>> val numbers = .. // RDD of numbers
>>>> val pairedNumbers = numbers.map(number => (number % 100, number))
>>>> val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber =>
>>>> pairedNumber._2) // Sort by values in the pair
>>>> val aggregates = sortedPairedNumbers.combineByKey(..)
>>>>
>>>> In this example, will the combine functions see values in sorted order?
>>>> What if I had done groupByKey and then combineByKey? What transformations
>>>> can unsort an already sorted data?
>>>>
>>>
>>>
>>

Re: Is sorting persisted after pair rdd transformations?

Posted by Aniket Bhatnagar <an...@gmail.com>.
Thanks Daniel. I can understand that the keys will not be in sorted order
but what I am trying to understanding is whether the functions are passed
values in sorted order in a given partition.

For example:

sc.parallelize(1 to 8).map(i => (1, i)).sortBy(t => t._2).foldByKey(0)((a,
b) => b).collect
res0: Array[(Int, Int)] = Array((1,8))

The fold always given me last value as 8 which suggests values preserve
sorting earlier defined in stage in DAG?

On Wed Nov 19 2014 at 18:10:11 Daniel Darabos <
daniel.darabos@lynxanalytics.com> wrote:

> Akhil, I think Aniket uses the word "persisted" in a different way than
> what you mean. I.e. not in the RDD.persist() way. Aniket asks if running
> combineByKey on a sorted RDD will result in a sorted RDD. (I.e. the sorting
> is preserved.)
>
> I think the answer is no. combineByKey uses AppendOnlyMap, which is a
> hashmap. That will shuffle your keys. You can quickly verify it in
> spark-shell:
>
> scala> sc.parallelize(7 to 8).map(_ -> 1).reduceByKey(_ + _).collect
> res0: Array[(Int, Int)] = Array((8,1), (7,1))
>
> (The initial size of the AppendOnlyMap seems to be 8, so 8 is the first
> number that demonstrates this.)
>
> On Wed, Nov 19, 2014 at 9:05 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> If something is persisted you can easily see them under the Storage tab
>> in the web ui.
>>
>> Thanks
>> Best Regards
>>
>> On Tue, Nov 18, 2014 at 7:26 PM, Aniket Bhatnagar <
>> aniket.bhatnagar@gmail.com> wrote:
>>
>>> I am trying to figure out if sorting is persisted after applying Pair
>>> RDD transformations and I am not able to decisively tell after reading the
>>> documentation.
>>>
>>> For example:
>>> val numbers = .. // RDD of numbers
>>> val pairedNumbers = numbers.map(number => (number % 100, number))
>>> val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber =>
>>> pairedNumber._2) // Sort by values in the pair
>>> val aggregates = sortedPairedNumbers.combineByKey(..)
>>>
>>> In this example, will the combine functions see values in sorted order?
>>> What if I had done groupByKey and then combineByKey? What transformations
>>> can unsort an already sorted data?
>>>
>>
>>
>

Re: Is sorting persisted after pair rdd transformations?

Posted by Daniel Darabos <da...@lynxanalytics.com>.
Akhil, I think Aniket uses the word "persisted" in a different way than
what you mean. I.e. not in the RDD.persist() way. Aniket asks if running
combineByKey on a sorted RDD will result in a sorted RDD. (I.e. the sorting
is preserved.)

I think the answer is no. combineByKey uses AppendOnlyMap, which is a
hashmap. That will shuffle your keys. You can quickly verify it in
spark-shell:

scala> sc.parallelize(7 to 8).map(_ -> 1).reduceByKey(_ + _).collect
res0: Array[(Int, Int)] = Array((8,1), (7,1))

(The initial size of the AppendOnlyMap seems to be 8, so 8 is the first
number that demonstrates this.)

On Wed, Nov 19, 2014 at 9:05 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> If something is persisted you can easily see them under the Storage tab in
> the web ui.
>
> Thanks
> Best Regards
>
> On Tue, Nov 18, 2014 at 7:26 PM, Aniket Bhatnagar <
> aniket.bhatnagar@gmail.com> wrote:
>
>> I am trying to figure out if sorting is persisted after applying Pair RDD
>> transformations and I am not able to decisively tell after reading the
>> documentation.
>>
>> For example:
>> val numbers = .. // RDD of numbers
>> val pairedNumbers = numbers.map(number => (number % 100, number))
>> val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber =>
>> pairedNumber._2) // Sort by values in the pair
>> val aggregates = sortedPairedNumbers.combineByKey(..)
>>
>> In this example, will the combine functions see values in sorted order?
>> What if I had done groupByKey and then combineByKey? What transformations
>> can unsort an already sorted data?
>>
>
>

Re: Is sorting persisted after pair rdd transformations?

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
If something is persisted you can easily see them under the Storage tab in
the web ui.

Thanks
Best Regards

On Tue, Nov 18, 2014 at 7:26 PM, Aniket Bhatnagar <
aniket.bhatnagar@gmail.com> wrote:

> I am trying to figure out if sorting is persisted after applying Pair RDD
> transformations and I am not able to decisively tell after reading the
> documentation.
>
> For example:
> val numbers = .. // RDD of numbers
> val pairedNumbers = numbers.map(number => (number % 100, number))
> val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber =>
> pairedNumber._2) // Sort by values in the pair
> val aggregates = sortedPairedNumbers.combineByKey(..)
>
> In this example, will the combine functions see values in sorted order?
> What if I had done groupByKey and then combineByKey? What transformations
> can unsort an already sorted data?
>