Posted to user@crunch.apache.org by Danny Morgan <un...@hotmail.com> on 2015/01/13 22:11:27 UTC

Planning Optimization for Sort

Hi Everyone,
I have a crunch job that reads some data from s3 and applies a simple MapFn and then does a total order sort.
PCollection<String> rawdata = readTextFile("s3n://data");
PCollection<String> data = rawdata.parallelDo(new myMapFn());
Sort.sort(data);
I noticed that Sort from the sort library works in two phases, the first being the presort phase. When I execute this pipeline as is, the data is read and transformed three times: the first time to generate the PCollections, the second time for the presort phase, and the third for the final sort.
The snippet below ends up only reading the data from s3 once.
PCollection<String> rawdata = readTextFile("s3n://data");
PCollection<String> data = rawdata.parallelDo(new myMapFn());
data.cache();
pipeline.run();
Sort.sort(data);
Might this be a Crunch planner optimization opportunity?
Thanks!
Danny

RE: Planning Optimization for Sort

Posted by Danny Morgan <un...@hotmail.com>.
Yes, you're right, I do write "data" out at some point later as well.
Re-running the MapFn is also kind of expensive; maybe instead of putting it on the Source, some interface similar to scaleFactor() on the MapFn?
Danny
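[Editor's note: a toy, stdlib-only sketch of the idea above. Crunch's real DoFn exposes scaleFactor() as a sizing hint, but no cost hint like this exists; CostedFn, costFactor(), and shouldCacheOutput() are all hypothetical names, and the planner rule is an arbitrary stand-in.]

```java
// Hypothetical sketch only: none of this is real Crunch API. It models the
// idea of a per-function cost hint (by analogy with DoFn.scaleFactor()) that
// a planner could use to decide whether to cache a function's output rather
// than re-run it in later jobs.
public class ExpensiveFnSketch {

    // Toy stand-in for a DoFn that carries a cost hint.
    abstract static class CostedFn<S, T> {
        abstract T map(S input);

        // Relative cost of one invocation, by analogy with scaleFactor();
        // higher means more expensive to re-run.
        float costFactor() {
            return 1.0f;
        }
    }

    // Toy planner rule: cache the function's output when re-running it across
    // several jobs would cost more than an (arbitrary) materialization cost
    // of 2.0.
    static boolean shouldCacheOutput(CostedFn<?, ?> fn, int reruns) {
        return fn.costFactor() * reruns > 2.0f;
    }

    public static void main(String[] args) {
        CostedFn<String, String> cheap = new CostedFn<String, String>() {
            String map(String s) { return s.trim(); }
        };
        CostedFn<String, String> pricey = new CostedFn<String, String>() {
            String map(String s) { return s.toUpperCase(); }
            float costFactor() { return 3.0f; }
        };

        // The sort pipeline re-runs the MapFn twice (presort + sort).
        System.out.println(shouldCacheOutput(cheap, 2));
        System.out.println(shouldCacheOutput(pricey, 2));
    }
}
```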
From: jwills@cloudera.com
Date: Tue, 13 Jan 2015 13:36:56 -0800
Subject: Re: Planning Optimization for Sort
To: user@crunch.apache.org

I counted two reads of the first job instead of three-- are you writing out the "data" PCollection as part of the job as well?
Trying to think of how I would want to communicate the fact that the s3 read is slow/expensive to the planner; maybe a bit on Source that could be used to signal an expensive source that should only ever be read once?
On Tue, Jan 13, 2015 at 1:11 PM, Danny Morgan <un...@hotmail.com> wrote:



Hi Everyone,
I have a crunch job that reads some data from s3 and applies a simple MapFn and then does a total order sort.
PCollection<String> rawdata = readTextFile("s3n://data");
PCollection<String> data = rawdata.parallelDo(new myMapFn());
Sort.sort(data);
I noticed that Sort from the sort library works in two phases, the first being the presort phase. When I execute this pipeline as is, the data is read and transformed three times: the first time to generate the PCollections, the second time for the presort phase, and the third for the final sort.
The snippet below ends up only reading the data from s3 once.
PCollection<String> rawdata = readTextFile("s3n://data");
PCollection<String> data = rawdata.parallelDo(new myMapFn());
data.cache();
pipeline.run();
Sort.sort(data);
Might this be a Crunch planner optimization opportunity?
Thanks!
Danny


-- 
Director of Data Science
Cloudera
Twitter: @josh_wills
 		 	   		  

Re: Planning Optimization for Sort

Posted by Josh Wills <jw...@cloudera.com>.
I counted two reads of the first job instead of three-- are you writing out
the "data" PCollection as part of the job as well?

Trying to think of how I would want to communicate the fact that the s3
read is slow/expensive to the planner; maybe a bit on Source that could be
used to signal an expensive source that should only ever be read once?
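[Editor's note: a toy, stdlib-only sketch of the proposal above. Crunch's real Source interface has no such bit; the Source stand-in, isExpensive(), and shouldMaterialize() below are all hypothetical names, and the planner rule is an arbitrary stand-in.]

```java
// Hypothetical sketch only: models a bit on Source marking it as expensive,
// so a planner could materialize its output once instead of re-reading it
// (e.g. from s3) in every downstream job.
public class ExpensiveSourceSketch {

    // Toy stand-in for a Crunch Source; "isExpensive" is the proposed bit.
    interface Source {
        boolean isExpensive();
    }

    // Toy planner rule: materialize the output of an expensive source when
    // more than one job would otherwise re-read it.
    static boolean shouldMaterialize(Source src, int downstreamReads) {
        return src.isExpensive() && downstreamReads > 1;
    }

    public static void main(String[] args) {
        Source s3 = () -> true;     // e.g. an s3n:// text source
        Source local = () -> false; // e.g. a local HDFS source

        // The sort pipeline reads its input twice (presort + sort).
        System.out.println(shouldMaterialize(s3, 2));
        System.out.println(shouldMaterialize(local, 2));
    }
}
```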

On Tue, Jan 13, 2015 at 1:11 PM, Danny Morgan <un...@hotmail.com>
wrote:

> Hi Everyone,
>
> I have a crunch job that reads some data from s3 and applies a simple
> MapFn and then does a total order sort.
>
> PCollection<String> rawdata = readTextFile("s3n://data");
> PCollection<String> data = rawdata.parallelDo(new myMapFn());
> Sort.sort(data);
>
> I noticed that Sort from the sort library works in two phases, the first
> being the presort phase. When I execute this pipeline as is, the data
> is read and transformed three times: the first time to generate the
> PCollections, the second time for the presort phase, and the third for
> the final sort.
>
> The snippet below ends up only reading the data from s3 once.
>
> PCollection<String> rawdata = readTextFile("s3n://data");
> PCollection<String> data = rawdata.parallelDo(new myMapFn());
> data.cache();
> pipeline.run();
> Sort.sort(data);
>
> Might this be a Crunch planner optimization opportunity?
>
> Thanks!
>
> Danny
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>