Posted to user@spark.apache.org by Shuai Zheng <sz...@gmail.com> on 2015/03/09 23:41:28 UTC

Process time series RDD after sortByKey

Hi All,

 

I am processing some time series data. For one day, there might be 500GB; then
for each hour, it is around 20GB of data.

 

I need to sort the data before I start processing. Assume I can sort them
successfully:

 

dayRDD.sortByKey

 

but after that, I might have thousands of partitions (needed to make the sort
succeed), say 1000 partitions. Then I try to process the data by hour (it does
not need to be exactly one hour, just some similar time frame), and I can't
simply repartition to 24 because one partition might then be too big to fit
into memory (if it is 20GB). So is there any way for me to process the
underlying partitions in a certain order? Basically I want to
call mapPartitionsWithIndex with a range of indexes.
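
For illustration, what I have in mind is roughly the sketch below (just a
sketch; the 0-49 range standing in for "the first hour" is hypothetical):

val sortedRDD = dayRDD.sortByKey()   // ~1000 partitions, in key order
// Keep only the records living in partitions 0-49; every other partition
// simply returns an empty iterator.
val firstSlice = sortedRDD.mapPartitionsWithIndex { (idx, iter) =>
  if (idx < 50) iter else Iterator.empty
}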

 

Any way to do it? I hope I have described my issue clearly. :)

 

Regards,

 

Shuai

 

 


Re: Process time series RDD after sortByKey

Posted by Shawn Zheng <sz...@gmail.com>.
Hi Imran,
This is extremely helpful. It is not only an approach; it also helps me
understand how to influence and customize my own DAG effectively.

Thanks a lot!

Shuai

On Monday, March 16, 2015, Imran Rashid <ir...@cloudera.com> wrote:

> Hi Shuai,
>
> yup, that is exactly what I meant -- implement your own class
> MyGroupingRDD.  This is definitely more detail than a lot of users will
> need to go, but its also not all that scary either.  In this case, you want
> something that is *extremely* close to the existing CoalescedRDD, so start
> by looking at that code.
>
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
>
> The only thing which is complicated in CoalescedRDD is the
> PartitionCoalescer, but that is completely irrelevant for you, so you can
> ignore it.  I started writing up a description of what to do but then I
> realized just writing the code would be easier :)  Totally untested, but
> here you go:
>
> https://gist.github.com/squito/c2d1dd5413a60830d6f3
>
> The only really interesting part here is getPartitions:
>
>
> https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L31
>
> That's where you create partitions in your new RDD, which depend on
> multiple RDDs from the parent.  Also note that compute() is very simple:
> you just concatenate together the iterators from each of the parent RDDs:
>
>
> https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L37
>
> let me know how it goes!
>
>
> On Mon, Mar 16, 2015 at 5:15 PM, Shuai Zheng <szheng.code@gmail.com> wrote:
>
>> Hi Imran,
>>
>>
>>
>> I am a bit confused here. Assume I have RDD a with 1000 partition and
>> also has been sorted. How can I control when creating RDD b (with 20
>> partitions) to make sure 1-50 partition of RDD a map to 1st partition of
>> RDD b? I don’t see any control code/logic here?
>>
>>
>>
>> You code below:
>>
>>
>>
>> val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)
>>
>>
>>
>>
>>
>> Does it means I need to define/develop my own MyGroupingRDD class? I am
>> not very clear how to do that, any place I can find an example? I never
>> create my own RDD class before (not RDD instance :)). But this is very
>> valuable approach to me so I am desired to learn.
>>
>>
>>
>> Regards,
>>
>>
>>
>> Shuai
>>
>>
>>
>> *From:* Imran Rashid [mailto:irashid@cloudera.com]
>> *Sent:* Monday, March 16, 2015 11:22 AM
>> *To:* Shawn Zheng; user@spark.apache.org
>> *Subject:* Re: Process time series RDD after sortByKey
>>
>>
>>
>> Hi Shuai,
>>
>>
>>
>> On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng <szheng.code@gmail.com> wrote:
>>
>> Sorry I response late.
>>
>> Zhan Zhang's solution is very interesting and I look at into it, but it
>> is not what I want. Basically I want to run the job sequentially and also
>> gain parallelism. So if possible, if I have 1000 partition, the best case
>> is I can run it as 20 subtask, each one take partition: 1-50, 51-100,
>> 101-150, etc.
>>
>> If we have ability to do this, we will gain huge flexibility when we try
>> to process some time series like data and a lot of algo will benefit from
>> it.
>>
>>
>>
>> yes, this is what I was suggesting you do.  You would first create one
>> RDD (a) that has 1000 partitions.  Don't worry about the creation of this
>> RDD -- it wont' create any tasks, its just a logical holder of your raw
>> data.  Then you create another RDD (b) that depends on your RDD (a), but
>> that only has 20 partitions.  Each partition in (b) would depend on a
>> number of partitions from (a).  As you've suggested, partition 1 in (b)
>> would depend on partitions 1-50 in (a), partition 2 in (b) would depend on
>> 51-100 in (a), etc.   Note that RDD (b) still doesn't *do* anything.  Its
>> just another logical holder for your data, but this time grouped in the way
>> you want.  Then after RDD (b), you would do whatever other transformations
>> you wanted, but now you'd be working w/ 20 partitions:
>>
>>
>>
>> val rawData1000Partitions = sc.textFile(...) // or whatever
>>
>> val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)
>>
>> groupedRawData20Partitions.map{...}.filter{...}.reduceByKey{...} //etc.
>>
>>
>>
>> note that this is almost exactly the same as what CoalescedRdd does.
>> However, it might combine the partitions in whatever ways it feels like --
>> you want them combined in a very particular order.  So you'll need to
>> create your own subclass.
>>
>>
>>
>>
>>
>> Back to Zhan Zhang's
>>
>> while( iterPartition < RDD.partitions.length) {
>>
>>       val res = sc.runJob(this, (it: Iterator[T]) =>
>> somFunc, iterPartition, allowLocal = true)
>>
>>       Some other function after processing one partition.
>>
>>       iterPartition += 1
>>
>> }
>>
>> I am curious how spark process this without parallelism, the indidivual
>> partition will pass back to driver to process or just run one task on that
>> node which partition exist? then follow by another partition on another
>> node?
>>
>>
>>
>>
>>
>> Not exactly.  The partition is not shipped back to the driver.  You
>> create a task which will be processed by a worker.  The task scheduling
>> will take data locality into account, so ideally the task will get
>> scheduled in the same location where the data already resides.  The worker
>> will execute someFunc, and after its done it will ship the *result* back to
>> the driver.  Then the process will get repeated for all the other
>> partitions.
>>
>>
>>
>> If you wanted all the data sent back to the driver, you could use
>> RDD.toLocalIterator.  That will send one partition back to the driver, let
>> you process it on the driver, then fetch the next partition, etc.
>>
>>
>>
>>
>>
>> Imran
>>
>>
>>
>>
>>
>
>

Re: Process time series RDD after sortByKey

Posted by Imran Rashid <ir...@cloudera.com>.
Hi Shuai,

yup, that is exactly what I meant -- implement your own class
MyGroupingRDD.  This is definitely more detail than a lot of users will
need to go into, but it's also not all that scary either.  In this case, you
want something that is *extremely* close to the existing CoalescedRDD, so
start by looking at that code.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala

The only thing which is complicated in CoalescedRDD is the
PartitionCoalescer, but that is completely irrelevant for you, so you can
ignore it.  I started writing up a description of what to do but then I
realized just writing the code would be easier :)  Totally untested, but
here you go:

https://gist.github.com/squito/c2d1dd5413a60830d6f3

The only really interesting part here is getPartitions:

https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L31

That's where you create the partitions of your new RDD, each of which depends
on multiple partitions of the parent.  Also note that compute() is very simple:
you just concatenate together the iterators from each of the parent partitions:

https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L37
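
A rough, untested sketch of the same idea (not the gist itself; the names
MyGroupingRDD, GroupedPartition and groupSize are just placeholders):

import scala.reflect.ClassTag
import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// One output partition, remembering which parent partition indexes it covers.
class GroupedPartition(val index: Int, val parentIndexes: Seq[Int])
  extends Partition

// Groups every `groupSize` consecutive partitions of `parent`, in order,
// into a single partition of this RDD.
class MyGroupingRDD[T: ClassTag](parent: RDD[T], groupSize: Int)
  extends RDD[T](parent.sparkContext, Nil) {

  override def getPartitions: Array[Partition] =
    parent.partitions.indices
      .grouped(groupSize)
      .zipWithIndex
      .map { case (parentIdxs, i) => new GroupedPartition(i, parentIdxs) }
      .toArray[Partition]

  // Concatenate the parent partitions' iterators, preserving their order.
  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    split.asInstanceOf[GroupedPartition].parentIndexes.iterator.flatMap { i =>
      firstParent[T].iterator(firstParent[T].partitions(i), context)
    }

  // Each output partition narrowly depends on its run of parent partitions.
  override def getDependencies: Seq[Dependency[_]] = Seq(
    new NarrowDependency(parent) {
      override def getParents(partitionId: Int): Seq[Int] =
        partitions(partitionId).asInstanceOf[GroupedPartition].parentIndexes
    })
}

With 1000 sorted input partitions, new MyGroupingRDD(rawData1000Partitions, 50)
would then give you the 20-partition RDD from the example elsewhere in this
thread.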

let me know how it goes!


On Mon, Mar 16, 2015 at 5:15 PM, Shuai Zheng <sz...@gmail.com> wrote:

> Hi Imran,
>
>
>
> I am a bit confused here. Assume I have RDD a with 1000 partition and also
> has been sorted. How can I control when creating RDD b (with 20 partitions)
> to make sure 1-50 partition of RDD a map to 1st partition of RDD b? I
> don’t see any control code/logic here?
>
>
>
> You code below:
>
>
>
> val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)
>
>
>
>
>
> Does it means I need to define/develop my own MyGroupingRDD class? I am
> not very clear how to do that, any place I can find an example? I never
> create my own RDD class before (not RDD instance :)). But this is very
> valuable approach to me so I am desired to learn.
>
>
>
> Regards,
>
>
>
> Shuai
>
>
>
> *From:* Imran Rashid [mailto:irashid@cloudera.com]
> *Sent:* Monday, March 16, 2015 11:22 AM
> *To:* Shawn Zheng; user@spark.apache.org
> *Subject:* Re: Process time series RDD after sortByKey
>
>
>
> Hi Shuai,
>
>
>
> On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng <sz...@gmail.com>
> wrote:
>
> Sorry I response late.
>
> Zhan Zhang's solution is very interesting and I look at into it, but it is
> not what I want. Basically I want to run the job sequentially and also gain
> parallelism. So if possible, if I have 1000 partition, the best case is I
> can run it as 20 subtask, each one take partition: 1-50, 51-100, 101-150,
> etc.
>
> If we have ability to do this, we will gain huge flexibility when we try
> to process some time series like data and a lot of algo will benefit from
> it.
>
>
>
> yes, this is what I was suggesting you do.  You would first create one RDD
> (a) that has 1000 partitions.  Don't worry about the creation of this RDD
> -- it wont' create any tasks, its just a logical holder of your raw data.
> Then you create another RDD (b) that depends on your RDD (a), but that only
> has 20 partitions.  Each partition in (b) would depend on a number of
> partitions from (a).  As you've suggested, partition 1 in (b) would depend
> on partitions 1-50 in (a), partition 2 in (b) would depend on 51-100 in
> (a), etc.   Note that RDD (b) still doesn't *do* anything.  Its just
> another logical holder for your data, but this time grouped in the way you
> want.  Then after RDD (b), you would do whatever other transformations you
> wanted, but now you'd be working w/ 20 partitions:
>
>
>
> val rawData1000Partitions = sc.textFile(...) // or whatever
>
> val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)
>
> groupedRawData20Partitions.map{...}.filter{...}.reduceByKey{...} //etc.
>
>
>
> note that this is almost exactly the same as what CoalescedRdd does.
> However, it might combine the partitions in whatever ways it feels like --
> you want them combined in a very particular order.  So you'll need to
> create your own subclass.
>
>
>
>
>
> Back to Zhan Zhang's
>
> while( iterPartition < RDD.partitions.length) {
>
>       val res = sc.runJob(this, (it: Iterator[T]) =>
> somFunc, iterPartition, allowLocal = true)
>
>       Some other function after processing one partition.
>
>       iterPartition += 1
>
> }
>
> I am curious how spark process this without parallelism, the indidivual
> partition will pass back to driver to process or just run one task on that
> node which partition exist? then follow by another partition on another
> node?
>
>
>
>
>
> Not exactly.  The partition is not shipped back to the driver.  You create
> a task which will be processed by a worker.  The task scheduling will take
> data locality into account, so ideally the task will get scheduled in the
> same location where the data already resides.  The worker will execute
> someFunc, and after its done it will ship the *result* back to the driver.
> Then the process will get repeated for all the other partitions.
>
>
>
> If you wanted all the data sent back to the driver, you could use
> RDD.toLocalIterator.  That will send one partition back to the driver, let
> you process it on the driver, then fetch the next partition, etc.
>
>
>
>
>
> Imran
>
>
>
>
>

RE: Process time series RDD after sortByKey

Posted by Shuai Zheng <sz...@gmail.com>.
Hi Imran,

 

I am a bit confused here. Assume I have RDD a with 1000 partitions, which has also been sorted. How can I control, when creating RDD b (with 20 partitions), that partitions 1-50 of RDD a map to the 1st partition of RDD b? I don't see any control code/logic here.

 

Your code below:

 

val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)

 

 

Does it mean I need to define/develop my own MyGroupingRDD class? I am not very clear on how to do that; is there any place I can find an example? I have never created my own RDD class before (not an RDD instance :)). But this is a very valuable approach for me, so I am eager to learn.

 

Regards,

 

Shuai

 

From: Imran Rashid [mailto:irashid@cloudera.com] 
Sent: Monday, March 16, 2015 11:22 AM
To: Shawn Zheng; user@spark.apache.org
Subject: Re: Process time series RDD after sortByKey

 

Hi Shuai,

 

On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng <sz...@gmail.com> wrote:

Sorry I response late.

Zhan Zhang's solution is very interesting and I look at into it, but it is not what I want. Basically I want to run the job sequentially and also gain parallelism. So if possible, if I have 1000 partition, the best case is I can run it as 20 subtask, each one take partition: 1-50, 51-100, 101-150, etc. 

If we have ability to do this, we will gain huge flexibility when we try to process some time series like data and a lot of algo will benefit from it.

 

yes, this is what I was suggesting you do.  You would first create one RDD (a) that has 1000 partitions.  Don't worry about the creation of this RDD -- it wont' create any tasks, its just a logical holder of your raw data.  Then you create another RDD (b) that depends on your RDD (a), but that only has 20 partitions.  Each partition in (b) would depend on a number of partitions from (a).  As you've suggested, partition 1 in (b) would depend on partitions 1-50 in (a), partition 2 in (b) would depend on 51-100 in (a), etc.   Note that RDD (b) still doesn't *do* anything.  Its just another logical holder for your data, but this time grouped in the way you want.  Then after RDD (b), you would do whatever other transformations you wanted, but now you'd be working w/ 20 partitions:

 

val rawData1000Partitions = sc.textFile(...) // or whatever

val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)

groupedRawData20Partitions.map{...}.filter{...}.reduceByKey{...} //etc.

 

note that this is almost exactly the same as what CoalescedRdd does.  However, it might combine the partitions in whatever ways it feels like -- you want them combined in a very particular order.  So you'll need to create your own subclass.

 

 

Back to Zhan Zhang's 

while( iterPartition < RDD.partitions.length) {

      val res = sc.runJob(this, (it: Iterator[T]) => somFunc, iterPartition, allowLocal = true)

      Some other function after processing one partition.

      iterPartition += 1

}

I am curious how spark process this without parallelism, the indidivual partition will pass back to driver to process or just run one task on that node which partition exist? then follow by another partition on another node?

 

 

Not exactly.  The partition is not shipped back to the driver.  You create a task which will be processed by a worker.  The task scheduling will take data locality into account, so ideally the task will get scheduled in the same location where the data already resides.  The worker will execute someFunc, and after its done it will ship the *result* back to the driver.  Then the process will get repeated for all the other partitions.

 

If you wanted all the data sent back to the driver, you could use RDD.toLocalIterator.  That will send one partition back to the driver, let you process it on the driver, then fetch the next partition, etc.

 

 

Imran

 

 


Re: Process time series RDD after sortByKey

Posted by Imran Rashid <ir...@cloudera.com>.
Hi Shuai,

On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng <sz...@gmail.com> wrote:

> Sorry I response late.
>
> Zhan Zhang's solution is very interesting and I look at into it, but it is
> not what I want. Basically I want to run the job sequentially and also gain
> parallelism. So if possible, if I have 1000 partition, the best case is I
> can run it as 20 subtask, each one take partition: 1-50, 51-100, 101-150,
> etc.
> If we have ability to do this, we will gain huge flexibility when we try
> to process some time series like data and a lot of algo will benefit from
> it.
>

yes, this is what I was suggesting you do.  You would first create one RDD
(a) that has 1000 partitions.  Don't worry about the creation of this RDD
-- it won't create any tasks, it's just a logical holder of your raw data.
Then you create another RDD (b) that depends on your RDD (a), but that only
has 20 partitions.  Each partition in (b) would depend on a number of
partitions from (a).  As you've suggested, partition 1 in (b) would depend
on partitions 1-50 in (a), partition 2 in (b) would depend on 51-100 in
(a), etc.   Note that RDD (b) still doesn't *do* anything.  It's just
another logical holder for your data, but this time grouped in the way you
want.  Then after RDD (b), you would do whatever other transformations you
wanted, but now you'd be working w/ 20 partitions:

val rawData1000Partitions = sc.textFile(...) // or whatever
val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)
groupedRawData20Partitions.map{...}.filter{...}.reduceByKey{...} //etc.

note that this is almost exactly the same as what CoalescedRDD does.
However, it might combine the partitions in whatever ways it feels like --
you want them combined in a very particular order.  So you'll need to
create your own subclass.



> Back to Zhan Zhang's
>
> while( iterPartition < RDD.partitions.length) {
>       val res = sc.runJob(this, (it: Iterator[T]) =>
> somFunc, iterPartition, allowLocal = true)
>       Some other function after processing one partition.
>       iterPartition += 1
> }
> I am curious how spark process this without parallelism, the indidivual
> partition will pass back to driver to process or just run one task on that
> node which partition exist? then follow by another partition on another
> node?
>


Not exactly.  The partition is not shipped back to the driver.  You create
a task which will be processed by a worker.  The task scheduling will take
data locality into account, so ideally the task will get scheduled in the
same location where the data already resides.  The worker will execute
someFunc, and after it's done it will ship the *result* back to the driver.
Then the process will get repeated for all the other partitions.

If you wanted all the data sent back to the driver, you could use
RDD.toLocalIterator.  That will send one partition back to the driver, let
you process it on the driver, then fetch the next partition, etc.
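
A minimal sketch of that pattern (sortedRDD and processRecord are just
placeholders):

// Pulls one partition at a time to the driver, in partition (i.e. key)
// order, so the records are seen sequentially without ever collecting the
// whole dataset; only one partition has to fit in driver memory at a time.
sortedRDD.toLocalIterator.foreach { record =>
  processRecord(record)  // runs on the driver, not on the executors
}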


Imran

Re: Process time series RDD after sortByKey

Posted by Imran Rashid <ir...@cloudera.com>.
this is a very interesting use case.  First of all, it's worth pointing out
that if you really need to process the data sequentially, you are
fundamentally limiting the parallelism you can get.  E.g., if you need to
process the entire data set sequentially, then you can't get any parallelism.
If you can process each hour separately, but need to process data within an
hour sequentially, then the max parallelism you can get for one day is 24.

But let's say you're OK with that.  Zhan Zhang's solution is good if you just
want to process the entire dataset sequentially.  But what if you wanted to
process each hour separately, so you can at least create 24 tasks that can
be run in parallel for one day?  I think you would need to create your own
subclass of RDD that is similar in spirit to what CoalescedRDD does.  Your
RDD would have 24 partitions, and each partition would depend on some set
of partitions in its parent (your sorted RDD with 1000 partitions).  I
don't think you could use CoalescedRDD directly b/c you want more control
over the way the partitions get grouped together.
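
To make the grouping concrete, a rough sketch of how those 24 groups of
parent partition indexes could be computed (assuming you are happy with
roughly equal-sized groups rather than exact hour boundaries):

// 1000 sorted parent partitions split into 24 roughly equal, ordered runs;
// output partition i of the new RDD would depend on parent partitions
// groups(i).
val parentPartitions = 1000
val numGroups = 24
val groupSize = math.ceil(parentPartitions.toDouble / numGroups).toInt  // 42
val groups: Array[Seq[Int]] =
  (0 until parentPartitions).grouped(groupSize).map(_.toSeq).toArray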

this answer is very similar to my answer to your other question about
controlling partitions, hope it helps! :)


On Mon, Mar 9, 2015 at 5:41 PM, Shuai Zheng <sz...@gmail.com> wrote:

> Hi All,
>
>
>
> I am processing some time series data. For one day, it might has 500GB,
> then for each hour, it is around 20GB data.
>
>
>
> I need to sort the data before I start process. Assume I can sort them
> successfully
>
>
>
> *dayRDD.sortByKey*
>
>
>
> but after that, I might have thousands of partitions (to make the sort
> successfully), might be 1000 partitions. And then I try to process the data
> by hour (not need exactly one hour, but some kind of similar time frame).
> And I can’t just re-partition size to 24 because then one partition might
> be too big to fit into memory (if it is 20GB). So is there any way for me
> to just can process underlying partitions by certain order? Basically I
> want to call mapPartitionsWithIndex with a range of index?
>
>
>
> Anyway to do it? Hope I describe my issue clear… :)
>
>
>
> Regards,
>
>
>
> Shuai
>
>
>
>
>

Re: Process time series RDD after sortByKey

Posted by Zhan Zhang <zz...@hortonworks.com>.
Does a code flow similar to the following, which processes each partition of an RDD sequentially, work for you?

while (iterPartition < rdd.partitions.length) {
      val res = sc.runJob(rdd, (it: Iterator[T]) => someFunc(it), Seq(iterPartition), allowLocal = true)
      // some other function after processing one partition
      iterPartition += 1
}

You can refer to RDD.take for an example.
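
A slightly more concrete, untested sketch of that loop (an RDD[String] named
rdd, and a per-partition record count, are just placeholders for your own
data and someFunc):

// Runs one job per partition, in order.  Each job contains a single task
// that is scheduled where that partition's data lives; only the small
// result (here a count) comes back to the driver.
var iterPartition = 0
while (iterPartition < rdd.partitions.length) {
  val counts = sc.runJob(rdd, (it: Iterator[String]) => it.size,
    Seq(iterPartition), allowLocal = true)
  println(s"partition $iterPartition has ${counts.head} records")
  iterPartition += 1
}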

Thanks.

Zhan Zhang

On Mar 9, 2015, at 3:41 PM, Shuai Zheng <sz...@gmail.com> wrote:

Hi All,

I am processing some time series data. For one day, it might has 500GB, then for each hour, it is around 20GB data.

I need to sort the data before I start process. Assume I can sort them successfully

dayRDD.sortByKey

but after that, I might have thousands of partitions (to make the sort successfully), might be 1000 partitions. And then I try to process the data by hour (not need exactly one hour, but some kind of similar time frame). And I can’t just re-partition size to 24 because then one partition might be too big to fit into memory (if it is 20GB). So is there any way for me to just can process underlying partitions by certain order? Basically I want to call mapPartitionsWithIndex with a range of index?

Anyway to do it? Hope I describe my issue clear… :)

Regards,

Shuai