Posted to user@spark.apache.org by Mohit Anchlia <mo...@gmail.com> on 2015/03/13 22:26:21 UTC

Partitioning

I am looking for documentation on partitioning, which I can't seem to
find. I am looking at Spark Streaming and was wondering how it partitions
an RDD in a multi-node environment. Where are the keys that are used for
partitioning defined? For instance, in the example below the keys seem to
be implicit:

Which one is the key and which one is the value? Or is it called a flatMap
because there are no keys?

// Split each line into words
JavaDStream<String> words = lines.flatMap(
  new FlatMapFunction<String, String>() {
    @Override public Iterable<String> call(String x) {
      return Arrays.asList(x.split(" "));
    }
  });


And are keys available inside Function2, in case they are required for a
given use case?


JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
  new Function2<Integer, Integer, Integer>() {
    @Override public Integer call(Integer i1, Integer i2) throws Exception {
      return i1 + i2;
    }
  });
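
To make the two questions above concrete: flatMap only produces plain
elements (words), so there are no keys at that point. In the usual word-count
example the keys appear in a separate pairing step (mapToPair in the Java API)
that turns each word into a (word, 1) pair, and the function passed to
reduceByKey is only ever handed two values that share a key, never the key
itself. The following is a minimal plain-Java sketch of that behaviour, with no
Spark dependency; the class and method names are hypothetical and it assumes
Java 8 or later.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

// Hypothetical illustration of reduceByKey semantics; this is not Spark code.
final class ReduceByKeySketch {

    // Combines all values that share a key using a function that sees values only.
    static Map<String, Integer> reduceByKey(
            List<Map.Entry<String, Integer>> pairs,
            BinaryOperator<Integer> reduce) {
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            // merge() stores the value for a new key, or applies reduce(old, new).
            result.merge(pair.getKey(), pair.getValue(), reduce);
        }
        return result;
    }

    public static void main(String[] args) {
        // Pairing step: each word becomes the key, the count 1 becomes the value.
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : "to be or not to be".split(" ")) {
            pairs.add(new AbstractMap.SimpleEntry<>(word, 1));
        }
        // The reduce function receives two Integers; the key is not passed in.
        System.out.println(reduceByKey(pairs, (i1, i2) -> i1 + i2));
    }
}
```

If a computation needs the key as well as the values, the reduce function is
the wrong place for it; the key has to be read in a step that sees whole pairs.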

Re: Partitioning

Posted by Tathagata Das <td...@databricks.com>.
If you want to learn about how Spark partitions the data based on keys,
here is a recent talk about that:
http://www.slideshare.net/databricks/strata-sj-everyday-im-shuffling-tips-for-writing-better-spark-programs?related=1

Of course, you can also read the original Spark paper:
https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf


Re: Partitioning

Posted by Gerard Maas <ge...@gmail.com>.
In Spark Streaming, the consumers fetch data and put it into 'blocks'.
Each block becomes a partition of the RDD generated during that batch
interval.
The size of each block is controlled by the conf
'spark.streaming.blockInterval': a block holds the amount of data the
consumer can collect in that time.

The number of RDD partitions in a batch interval will then be:
(batch interval / spark.streaming.blockInterval) * number of consumers.
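
As a worked example of the formula above, here is a small plain-Java sketch
(no Spark dependency; the class and method names are hypothetical). It assumes
intervals given in milliseconds, and the result is best read as an estimate,
since the actual count depends on data arriving in every block interval.

```java
// Hypothetical helper that evaluates the partition-count formula from this post.
final class PartitionEstimate {

    // partitions = (batch interval / block interval) * number of consumers
    static long partitionsPerBatch(long batchIntervalMs, long blockIntervalMs, int consumers) {
        if (batchIntervalMs <= 0 || blockIntervalMs <= 0 || consumers < 0) {
            throw new IllegalArgumentException("intervals must be positive, consumers non-negative");
        }
        return (batchIntervalMs / blockIntervalMs) * consumers;
    }

    public static void main(String[] args) {
        // A 2 s batch with 200 ms blocks and one consumer gives 10 blocks per batch.
        System.out.println(partitionsPerBatch(2000, 200, 1));
        // Three consumers with the same settings give three times as many.
        System.out.println(partitionsPerBatch(2000, 200, 3));
    }
}
```

Lowering the block interval or adding consumers therefore raises the number of
partitions, and with it the number of tasks per batch.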

-kr, Gerard

Re: Partitioning

Posted by Mohit Anchlia <mo...@gmail.com>.
I still don't follow how Spark partitions data in a multi-node
environment. Is there a document on how Spark does partitioning of data?
For example, in word count, how does Spark distribute words to multiple nodes?


Re: Partitioning

Posted by Tathagata Das <td...@databricks.com>.
If you want to access the keys in an RDD that is partitioned by key, then
you can use RDD.mapPartitions(), which gives you access to the whole
partition as an iterator of (key, value) pairs. You can choose whether to
maintain the partitioning information by setting the preservesPartitioning
flag in mapPartitions (see docs). But use it at your own risk: if you modify
the keys and yet preserve partitioning, the partitioning no longer makes
sense, because the hashes of the keys have changed.
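
The point about key hashes can be sketched in plain Java. The snippet below
mimics the idea behind hash partitioning (a key goes to the partition given by
its hash code modulo the number of partitions, kept non-negative); it is an
illustration with hypothetical names and no Spark dependency, not Spark's own
code.

```java
import java.util.Arrays;

// Hypothetical illustration of hash partitioning; this is not Spark code.
final class HashPartitionSketch {

    // Maps a key to a partition index in the range [0, numPartitions).
    static int partitionFor(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (key == null) {
            return 0; // assumption: null keys go to partition 0
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        // Equal keys always map to the same partition, wherever they originate.
        for (String word : Arrays.asList("spark", "streaming", "spark")) {
            System.out.println(word + " -> partition " + partitionFor(word, numPartitions));
        }
        // A modified key has a different hash code, so its expected partition may
        // differ from the partition the record physically sits in.
        String original = "spark";
        String modified = original.toUpperCase();
        System.out.println(original + " -> " + partitionFor(original, numPartitions));
        System.out.println(modified + " -> " + partitionFor(modified, numPartitions));
    }
}
```

This is why keeping the partitioning information is only safe when the keys
are left untouched: a later stage that trusts the partitioner would look for
the modified key in the partition its new hash points to.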

TD


