Posted to dev@crunch.apache.org by Gabriel Reid <ga...@gmail.com> on 2012/09/06 06:16:03 UTC

Checkpointing in pipelines

Hi guys,  

In some instances, we want to do some kind of iterative processing in Crunch, and run the same (or a similar) DoFn on the same PCollection multiple times.

For example, let's say we've got a PCollection of "grid" objects, and we want to iteratively divide each of these grids into four sub-grids, leading to exponential growth of the data. The naive approach would be the following:

PCollection<Grid> grids = …;
for (…){
   grids = grids.parallelDo(new SubdivideFn());
}
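
For concreteness, SubdivideFn might look something like this (a sketch only: the Grid type and its subdivide() helper are hypothetical, and in practice parallelDo also needs a PType<Grid> argument):

public class SubdivideFn extends DoFn<Grid, Grid> {
   @Override
   public void process(Grid input, Emitter<Grid> emitter) {
      // Emit the four sub-grids of the input grid; subdivide() is an
      // assumed helper on the hypothetical Grid type.
      for (Grid subGrid : input.subdivide()) {
         emitter.emit(subGrid);
      }
   }
}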

However, the loop above would be optimized into a single chain of DoFns, so the number of mappers wouldn't grow from one iteration to the next, which of course wouldn't work well with the exponential growth of the data.

The current way of getting around this is to add a call to materialize().iterator() on the PCollection in each iteration (this is also done in the PageRankIT integration test).
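
Concretely, that workaround looks something like this (a sketch: materialize() returns an Iterable, and asking for its iterator forces the pipeline to run up to that point; gridType stands in for a PType<Grid>):

for (…){
   grids = grids.parallelDo(new SubdivideFn(), gridType);
   grids.materialize().iterator(); // forces a job boundary each iteration
}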

What I propose is adding a "checkpoint" method to PCollection to signify that this should be an actual step in processing. This could work as follows:

PCollection<Grid> grids = …;
for (…){
   grids = grids.parallelDo(new SubdivideFn()).checkpoint();
}


In the short term this could even be implemented as just a call to materialize().iterator(), but encapsulating it in a method like this would allow us to work with it more efficiently in the future, especially once CRUNCH-34 is merged.
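
In other words, roughly this inside the PCollection implementation (just a sketch of the idea, not a worked-out patch):

public PCollection<S> checkpoint() {
   // Short-term version: materializing forces the planner to treat
   // this node as a real output, which keeps it from being fused
   // with the DoFns that come after it.
   materialize().iterator();
   return this;
}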

Any thoughts on this? The actual name of the method is my biggest concern; I'm not sure if "checkpoint" is the best name for it, but I can't think of anything better at the moment.

- Gabriel  


Re: Checkpointing in pipelines

Posted by Gabriel Reid <ga...@gmail.com>.

On Thursday 20 September 2012 at 16:50, Josh Wills wrote:

> Hey Gabriel (and others),
>  
> I think we are on the same page-- you're basically talking about
> creating a way to send hints (or perhaps orders) to the optimizer
> in terms of how it should break a job up. I am very much on board
> with this.
>  
> J

Ok, cool, I'll try to put something together for this.

- Gabriel

  




Re: Checkpointing in pipelines

Posted by Josh Wills <jw...@cloudera.com>.
Hey Gabriel (and others),

I think we are on the same page-- you're basically talking about
creating a way to send hints (or perhaps orders) to the optimizer
in terms of how it should break a job up. I am very much on
board with this.

J

On Thu, Sep 20, 2012 at 5:40 AM, Gabriel Reid <ga...@gmail.com> wrote:
> Hi Josh (and others),
>
> I'm not sure if we were on the same page about this or not -- any thoughts on it in the meantime?
>
> - Gabriel



-- 
Director of Data Science
Cloudera
Twitter: @josh_wills

Re: Checkpointing in pipelines

Posted by Gabriel Reid <ga...@gmail.com>.
Hi Josh (and others),

I'm not sure if we were on the same page about this or not -- any thoughts on it in the meantime?

- Gabriel  






Re: Checkpointing in pipelines

Posted by Gabriel Reid <ga...@gmail.com>.
Hi Josh,

The last thing I would be doing after completing a trans-atlantic
flight is checking developer mailing lists ;-)

What you're talking about (having a kind of rollback for job failures
somewhere along the pipeline) could be facilitated with what I was
talking about here, but it's not what I was trying to accomplish (I
think you realize that, but I'm just making sure). However, it does
kind of show that the name "checkpoint" isn't that descriptive for the
specific use case that I was talking about (which is what I was a bit
worried about).

To clarify, I'm talking about making it possible to specify that
a node in the execution graph of the pipeline shouldn't be merged in
between two other nodes (for example, an output or a GBK). The
specific use case that I'm going for is customizing the execution plan
for performance, and not for failure recovery.
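
To illustrate with the example from my first mail (same hypothetical SubdivideFn and gridType):

// Today the planner fuses both DoFns into a single map phase:
PCollection<Grid> once = grids.parallelDo(new SubdivideFn(), gridType);
PCollection<Grid> twice = once.parallelDo(new SubdivideFn(), gridType);

// With checkpoint(), the first node would instead become a boundary
// in the plan, much like an output or a GBK:
PCollection<Grid> first = grids.parallelDo(new SubdivideFn(), gridType).checkpoint();
PCollection<Grid> second = first.parallelDo(new SubdivideFn(), gridType);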

I think we're on the same page here, but just referring to two
different use cases, right?

- Gabriel



Re: Checkpointing in pipelines

Posted by Josh Wills <jw...@cloudera.com>.
I grok the concept and see the use case, but I was expecting that this
email was going to be about checkpointing in the sense of having Crunch
save state about the intermediate outputs of a processing pipeline and then
supporting the ability to restart a failed pipeline from a checkpointed
stage-- does that notion line up with what you had in mind here, or am I
just sleep deprived?

Josh, who just arrived in London



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>