Posted to by David Thomas <> on 2014/01/28 20:35:40 UTC

RDD and Partition

Let's say I have an RDD of Strings and there are 26 machines in the cluster.
How can I repartition the RDD in such a way that all strings starting with
A get collected on machine1, B on machine2, and so on?

Re: RDD and Partition

Posted by Mark Hamstra <>.
Of course, there's an excess dot in `rdd26..mapPartitionsWithIndex{}`.

On Tue, Jan 28, 2014 at 1:29 PM, Mark Hamstra <>wrote:

> If I'm understanding you correctly, there's lots of ways you could do
> that.  Here's one, continuing from the previous example:
> // rdd26: RDD[String] split by first letter into 26 partitions
> val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)
> rdd26..mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx))
> itr.map(_.toUpperCase) else itr }

Re: RDD and Partition

Posted by Mark Hamstra <>.
Doesn't avoid an 'if' on every partition, but does avoid it on every
element of every partition.

On Tue, Jan 28, 2014 at 1:29 PM, Mark Hamstra <>wrote:

> If I'm understanding you correctly, there's lots of ways you could do
> that.  Here's one, continuing from the previous example:
> // rdd26: RDD[String] split by first letter into 26 partitions
> val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)
> rdd26..mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx))
> itr.map(_.toUpperCase) else itr }

Re: RDD and Partition

Posted by Christopher Nguyen <>.
David, a PPRDD (mobile abbrev) is a new RDD that contains a subset of
partitions of the original RDD. Subsequent transformations/operations will
only see this subset. So yes, it may do what you need, or not, depending on
whether you still need to do something with the other partitions, as
implied by Mark. You can of course still refer to the original RDD, or
create yet another PPRDD containing that other subset, etc., just as you
can call sc.runJob() on different partitions each time.
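
A minimal sketch of the pattern described above, under stated assumptions: one PartitionPruningRDD keeps partitions 0 to 12, a second keeps the rest, and the two are unioned after the first is uppercased. The object name `PruneThenUnion` and the stand-in data are hypothetical, Spark 2.x or later with a local master is assumed, and `PartitionPruningRDD` is marked as a developer API in Spark, so treat this as an illustration rather than a recommendation.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}

object PruneThenUnion {
  // Uppercase partitions 0..12 through one pruned RDD, pass partitions 13..25
  // through a second pruned RDD unchanged, then union the two halves.
  def upperCaseAToM(rdd26: RDD[String]): RDD[String] = {
    val aToM = PartitionPruningRDD.create(rdd26, (idx: Int) => idx <= 12)
    val nToZ = PartitionPruningRDD.create(rdd26, (idx: Int) => idx > 12)
    aToM.map(_.toUpperCase).union(nToZ)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("prune-then-union"))
    // Stand-in for rdd26 (an assumption): 26 sorted one-letter strings in
    // 26 slices, so slice i holds the i-th letter of the alphabet.
    val rdd26 = sc.parallelize(('a' to 'z').map(_.toString), 26)
    upperCaseAToM(rdd26).collect().foreach(println)
    sc.stop()
  }
}
```

The map over the first pruned RDD launches tasks only for its 13 partitions, while the union gives back a single RDD that still covers all 26.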

I'm sure you can decide which pattern best fits your use case. Beware of
over optimizing leading to unnecessary complexity, though I've also learned
not to underestimate others' real needs based on their toy examples.

Sent while mobile. Pls excuse typos etc.
On Jan 28, 2014 3:41 PM, "David Thomas" <> wrote:

> Thanks for those tips.
> I was looking into the docs for PartitionPruningRDD. It says, "A RDD used
> to prune RDD partitions/partitions so we can avoid launching tasks on all
> partitions". I did not understand this exactly and I couldn't find any
> sample code. Can we use this to apply a function only on certain partitions?

Re: RDD and Partition

Posted by David Thomas <>.
Thanks for those tips.

I was looking into the docs for PartitionPruningRDD. It says, "A RDD used
to prune RDD partitions/partitions so we can avoid launching tasks on all
partitions". I did not understand this exactly and I couldn't find any
sample code. Can we use this to apply a function only on certain partitions?

On Tue, Jan 28, 2014 at 4:36 PM, Christopher Nguyen <> wrote:

> Hence the qualification to determine whether it is necessary *and*
> sufficient, depending on what David is trying to do overall :)
> Sent while mobile. Pls excuse typos etc.

Re: RDD and Partition

Posted by Christopher Nguyen <>.
Hence the qualification to determine whether it is necessary *and*
sufficient, depending on what David is trying to do overall :)

Sent while mobile. Pls excuse typos etc.
On Jan 28, 2014 2:10 PM, "Mark Hamstra" <> wrote:

> SparkContext#runJob is the basis of an RDD action, so the result of using
> runJob to call toUpperCase on the A-to-M partitions will be the uppercased
> strings materialized in the driver process, not a transformation of the
> original RDD.

Re: RDD and Partition

Posted by Mark Hamstra <>.
SparkContext#runJob is the basis of an RDD action, so the result of using
runJob to call toUpperCase on the A-to-M partitions will be the uppercased
strings materialized in the driver process, not a transformation of the
original RDD.
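
A minimal sketch of that distinction, assuming Spark 2.x or later (where `SparkContext.runJob` has a three-argument overload taking the partition indices) and a local master. The object name `RunJobOnSubset` and the stand-in data are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object RunJobOnSubset {
  // Launches tasks only for partitions 0..12 and ships one array per
  // partition back to the driver. The input RDD is not transformed.
  def upperCaseOnDriver(sc: SparkContext, rdd26: RDD[String]): Array[Array[String]] =
    sc.runJob(rdd26, (it: Iterator[String]) => it.map(_.toUpperCase).toArray, 0 to 12)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("run-job-on-subset"))
    // Stand-in for rdd26 (an assumption): 26 sorted one-letter strings in
    // 26 slices, so slice i holds the i-th letter of the alphabet.
    val rdd26 = sc.parallelize(('a' to 'z').map(_.toString), 26)
    val onDriver = upperCaseOnDriver(sc, rdd26)
    println(onDriver.flatten.mkString(",")) // plain local data in the driver
    println(rdd26.first())                  // the RDD still holds lowercase strings
    sc.stop()
  }
}
```

The result is an ordinary `Array` in the driver, so any further distributed work would have to start again from an RDD.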

On Tue, Jan 28, 2014 at 1:37 PM, Christopher Nguyen <> wrote:

> David,
> map() would iterate row by row, forcing an if on each row.
> mapPartitions*() allows you to have a conditional on the whole partition
> first, as Mark suggests. That should usually be sufficient.
> SparkContext.runJob() allows you to specify which partitions to run on, if
> you're sure it's necessary and sufficient, and not over optimization.
> Sent while mobile. Pls excuse typos etc.

Re: RDD and Partition

Posted by Christopher Nguyen <>.

map() would iterate row by row, forcing an if on each row.

mapPartitions*() allows you to have a conditional on the whole partition
first, as Mark suggests. That should usually be sufficient.

SparkContext.runJob() allows you to specify which partitions to run on, if
you're sure it's necessary and sufficient, and not over optimization.
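
For contrast, a minimal sketch of the row-by-row variant: `map()` with the 'if' evaluated once per string. It works whatever the partitioning is, at the cost of one check per element. The names `UpperPerElement` and `startsInAToM` are hypothetical, and Spark 2.x or later with a local master is assumed.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object UpperPerElement {
  // True when the string starts with a letter in 'A'..'M', ignoring case.
  def startsInAToM(s: String): Boolean =
    s.nonEmpty && s.head.toUpper >= 'A' && s.head.toUpper <= 'M'

  // map() visits every element, so the condition runs once per string.
  def upperCaseAToM(rdd: RDD[String]): RDD[String] =
    rdd.map(s => if (startsInAToM(s)) s.toUpperCase else s)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("upper-per-element"))
    val rdd = sc.parallelize(Seq("apple", "Neptune", "moon", "zebra"))
    upperCaseAToM(rdd).collect().foreach(println)
    sc.stop()
  }
}
```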

Sent while mobile. Pls excuse typos etc.
On Jan 28, 2014 1:30 PM, "Mark Hamstra" <> wrote:

> If I'm understanding you correctly, there's lots of ways you could do
> that.  Here's one, continuing from the previous example:
> // rdd26: RDD[String] split by first letter into 26 partitions
> val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)
> rdd26..mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx))
> itr.map(_.toUpperCase) else itr }

Re: RDD and Partition

Posted by Mark Hamstra <>.
If I'm understanding you correctly, there's lots of ways you could do that.
Here's one, continuing from the previous example:

// rdd26: RDD[String] split by first letter into 26 partitions
val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)
rdd26..mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx)) itr.map(_.toUpperCase) else itr }
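
For reference, a self-contained sketch of the same idea with a single dot before `mapPartitionsWithIndex`. Several things here are assumptions rather than part of the thread: the names `FirstLetterPartitioner` and `UppercaseAToM` are hypothetical, a fixed letter-to-partition mapping is swapped in for RangePartitioner (whose boundaries come from sampling) so that partition indices 0 to 12 reliably mean 'A' to 'M', input strings are assumed non-empty, and Spark 2.x or later with a local master is assumed.

```scala
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical partitioner: partition i holds the keys equal to the i-th
// letter of the alphabet, ignoring case.
class FirstLetterPartitioner extends Partitioner {
  override def numPartitions: Int = 26
  override def getPartition(key: Any): Int = key match {
    case c: Char if c.toUpper >= 'A' && c.toUpper <= 'Z' => c.toUpper - 'A'
    case _ => 0 // anything else is lumped into partition 0 in this sketch
  }
}

object UppercaseAToM {
  // Spread the strings over 26 partitions by first letter.
  def splitByFirstLetter(rdd: RDD[String]): RDD[String] =
    rdd.keyBy(s => s(0)).partitionBy(new FirstLetterPartitioner).values

  // One check per partition: only partitions 0..12 ('A'..'M') are mapped,
  // the remaining iterators are handed back untouched.
  def upperCaseAToM(rdd26: RDD[String]): RDD[String] = {
    val range = ('A' - 'A') to ('M' - 'A') // partition indices 0 to 12
    rdd26.mapPartitionsWithIndex { (idx, itr) =>
      if (range.contains(idx)) itr.map(_.toUpperCase) else itr
    }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("uppercase-a-to-m"))
    val words = sc.parallelize(Seq("apple", "Ball", "moon", "Neptune", "zebra"))
    upperCaseAToM(splitByFirstLetter(words)).collect().foreach(println)
    sc.stop()
  }
}
```

A task is still scheduled for every partition; the saving is that the condition is evaluated 26 times in total instead of once per string.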

On Tue, Jan 28, 2014 at 12:52 PM, David Thomas <> wrote:

> Thank you! That helps.
> A follow-up question on this: how can I apply a function to only a subset
> of this RDD? Let's say I need toUpperCase applied to all strings starting
> with a letter in the range 'A' - 'M', leaving the rest untouched. Is that
> possible without running an 'if' condition on all the partitions in the cluster?

Re: RDD and Partition

Posted by David Thomas <>.
Thank you! That helps.

A follow-up question on this: how can I apply a function to only a subset
of this RDD? Let's say I need toUpperCase applied to all strings starting
with a letter in the range 'A' - 'M', leaving the rest untouched. Is that
possible without running an 'if' condition on all the partitions in the cluster?

On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <>wrote:

> scala> import org.apache.spark.RangePartitioner
> scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog",
> "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long",
> "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk",
> "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
> scala> rdd.keyBy(s => s(0).toUpper)
> res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at
> <console>:15
> scala> res0.partitionBy(new RangePartitioner[Char, String](26,
> res0)).values
> res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at
> <console>:18
> scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx,
> s))).collect.foreach(println)
> (0,apple)
> (1,Ball)
> (2,cat)
> (3,dog)
> (4,Elephant)
> (5,fox)
> (6,gas)
> (7,horse)
> (8,index)
> (9,jet)
> (10,kitsch)
> (11,long)
> (12,moon)
> (13,Neptune)
> (14,ooze)
> (15,Pen)
> (16,quiet)
> (17,rose)
> (18,sun)
> (19,talk)
> (20,umbrella)
> (21,voice)
> (22,Walrus)
> (23,xeon)
> (24,Yam)
> (25,zebra)
> On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <> wrote:
>> If you do something like:
>> rdd.map { str => (str.take(1), str) }
>> you will have an RDD[(String, String)] where the key is the first
>> character of the string. Now when you perform an operation that uses
>> partitioning (e.g. reduceByKey) you will end up with the 1st reduce task
>> receiving all the strings with A, the 2nd all the strings with B etc. Note
>> that you may not be able to enforce that each *machine* gets a different
>> letter, but in most cases that doesn't particularly matter as long as you
>> get "all values for a given key go to the same reducer" behaviour.
>> Perhaps if you expand on your use case we can provide more detailed
>> assistance.
>> On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <> wrote:
>>> Lets say I have an RDD of Strings and there are 26 machines in the
>>> cluster. How can I repartition the RDD in such a way that all strings
>>> starting with A gets collected on machine1, B on machine2 and so on.

Re: RDD and Partition

Posted by Mark Hamstra <>.
scala> import org.apache.spark.RangePartitioner

scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog",
"Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long",
"moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk",
"umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))

scala> rdd.keyBy(s => s(0).toUpper)
res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at <console>:15

scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values
res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at <console>:18

scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)
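
RangePartitioner picks its range boundaries by sampling the keys, so the
tidy letter-to-index mapping in this session relies on the sample data
having one string per letter. If a fixed mapping is needed regardless of
the data, a custom Partitioner is one option. The following is only a
sketch: FirstLetterPartitioner and FirstLetterPartitioning are hypothetical
names, not part of Spark, and the code assumes a Spark version (1.3 or
later) where the pair-RDD implicits need no extra import.

```scala
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Hypothetical partitioner (not part of Spark): 'A'..'Z' -> partitions 0..25,
// case-insensitively. Any other key falls back to partition 0.
class FirstLetterPartitioner extends Partitioner {
  override def numPartitions: Int = 26

  override def getPartition(key: Any): Int = key match {
    case c: Char =>
      val upper = c.toUpper
      if (upper >= 'A' && upper <= 'Z') upper - 'A' else 0
    case _ => 0
  }

  // Equality lets Spark recognise two RDDs as partitioned the same way.
  override def equals(other: Any): Boolean = other.isInstanceOf[FirstLetterPartitioner]
  override def hashCode: Int = numPartitions
}

object FirstLetterPartitioning {
  // Assumes every string is non-empty; s(0) throws on an empty string.
  def byFirstLetter(rdd: RDD[String]): RDD[String] =
    rdd.keyBy(s => s(0).toUpper).partitionBy(new FirstLetterPartitioner).values
}
```

With this in scope, FirstLetterPartitioning.byFirstLetter(rdd) would stand
in for the RangePartitioner step of the session. It still fixes only the
partition index; which machine runs a given partition is up to the
scheduler.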


On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath wrote:

> If you do something like:
> rdd.map { str => (str.take(1), str) }
> you will have an RDD[(String, String)] where the key is the first
> character of the string. Now when you perform an operation that uses
> partitioning (e.g. reduceByKey) you will end up with the 1st reduce task
> receiving all the strings with A, the 2nd all the strings with B etc. Note
> that you may not be able to enforce that each *machine* gets a different
> letter, but in most cases that doesn't particularly matter as long as you
> get "all values for a given key go to the same reducer" behaviour.
> Perhaps if you expand on your use case we can provide more detailed
> assistance.
> On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <> wrote:
>> Lets say I have an RDD of Strings and there are 26 machines in the
>> cluster. How can I repartition the RDD in such a way that all strings
>> starting with A gets collected on machine1, B on machine2 and so on.

Re: RDD and Partition

Posted by Nick Pentreath <>.
If you do something like:

rdd.map { str => (str.take(1), str) }

you will have an RDD[(String, String)] where the key is the first character
of the string. Now when you perform an operation that uses partitioning
(e.g. reduceByKey) you will end up with the 1st reduce task receiving all
the strings with A, the 2nd all the strings with B etc. Note that you may
not be able to enforce that each *machine* gets a different letter, but in
most cases that doesn't particularly matter as long as you get "all values
for a given key go to the same reducer" behaviour.
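
As a rough illustration of the keying approach, here is a minimal sketch
that keys each string by its first letter and then calls partitionBy with
a HashPartitioner. The object name FirstLetterKeying, the sample strings,
the local master setting, and the upper-casing of the key are all
assumptions made for the example, and it assumes Spark 1.3 or later. With
a HashPartitioner the partition index comes from the key's hash code, so
it need not follow alphabetical order; only "same key, same partition" is
guaranteed.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object FirstLetterKeying {
  // Key used for grouping: the upper-cased first character, as a String.
  // An empty string yields the empty key "".
  def firstLetterKey(s: String): String = s.take(1).toUpperCase

  def main(args: Array[String]): Unit = {
    // Local master and app name are placeholders for illustration.
    val conf = new SparkConf().setMaster("local[2]").setAppName("first-letter-keying")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(Seq("apple", "Ball", "avocado", "banana", "cat"))
      // RDD[(String, String)]: (first letter, original string)
      val keyed = rdd.map(str => (firstLetterKey(str), str))
      // partitionBy shuffles the data so that equal keys share a partition.
      val partitioned = keyed.partitionBy(new HashPartitioner(26))
      // Print (partition index, key, value) to see where each string landed.
      partitioned
        .mapPartitionsWithIndex((idx, itr) => itr.map { case (k, v) => (idx, k, v) })
        .collect()
        .foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Strings sharing a first letter should print with the same partition index.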

Perhaps if you expand on your use case we can provide more detailed
assistance.

On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <> wrote:

> Lets say I have an RDD of Strings and there are 26 machines in the
> cluster. How can I repartition the RDD in such a way that all strings
> starting with A gets collected on machine1, B on machine2 and so on.