Posted to dev@apex.apache.org by Chandni Singh <ch...@datatorrent.com> on 2015/11/23 00:56:56 UTC

Why is Async checkpointing made default?

With async checkpointing, the checkpointed callback in CheckpointListener
is called for a previous window, that is,
beginWindow (x) -> endWindow(x) -> checkpointed (x-1 )

This feature was newly introduced. With synchronous checkpointing, the
behavior was always
beginWindow(x) -> endWindow(x) -> checkpointed (x)

A lot of operators were written before asynchronous checkpointing was
introduced, and a few of them rely on the sequencing guaranteed by
synchronous checkpointing.

So why was async checkpointing made the default?

With async checkpointing as it is today, the complexity of handling transient
state in the checkpointed callback falls on every operator. For example, say
I had a transient map which I cleared every time checkpointed was called.
With async checkpointing this simple task becomes a lot more complicated.

I think Async checkpointing broke the semantics of operator callbacks and
should NOT be the default.
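
The transient-map problem can be reproduced with a small simulation. This is only a sketch and not Apex code: it is written in Python for brevity, and `CachingOperator`, `run` and the `lag` parameter are hypothetical names invented for illustration. It models an operator that clears a transient map in checkpointed(), assuming the callback for window x-1 arrives after endWindow(x) when checkpointing is asynchronous.

```python
class CachingOperator:
    """Toy operator that buffers tuples in a transient map (hypothetical)."""

    def __init__(self):
        self.current_window = None
        self.transient = {}   # window id -> tuples seen in that window
        self.lost = []        # windows dropped although not yet checkpointed

    def begin_window(self, window_id):
        self.current_window = window_id
        self.transient[window_id] = []

    def process(self, tup):
        self.transient[self.current_window].append(tup)

    def end_window(self):
        pass

    def checkpointed(self, window_id):
        # Written under the old assumption: everything buffered so far
        # belongs to windows <= window_id, so the map can simply be cleared.
        self.lost.extend(w for w in self.transient if w > window_id)
        self.transient.clear()


def run(operator, windows, lag):
    """Drive the callbacks. lag=0 models synchronous checkpointing,
    lag=1 the ideal asynchronous sequence (callback one window late)."""
    for x in windows:
        operator.begin_window(x)
        operator.process("tuple-%d" % x)
        operator.end_window()
        if x - lag >= windows[0]:
            operator.checkpointed(x - lag)
    return operator.lost


print(run(CachingOperator(), [1, 2, 3], lag=0))  # []
print(run(CachingOperator(), [1, 2, 3], lag=1))  # [2, 3]
```

With lag=1 the clear also discards entries for windows that the reported checkpoint does not cover. Under these assumptions, an operator would have to key its transient state by window and remove only entries with a key <= the window id passed to the callback.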

Re: Why is Async checkpointing made default?

Posted by Gaurav Gupta <ga...@datatorrent.com>.
Thomas,

I did it to maintain the semantics of Checkpoint. If we need to change this then I can change the implementation.

Thanks
- Gaurav

> On Nov 22, 2015, at 10:11 PM, Thomas Weise <th...@datatorrent.com> wrote:
> 
> You can only perform such operation in committed. Anything done in
> checkpointed can be repeated (until it becomes a recovery checkpoint).
> 
> On Sun, Nov 22, 2015 at 10:02 PM, Gaurav Gupta <ga...@datatorrent.com>
> wrote:
> 
>> Thomas,
>> 
>> This was done to preserve checkpointing semantics, that is, to tell the
>> operator that its state is preserved. Say a database is updated or files
>> are moved in the checkpointed call but the state copy fails: how do we
>> address such scenarios?
>> 
>> Thanks
>> - Gaurav
>> 
>>> On Nov 22, 2015, at 9:44 PM, Thomas Weise <th...@datatorrent.com>
>> wrote:
>>> 
>>> Alternatively I would ask why the checkpointed callback needs to wait
>>> until the data was copied to HDFS, instead of being called upon completion
>>> of the state serialization.
>>> 
>>> Thomas
>>> 
>>> 
>>> On Sun, Nov 22, 2015 at 9:41 PM, Chandni Singh <ch...@datatorrent.com>
>>> wrote:
>>> 
>>>> Gaurav,
>>>> 
>>>> My question is about why Async was made the default when it changed the
>>>> semantics of operator callbacks. Your response doesn't answer that.
>>>> 
>>>> In a way we broke backward compatibility.
>>>> 
>>>> Chandni
>>>> 
>>>> On Sun, Nov 22, 2015 at 9:22 PM, Gaurav Gupta <ga...@datatorrent.com>
>>>> wrote:
>>>> 
>>>>> The idea behind async checkpointing is to unblock the operator while the
>>>>> state is getting transferred to HDFS.
>>>>> Just to clarify: beginWindow(x) -> endWindow(x) -> checkpointed(x-1)
>>>>> is the ideal sequence, but if HDFS is slow, or transferring the state
>>>>> to HDFS is slow for some other reason, this sequence may not hold.
>>>>> 
>>>>> Can your use case be addressed by
>>>>> https://malhar.atlassian.net/browse/APEX-78?
>>>>> 
>>>>> Thanks
>>>>> - Gaurav
>>>>> 


Re: Why is Async checkpointing made default?

Posted by Thomas Weise <th...@datatorrent.com>.
You can only perform such an operation in committed(). Anything done in
checkpointed can be repeated (until it becomes a recovery checkpoint).
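
One way to read this advice is to queue side effects per window and run them only once committed() reports that window. The following is a minimal sketch in Python, not code against the Apex API: `DeferredActionOperator`, `schedule` and the action callables are hypothetical names, and it assumes that a committed window is never replayed. In a real operator the pending queue would itself have to be part of the checkpointed state.

```python
class DeferredActionOperator:
    """Queues external side effects and runs them only from committed()."""

    def __init__(self):
        self.current_window = None
        self.pending = {}    # window id -> list of zero-argument callables

    def begin_window(self, window_id):
        self.current_window = window_id

    def schedule(self, action):
        # Remember the action under the window that produced it.
        self.pending.setdefault(self.current_window, []).append(action)

    def checkpointed(self, window_id):
        # Intentionally no external side effects here: the checkpoint may
        # still be discarded and the window replayed after a failure.
        pass

    def committed(self, window_id):
        # Assumption: windows up to window_id are final, so their queued
        # actions can run exactly once, oldest window first.
        for w in sorted(w for w in self.pending if w <= window_id):
            for action in self.pending.pop(w):
                action()


log = []
op = DeferredActionOperator()
for x in (1, 2, 3):
    op.begin_window(x)
    op.schedule(lambda x=x: log.append("moved file for window %d" % x))
    op.checkpointed(x)
op.committed(2)
print(log)  # ['moved file for window 1', 'moved file for window 2']
```

The action for window 3 stays queued until a later committed() call covers it, so nothing external happens for a window that could still be rolled back.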

On Sun, Nov 22, 2015 at 10:02 PM, Gaurav Gupta <ga...@datatorrent.com>
wrote:

> Thomas,
>
> This was done to preserve checkpointing semantics, that is, to tell the
> operator that its state is preserved. Say a database is updated or files
> are moved in the checkpointed call but the state copy fails: how do we
> address such scenarios?
>
> Thanks
> - Gaurav
>

Re: Why is Async checkpointing made default?

Posted by "Chetan Narsude (cnarsude)" <cn...@cisco.com>.
Can you give an example of a broken checkpointed call?

—
Chetan


On 11/23/15, 10:38 AM, "Chandni Singh" <ch...@datatorrent.com> wrote:

>:-) I did not write it that semantics. Looks like someone made that
>assumption along the way. Fix the code which assumes that.
>
>What  do you mean?
>We go on fixing all the code which was written because the behavior with
>Synchronous checkpointing was
>beginWindow(x) -> endWindow -> checkpointWindow(x)   ( when checkpointing
>is aligned with application window boundary)
>
>This was broken recently with async checkpointing but instead of fixing
>that, we go on fixing all the existing solutions.
>I guess I find this ridiculous.
>
>And why should we make it harder and harder to write operators?
>I think majority of people who participated in this discussion do believe
>that we need to fix and I think this is critical.
>
>Chandni
>
>
>
>On Mon, Nov 23, 2015 at 10:16 AM, Chetan Narsude (cnarsude) <
>cnarsude@cisco.com> wrote:
>
>>
>>
>> On 11/23/15, 8:44 AM, "Chandni Singh" <ch...@datatorrent.com> wrote:
>>
>> >I think in the API, there is windowId in the checkpointed callback for
>> >cases when checkpointing could happen within application windows.
>> >
>> >IMHO backward compatibility is broken. For 3 years since the platform
>>was
>> >created the semantics of checkpointed were
>> >beginWindow(x) -> endwindow -> checkpointWindow(x) when checkpointing
>>was
>> >aligned with application window boundaries.
>> >
>>
>> :-) I did not write it that semantics. Looks like someone made that
>> assumption along the way. Fix the code which assumes that.
>>
>> --
>> Chetan
>>
>> >Please note that aligning checkpointing with the application window boundary is
>> >the DEFAULT behavior and I think most of us agree that aligning the
>> >checkpointing with application window boundaries is what we see in
>>every
>> >use case.
>> >
>> >Code was written with that assumption by committers of Apex (not even by
>> >people who are new to Apex). It was broken by a change introduced a couple
>> >of months back (August 2015).
>> >
>> >Moreover, what do we achieve by NOT guaranteeing that semantics-
>>delegate
>> >the complexity to operators to handle it. Even a simple scenario that I
>> >mentioned in my first mail, is very complicated.
>> >
>> >I think this is a big issue and we need to fix it.
>> >
>> >Chandni
>> >
>> >On Mon, Nov 23, 2015 at 7:12 AM, Munagala Ramanath
>><ra...@datatorrent.com>
>> >wrote:
>> >
>> >> I too find Gaurav's argument cogent: endWindow() does not take a
>> >> windowId parameter
>> >> leading to a natural guarantee that it matches the immediately
>> >> preceding beginWindow().
>> >>
>> >> Both committed() and checkpointed() take that parameter which
>>suggests
>> >>it
>> >> may
>> >> lag behind the current window. The comments on those methods say
>>nothing
>> >> that can be interpreted as a guarantee that the windowId will
>>match
>> >>the
>> >> window just processed. So, from the viewpoint of "original intent" --
>> >> a phrase that
>> >> has a long and storied history in jurisprudence
>> >> (https://en.wikipedia.org/wiki/Original_intent) --
>> >> it seems that any code that assumed a guarantee when none existed
>>must
>> >> be regarded
>> >> as erroneous.
>> >>
>> >> Having said that, we are all aware that in software it is not at all
>> >> unusual to preserve behavior
>> >> in the interests of backward compatibility, regardless of many other
>> >> reasons for that behavior
>> >> to be considered objectionable. So, if the cost of fixing all the
>>code
>> >> that makes that
>> >> assumption is too high, we should seriously consider reverting it. In
>> >> this context, the
>> >> guarantee that checkpointed() simply means that the operator state
>>has
>> >> been serialized
>> >> seems adequate.
>> >>
>> >> We have a roughly analogous situation with respect to OS write()
>>calls:
>> >> When the call returns, we _do not_ have an assurance that the data
>>has
>> >> gone to disk.
>> >> All we know is that the OS has copied the data to its buffers for
>> >> flushing to disk. If we
>> >> want to know when the actual write to disk is done, we need to use a
>> >> different call -- fsync().
>> >>
>> >> Ram
>> >>
>> >> On Mon, Nov 23, 2015 at 6:19 AM, Pramod Immaneni
>> >><pr...@datatorrent.com>
>> >> wrote:
>> >> > I think the original intent for checkpointed callback was that it
>>can
>> >>be
>> >> > called anytime after checkpoint and not necessarily immediately
>>after
>> >>the
>> >> > window prior of checkpoint. As Gaurav mentioned the API bears it
>>out.
>> >> > Furthermore the callback has to be called within the lifecycle
>> >>methods of
>> >> > the operator so it will be called inside a window, so you have to
>> >> > deal with new data anyway before the callback is called. Even
>>though
>> >> > checkpointed is not committed shouldn't it at least guarantee that
>>the
>> >> > operator is recoverable to that state in which case it should be
>> >>called
>> >> > after the save to HDFS actually finishes.
>> >> >
>> >> > Thanks
>> >> >
>> >> > On Mon, Nov 23, 2015 at 5:38 AM, Gaurav Gupta
>><gaurav@datatorrent.com
>> >
>> >> > wrote:
>> >> >
>> >> >> IMO, I don't think there is any backward incompatibility wrt
>> >> Checkpointing
>> >> >> call back semantics because
>> >> >>
>> >> >> 1. The checkpointed call is only made once the operator state is
>> >> preserved.
>> >> >> 2. The window ids being passed to checkpointed are in increasing
>> >>order.
>> >> >> 3. The window ids being passed are still the same ids as were
>>passed
>> >> >> earlier.
>> >> >> 4. The sequence is still beginWindow()->endWindow()->checkpointed().
>> >> >>
>> >> >> Thanks
>> >> >> - Gaurav
>> >> >>
>> >> >> > On Nov 23, 2015, at 1:03 AM, Gaurav Gupta
>><ga...@datatorrent.com>
>> >> >> wrote:
>> >> >> >
>> >> >> > If the requirement is that the order is always
>> >> >> > beginWindow()->endWindow()->checkpointed(), why pass windowId in the
>> >> >> > checkpointed() callback?
>> >> >> >
>> >> >> > Thanks
>> >> >> > - Gaurav
>> >> >> >
>> >> >> >> On Nov 22, 2015, at 11:22 PM, Chandni Singh
>> >><chandni@datatorrent.com
>> >> >> <ma...@datatorrent.com>> wrote:
>> >> >> >>
>> >> >> >> FYI,
>> >> >> >>
>> >> >> >> HDHTWriter implementation is dependent on the older semantics
>>and
>> >> seems
>> >> >> to
>> >> >> >> be broken now.
>> >> >> >> beginWindow(x) -> endWindow(x) -> checkpointed(x)
>> >> >> >> In the checkpointed implementation, it copies certain state
>> >> (transient)
>> >> >> and
>> >> >> >> transfers it to a checkpointedWriteCache with respect to window
>> >>'x'.
>> >> >> >>
>> >> >> >> With async checkpointing, the state that is transferred is much more
>> >> >> >> recent than window 'x'.
>> >> >> >>
>> >> >> >> Chandni
>> >> >> >>
>> >> >> >>
>> >> >> >> On Sun, Nov 22, 2015 at 11:04 PM, Chandni Singh <
>> >> >> chandni@datatorrent.com <ma...@datatorrent.com>>
>> >> >> >> wrote:
>> >> >> >>
>> >> >> >>> Agreed. Thomas's solution fixes the backward incompatibility.
>>I
>> >> think
>> >> >> we
>> >> >> >>> really need to fix this.
>> >> >> >>>
>> >> >> >>> On Sun, Nov 22, 2015 at 10:23 PM, Timothy Farkas <
>> >> tim@datatorrent.com
>> >> >> <ma...@datatorrent.com>>
>> >> >> >>> wrote:
>> >> >> >>>
>> >> >> >>>> Gaurav,
>> >> >> >>>>
>> >> >> >>>> I think if the state copy fails then STRAM should roll back
>>the
>> >> >> operator
>> >> >> >>>> to
>> >> >> >>>> a checkpoint that is further back than the last checkpoint.
>>If
>> >>you
>> >> are
>> >> >> >>>> saying that you want to preserve the semantic that
>>checkpointed
>> >>is
>> >> >> only
>> >> >> >>>> called after a checkpoint is completed, I would argue that
>>that
>> >> >> guarantee
>> >> >> >>>> is already pointless in the current implementation since it
>>is
>> >> always
>> >> >> >>>> possible for an operator to be rolled back to a checkpoint before its last
>> >> >> >>>> completed checkpoint. So, it is already currently possible
>>for
>> >>some
>> >> >> >>>> database or file operation performed after a completed
>> >>checkpoint
>> >> to
>> >> >> be
>> >> >> >>>> redone after a failure. Because of this I think Thomas's
>> >>solution
>> >> >> makes
>> >> >> >>>> the
>> >> >> >>>> most sense. Thomas's solution would also address Chandni's
>> >>original
>> >> >> point
>> >> >> >>>> that the semantics for the checkpointed call back have been
>> >> violated.
>> >> >> >>>> There
>> >> >> >>>> are operators in our libraries that have depended on the
>> >> >> beginWindow(x),
>> >> >> >>>> endWindow(x), and checkpointed(x) call sequence, which is now
>> >> broken.
>> >> >> We
>> >> >> >>>> should probably fix that.
>> >> >> >>>>
>> >> >> >>>> Tim
>> >> >> >>>>


Re: Why is Async checkpointing made default?

Posted by Pramod Immaneni <pr...@datatorrent.com>.
There is a pull request for that :)

> On Dec 11, 2015, at 9:43 AM, Thomas Weise <th...@datatorrent.com> wrote:
>
> I think we should focus on getting APEXCORE-78 done instead. It will
> address a number of existing use cases, including those where we tried to
> utilize checkpointed(...).
>

Re: Why is Async checkpointing made default?

Posted by Thomas Weise <th...@datatorrent.com>.
I think we should focus on getting APEXCORE-78 done instead. It will
address a number of existing use cases, including those where we tried to
utilize checkpointed(...).


On Fri, Dec 11, 2015 at 9:36 AM, Chandni Singh <ch...@datatorrent.com>
wrote:

> Hi,
>
> There hasn't been a close bracket here (borrowing Ram's expression :-) ).
> From what I see the majority agrees with making the fix.
>
> Do we need to start a vote for this?

Re: Why is Async checkpointing made default?

Posted by Chandni Singh <ch...@datatorrent.com>.
Hi,

There hasn't been a close bracket here (borrowing Ram's expression :-) ).
From what I see the majority agrees with making the fix.

Do we need to start a vote for this?

On Fri, Nov 27, 2015 at 3:00 PM, Thomas Weise <th...@datatorrent.com>
wrote:

> Chetan,
>
> Would like to understand how the checkpointed callback helps you with what
> you indicated. This may require a specific example. Let's take it offline.
>
> Thomas

Re: Why is Async checkpointing made default?

Posted by Thomas Weise <th...@datatorrent.com>.
Chetan,

Would like to understand how the checkpointed callback helps you with what
you indicated. This may require a specific example. Let's take it offline.

Thomas

On Wed, Nov 25, 2015 at 4:07 PM, Chetan Narsude (cnarsude) <
cnarsude@cisco.com> wrote:

> Yes - a few, but I cannot share the details (protected under NDA). Ping me
> in private and I can probably give you more generic details on
> similar cooked-up examples.
>
>  The part that follows “e.g.” below is an example that probably is
> sufficient to infer the use case logically, I think. I shared that to
> exemplify how changing the semantics will break semver.
>
> —
> Chetan
>
> On 11/25/15, 3:51 PM, "Thomas Weise" <th...@datatorrent.com> wrote:
>
> >Do you have a specific example?
> >
> >I see this happening in committed(), but not in checkpointed() where the
> >checkpoint remains intermediate, whether it was copied to HDFS or not.
> >
> >
> >On Wed, Nov 25, 2015 at 3:42 PM, Chetan Narsude (cnarsude) <
> >cnarsude@cisco.com> wrote:
> >
> >> >
> >> >Until we have this, how about we restore the previous behavior
> >> >temporarily?
> >> >Calling checkpointed() immediately does not seem to pose any practical
> >> >issue but ensures that the code that was written under this assumption
> >>is
> >> >not broken.
> >>
> >> We can't do it. It would be incorrect. It breaks all the other code that
> >> (unassumingly) correctly complied with the semantics, e.g. an operator which
> >> informs interested parties that the checkpointed data is available for
> >> immediate consumption from storage.
> >>
> >> --
> >> Chetan
> >>
> >>
>
>

Re: Why is Async checkpointing made default?

Posted by Gaurav Gupta <ga...@datatorrent.com>.
Tim,

If the state is stored to local disk, other parties need to be on the same node, which may not be the case. If the state is in HDFS it can be accessed from all the nodes in a cluster.

Thanks
- Gaurav

> On Nov 25, 2015, at 11:33 PM, Timothy Farkas <ti...@datatorrent.com> wrote:
> 
> Chetan, your use case is not valid. If checkpointed is called after the
> operator state is stored to local disk, a similar storage function could be
> performed. It is not necessary to wait for that same state to be
> asynchronously moved to HDFS. Please provide an example of a valid use case
> :)
> 
> +1 for fixing this bug/regression as Thomas suggested.
> 
> On Wed, Nov 25, 2015 at 5:35 PM, Chandni Singh <ch...@datatorrent.com>
> wrote:
> 
>> Semver was broken with async checkpointing. The behavior was changed, as
>> pointed out before in the discussion. Also, making things difficult for
>> operator developers doesn't give us anything.
>> 
>> +1 for fixing it in the way Thomas suggested.
>> 
>> On Wed, Nov 25, 2015 at 4:07 PM, Chetan Narsude (cnarsude) <
>> cnarsude@cisco.com> wrote:
>> 
>>> Yes - a few but cannot share the details - protected under NDA - ping me
>>> in private and I can probably be able to give you more generic details on
>>> similar cooked up examples.
>>> 
>>> The part that follows “e.g.” below is an example that probably is
>>> sufficient to infer the use case logically, I think. I shared that to
>>> exemplify how changing the semantics will break semver.
>>> 
>>> —
>>> Chetan
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On 11/25/15, 3:51 PM, "Thomas Weise" <th...@datatorrent.com> wrote:
>>> 
>>>> Do you have a specific example?
>>>> 
>>>> I see this happening in committed(), but not in checkpointed() where the
>>>> checkpoint remains intermediate, whether it was copied to HDFS or not.
>>>> 
>>>> 
>>>> On Wed, Nov 25, 2015 at 3:42 PM, Chetan Narsude (cnarsude) <
>>>> cnarsude@cisco.com> wrote:
>>>> 
>>>>>> 
>>>>>> Until we have this, how about we restore the previous behavior
>>>>>> temporarily?
>>>>>> Calling checkpointed() immediately does not seem to pose any
>> practical
>>>>>> issue but ensures that the code that was written under this
>> assumption
>>>>> is
>>>>>> not broken.
>>>>> 
>>>>> We can't do it. It would be incorrect. It breaks all the other code
>> that
>>>>> (unassumingly) correctly complied to the semantics. e.g. an operator
>>>>> which
>>>>> informs interesting parties that the checkpointed data is available
>> for
>>>>> immediate consumption from storage.
>>>>> 
>>>>> --
>>>>> Chetan
>>>>> 
>>>>> 
>>> 
>>> 
>> 


Re: Why is Async checkpointing made default?

Posted by Chandni Singh <ch...@datatorrent.com>.
That is simple to answer. The trigger to external systems can be done by the
Storage Agent instead of by the operator.
As I mentioned, the benefit is that it will be done asynchronously, just
after the copy to HDFS is done.

Chetan can change the code because it seems he recently did this or is
working on it.

Pardon me for repeating this time and again, but backward compatibility
was broken when async checkpointing was introduced. Certain behavior
existed for 3 years and then was broken. This needs to be fixed.

Chandni

On Thu, Nov 26, 2015 at 12:08 AM, Gaurav Gupta <ga...@datatorrent.com>
wrote:

> Tim,
>
> The trigger to external systems is send by the operator in checkpointed()
> call back and not by the Storage Agent. Not sure how suggested solution
> will solve Chetan’s use case.
>
> Thanks
> - Gaurav
>
> > On Nov 25, 2015, at 11:58 PM, Timothy Farkas <ti...@datatorrent.com>
> wrote:
> >
> > Gaurav,
> >
> > Chandni's method would address your point. Or you can copy the state
> > wherever you want (even asynchronously) from the checkpointed callback.
> >
> > On Wed, Nov 25, 2015 at 11:47 PM, Chandni Singh <chandni@datatorrent.com
> >
> > wrote:
> >
> >> Another approach for Chetan's use case can be to extend
> AsyncFSStorageAgent
> >> and perform the function after copy to hdfs is completed.
> >> Benefit is that the function will be performed asynchronously (with
> copy to
> >> hdfs) and will not block operator's thread.
> >>
> >> Chandni
> >>
> >> On Wed, Nov 25, 2015 at 11:33 PM, Timothy Farkas <ti...@datatorrent.com>
> >> wrote:
> >>
> >>> Chetan your use case is not valid, if checkpointed is called after the
> >>> operator state is stored to local disk a similar storage function could
> >> be
> >>> performed. It is not necessary to wait for that same state to be
> >>> asynchronously moved to hdfs. Please provide an example of a valid use
> >> case
> >>> :)
> >>>
> >>> +1 for fixing this bug/regression as Thomas suggested.
> >>>
> >>> On Wed, Nov 25, 2015 at 5:35 PM, Chandni Singh <
> chandni@datatorrent.com>
> >>> wrote:
> >>>
> >>>> Semver was broken with Async checkpointing. The behavior was changed
> as
> >>>> pointed out before in the discussion. Also making it difficult for
> >>> operator
> >>>> developer doesn't give us anything.
> >>>>
> >>>> +1 for fixing it in the way Thomas suggested.
> >>>>
> >>>> On Wed, Nov 25, 2015 at 4:07 PM, Chetan Narsude (cnarsude) <
> >>>> cnarsude@cisco.com> wrote:
> >>>>
> >>>>> Yes - a few but cannot share the details - protected under NDA - ping
> >>> me
> >>>>> in private and I can probably be able to give you more generic
> >> details
> >>> on
> >>>>> similar cooked up examples.
> >>>>>
> >>>>> The part that follows “e.g.” below is an example that probably is
> >>>>> sufficient to infer the use case logically, I think. I shared that to
> >>>>> exemplify how changing the semantics will break semver.
> >>>>>
> >>>>> —
> >>>>> Chetan
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On 11/25/15, 3:51 PM, "Thomas Weise" <th...@datatorrent.com> wrote:
> >>>>>
> >>>>>> Do you have a specific example?
> >>>>>>
> >>>>>> I see this happening in committed(), but not in checkpointed() where
> >>> the
> >>>>>> checkpoint remains intermediate, whether it was copied to HDFS or
> >> not.
> >>>>>>
> >>>>>>
> >>>>>> On Wed, Nov 25, 2015 at 3:42 PM, Chetan Narsude (cnarsude) <
> >>>>>> cnarsude@cisco.com> wrote:
> >>>>>>
> >>>>>>>>
> >>>>>>>> Until we have this, how about we restore the previous behavior
> >>>>>>>> temporarily?
> >>>>>>>> Calling checkpointed() immediately does not seem to pose any
> >>>> practical
> >>>>>>>> issue but ensures that the code that was written under this
> >>>> assumption
> >>>>>>> is
> >>>>>>>> not broken.
> >>>>>>>
> >>>>>>> We can't do it. It would be incorrect. It breaks all the other
> >> code
> >>>> that
> >>>>>>> (unassumingly) correctly complied to the semantics. e.g. an
> >> operator
> >>>>>>> which
> >>>>>>> informs interesting parties that the checkpointed data is
> >> available
> >>>> for
> >>>>>>> immediate consumption from storage.
> >>>>>>>
> >>>>>>> --
> >>>>>>> Chetan
> >>>>>>>
> >>>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
>
>

Re: Why is Async checkpointing made default?

Posted by Gaurav Gupta <ga...@datatorrent.com>.
Tim,

The trigger to external systems is sent by the operator in the checkpointed() callback and not by the Storage Agent. I am not sure how the suggested solution will solve Chetan’s use case.

Thanks
- Gaurav

> On Nov 25, 2015, at 11:58 PM, Timothy Farkas <ti...@datatorrent.com> wrote:
> 
> Gaurav,
> 
> Chandni's method would address your point. Or you can copy the state
> wherever you want (even asynchronously) from the checkpointed callback.
> 
> On Wed, Nov 25, 2015 at 11:47 PM, Chandni Singh <ch...@datatorrent.com>
> wrote:
> 
>> Another approach for Chetan's use case can be to extend AsyncFSStorageAgent
>> and perform the function after copy to hdfs is completed.
>> Benefit is that the function will be performed asynchronously (with copy to
>> hdfs) and will not block operator's thread.
>> 
>> Chandni
>> 
>> On Wed, Nov 25, 2015 at 11:33 PM, Timothy Farkas <ti...@datatorrent.com>
>> wrote:
>> 
>>> Chetan your use case is not valid, if checkpointed is called after the
>>> operator state is stored to local disk a similar storage function could
>> be
>>> performed. It is not necessary to wait for that same state to be
>>> asynchronously moved to hdfs. Please provide an example of a valid use
>> case
>>> :)
>>> 
>>> +1 for fixing this bug/regression as Thomas suggested.
>>> 
>>> On Wed, Nov 25, 2015 at 5:35 PM, Chandni Singh <ch...@datatorrent.com>
>>> wrote:
>>> 
>>>> Semver was broken with Async checkpointing. The behavior was changed as
>>>> pointed out before in the discussion. Also making it difficult for
>>> operator
>>>> developer doesn't give us anything.
>>>> 
>>>> +1 for fixing it in the way Thomas suggested.
>>>> 
>>>> On Wed, Nov 25, 2015 at 4:07 PM, Chetan Narsude (cnarsude) <
>>>> cnarsude@cisco.com> wrote:
>>>> 
>>>>> Yes - a few but cannot share the details - protected under NDA - ping
>>> me
>>>>> in private and I can probably be able to give you more generic
>> details
>>> on
>>>>> similar cooked up examples.
>>>>> 
>>>>> The part that follows “e.g.” below is an example that probably is
>>>>> sufficient to infer the use case logically, I think. I shared that to
>>>>> exemplify how changing the semantics will break semver.
>>>>> 
>>>>> —
>>>>> Chetan
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> On 11/25/15, 3:51 PM, "Thomas Weise" <th...@datatorrent.com> wrote:
>>>>> 
>>>>>> Do you have a specific example?
>>>>>> 
>>>>>> I see this happening in committed(), but not in checkpointed() where
>>> the
>>>>>> checkpoint remains intermediate, whether it was copied to HDFS or
>> not.
>>>>>> 
>>>>>> 
>>>>>> On Wed, Nov 25, 2015 at 3:42 PM, Chetan Narsude (cnarsude) <
>>>>>> cnarsude@cisco.com> wrote:
>>>>>> 
>>>>>>>> 
>>>>>>>> Until we have this, how about we restore the previous behavior
>>>>>>>> temporarily?
>>>>>>>> Calling checkpointed() immediately does not seem to pose any
>>>> practical
>>>>>>>> issue but ensures that the code that was written under this
>>>> assumption
>>>>>>> is
>>>>>>>> not broken.
>>>>>>> 
>>>>>>> We can't do it. It would be incorrect. It breaks all the other
>> code
>>>> that
>>>>>>> (unassumingly) correctly complied to the semantics. e.g. an
>> operator
>>>>>>> which
>>>>>>> informs interesting parties that the checkpointed data is
>> available
>>>> for
>>>>>>> immediate consumption from storage.
>>>>>>> 
>>>>>>> --
>>>>>>> Chetan
>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>> 


Re: Why is Async checkpointing made default?

Posted by Timothy Farkas <ti...@datatorrent.com>.
Gaurav,

Chandni's method would address your point. Or you can copy the state
wherever you want (even asynchronously) from the checkpointed callback.

On Wed, Nov 25, 2015 at 11:47 PM, Chandni Singh <ch...@datatorrent.com>
wrote:

> Another approach for Chetan's use case can be to extend AsyncFSStorageAgent
> and perform the function after copy to hdfs is completed.
> Benefit is that the function will be performed asynchronously (with copy to
> hdfs) and will not block operator's thread.
>
> Chandni
>
> On Wed, Nov 25, 2015 at 11:33 PM, Timothy Farkas <ti...@datatorrent.com>
> wrote:
>
> > Chetan your use case is not valid, if checkpointed is called after the
> > operator state is stored to local disk a similar storage function could
> be
> > performed. It is not necessary to wait for that same state to be
> > asynchronously moved to hdfs. Please provide an example of a valid use
> case
> > :)
> >
> > +1 for fixing this bug/regression as Thomas suggested.
> >
> > On Wed, Nov 25, 2015 at 5:35 PM, Chandni Singh <ch...@datatorrent.com>
> > wrote:
> >
> > > Semver was broken with Async checkpointing. The behavior was changed as
> > > pointed out before in the discussion. Also making it difficult for
> > operator
> > > developer doesn't give us anything.
> > >
> > > +1 for fixing it in the way Thomas suggested.
> > >
> > > On Wed, Nov 25, 2015 at 4:07 PM, Chetan Narsude (cnarsude) <
> > > cnarsude@cisco.com> wrote:
> > >
> > > > Yes - a few but cannot share the details - protected under NDA - ping
> > me
> > > > in private and I can probably be able to give you more generic
> details
> > on
> > > > similar cooked up examples.
> > > >
> > > >  The part that follows “e.g.” below is an example that probably is
> > > > sufficient to infer the use case logically, I think. I shared that to
> > > > exemplify how changing the semantics will break semver.
> > > >
> > > > —
> > > > Chetan
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On 11/25/15, 3:51 PM, "Thomas Weise" <th...@datatorrent.com> wrote:
> > > >
> > > > >Do you have a specific example?
> > > > >
> > > > >I see this happening in committed(), but not in checkpointed() where
> > the
> > > > >checkpoint remains intermediate, whether it was copied to HDFS or
> not.
> > > > >
> > > > >
> > > > >On Wed, Nov 25, 2015 at 3:42 PM, Chetan Narsude (cnarsude) <
> > > > >cnarsude@cisco.com> wrote:
> > > > >
> > > > >> >
> > > > >> >Until we have this, how about we restore the previous behavior
> > > > >> >temporarily?
> > > > >> >Calling checkpointed() immediately does not seem to pose any
> > > practical
> > > > >> >issue but ensures that the code that was written under this
> > > assumption
> > > > >>is
> > > > >> >not broken.
> > > > >>
> > > > > >> We can't do it. It would be incorrect. It breaks all the other
> code
> > > that
> > > > >> (unassumingly) correctly complied to the semantics. e.g. an
> operator
> > > > >>which
> > > > >> informs interesting parties that the checkpointed data is
> available
> > > for
> > > > >> immediate consumption from storage.
> > > > >>
> > > > >> ‹
> > > > >> Chetan
> > > > >>
> > > > >>
> > > >
> > > >
> > >
> >
>

Re: Why is Async checkpointing made default?

Posted by Chandni Singh <ch...@datatorrent.com>.
Another approach for Chetan's use case is to extend AsyncFSStorageAgent
and perform the function after the copy to HDFS has completed.
The benefit is that the function will be performed asynchronously (along
with the copy to HDFS) and will not block the operator's thread.
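
A minimal sketch of this idea, under stated assumptions: SketchAsyncStorageAgent
is a hypothetical stand-in written for this example and is not the real
AsyncFSStorageAgent API; copyToRemote, copyAsync and the onCopied hook are
illustrative names. It only shows the shape of "run the notification on the
copy thread once the copy has returned".

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.LongConsumer;

/** Hypothetical stand-in for an asynchronous storage agent (NOT the Apex class). */
class SketchAsyncStorageAgent {
  /** Pretends to copy the locally saved checkpoint to remote storage. */
  protected void copyToRemote(int operatorId, long windowId) {
    // A real agent would move the checkpoint file here; the sketch does nothing.
  }

  /** Starts the copy on a background thread so the operator thread is not blocked. */
  CompletableFuture<Void> copyAsync(int operatorId, long windowId) {
    return CompletableFuture.runAsync(() -> copyToRemote(operatorId, windowId));
  }
}

/** Extension that notifies an external party after the copy has returned. */
class NotifyingStorageAgent extends SketchAsyncStorageAgent {
  private final LongConsumer onCopied; // hypothetical hook, e.g. "tell consumers window X is readable"

  NotifyingStorageAgent(LongConsumer onCopied) {
    this.onCopied = onCopied;
  }

  @Override
  protected void copyToRemote(int operatorId, long windowId) {
    super.copyToRemote(operatorId, windowId);
    // Runs on the copy thread, after the copy itself, never on the operator thread.
    onCopied.accept(windowId);
  }
}
```

For example, new NotifyingStorageAgent(id -> System.out.println(id)) followed
by copyAsync(1, 42L).join() would invoke the hook with 42 once the simulated
copy has finished.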

Chandni

On Wed, Nov 25, 2015 at 11:33 PM, Timothy Farkas <ti...@datatorrent.com>
wrote:

> Chetan your use case is not valid, if checkpointed is called after the
> operator state is stored to local disk a similar storage function could be
> performed. It is not necessary to wait for that same state to be
> asynchronously moved to hdfs. Please provide an example of a valid use case
> :)
>
> +1 for fixing this bug/regression as Thomas suggested.
>
> On Wed, Nov 25, 2015 at 5:35 PM, Chandni Singh <ch...@datatorrent.com>
> wrote:
>
> > Semver was broken with Async checkpointing. The behavior was changed as
> > pointed out before in the discussion. Also making it difficult for
> operator
> > developer doesn't give us anything.
> >
> > +1 for fixing it in the way Thomas suggested.
> >
> > On Wed, Nov 25, 2015 at 4:07 PM, Chetan Narsude (cnarsude) <
> > cnarsude@cisco.com> wrote:
> >
> > > Yes - a few but cannot share the details - protected under NDA - ping
> me
> > > in private and I can probably be able to give you more generic details
> on
> > > similar cooked up examples.
> > >
> > >  The part that follows “e.g.” below is an example that probably is
> > > sufficient to infer the use case logically, I think. I shared that to
> > > exemplify how changing the semantics will break semver.
> > >
> > > —
> > > Chetan
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On 11/25/15, 3:51 PM, "Thomas Weise" <th...@datatorrent.com> wrote:
> > >
> > > >Do you have a specific example?
> > > >
> > > >I see this happening in committed(), but not in checkpointed() where
> the
> > > >checkpoint remains intermediate, whether it was copied to HDFS or not.
> > > >
> > > >
> > > >On Wed, Nov 25, 2015 at 3:42 PM, Chetan Narsude (cnarsude) <
> > > >cnarsude@cisco.com> wrote:
> > > >
> > > >> >
> > > >> >Until we have this, how about we restore the previous behavior
> > > >> >temporarily?
> > > >> >Calling checkpointed() immediately does not seem to pose any
> > practical
> > > >> >issue but ensures that the code that was written under this
> > assumption
> > > >>is
> > > >> >not broken.
> > > >>
> > > > >> We can't do it. It would be incorrect. It breaks all the other code
> > that
> > > >> (unassumingly) correctly complied to the semantics. e.g. an operator
> > > >>which
> > > >> informs interesting parties that the checkpointed data is available
> > for
> > > >> immediate consumption from storage.
> > > >>
> > > >> ‹
> > > >> Chetan
> > > >>
> > > >>
> > >
> > >
> >
>

Re: Why is Async checkpointing made default?

Posted by Timothy Farkas <ti...@datatorrent.com>.
Chetan, your use case is not valid: if checkpointed is called after the
operator state is stored to local disk, a similar storage function could be
performed. It is not necessary to wait for that same state to be
asynchronously moved to HDFS. Please provide an example of a valid use case
:)

+1 for fixing this bug/regression as Thomas suggested.

On Wed, Nov 25, 2015 at 5:35 PM, Chandni Singh <ch...@datatorrent.com>
wrote:

> Semver was broken with Async checkpointing. The behavior was changed as
> pointed out before in the discussion. Also making it difficult for operator
> developer doesn't give us anything.
>
> +1 for fixing it in the way Thomas suggested.
>
> On Wed, Nov 25, 2015 at 4:07 PM, Chetan Narsude (cnarsude) <
> cnarsude@cisco.com> wrote:
>
> > Yes - a few but cannot share the details - protected under NDA - ping me
> > in private and I can probably be able to give you more generic details on
> > similar cooked up examples.
> >
> >  The part that follows “e.g.” below is an example that probably is
> > sufficient to infer the use case logically, I think. I shared that to
> > exemplify how changing the semantics will break semver.
> >
> > —
> > Chetan
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On 11/25/15, 3:51 PM, "Thomas Weise" <th...@datatorrent.com> wrote:
> >
> > >Do you have a specific example?
> > >
> > >I see this happening in committed(), but not in checkpointed() where the
> > >checkpoint remains intermediate, whether it was copied to HDFS or not.
> > >
> > >
> > >On Wed, Nov 25, 2015 at 3:42 PM, Chetan Narsude (cnarsude) <
> > >cnarsude@cisco.com> wrote:
> > >
> > >> >
> > >> >Until we have this, how about we restore the previous behavior
> > >> >temporarily?
> > >> >Calling checkpointed() immediately does not seem to pose any
> practical
> > >> >issue but ensures that the code that was written under this
> assumption
> > >>is
> > >> >not broken.
> > >>
> > > >> We can't do it. It would be incorrect. It breaks all the other code
> that
> > >> (unassumingly) correctly complied to the semantics. e.g. an operator
> > >>which
> > >> informs interesting parties that the checkpointed data is available
> for
> > >> immediate consumption from storage.
> > >>
> > >> ‹
> > >> Chetan
> > >>
> > >>
> >
> >
>

Re: Why is Async checkpointing made default?

Posted by Chandni Singh <ch...@datatorrent.com>.
Semver was broken with async checkpointing. The behavior was changed, as
pointed out earlier in the discussion. Also, making things difficult for
operator developers doesn't gain us anything.

+1 for fixing it in the way Thomas suggested.

On Wed, Nov 25, 2015 at 4:07 PM, Chetan Narsude (cnarsude) <
cnarsude@cisco.com> wrote:

> Yes - a few but cannot share the details - protected under NDA - ping me
> in private and I can probably be able to give you more generic details on
> similar cooked up examples.
>
>  The part that follows “e.g.” below is an example that probably is
> sufficient to infer the use case logically, I think. I shared that to
> exemplify how changing the semantics will break semver.
>
> —
> Chetan
>
>
>
>
>
>
>
>
>
>
>
> On 11/25/15, 3:51 PM, "Thomas Weise" <th...@datatorrent.com> wrote:
>
> >Do you have a specific example?
> >
> >I see this happening in committed(), but not in checkpointed() where the
> >checkpoint remains intermediate, whether it was copied to HDFS or not.
> >
> >
> >On Wed, Nov 25, 2015 at 3:42 PM, Chetan Narsude (cnarsude) <
> >cnarsude@cisco.com> wrote:
> >
> >> >
> >> >Until we have this, how about we restore the previous behavior
> >> >temporarily?
> >> >Calling checkpointed() immediately does not seem to pose any practical
> >> >issue but ensures that the code that was written under this assumption
> >>is
> >> >not broken.
> >>
> > >> We can't do it. It would be incorrect. It breaks all the other code that
> >> (unassumingly) correctly complied to the semantics. e.g. an operator
> >>which
> >> informs interesting parties that the checkpointed data is available for
> >> immediate consumption from storage.
> >>
> >> ‹
> >> Chetan
> >>
> >>
>
>

Re: Why is Async checkpointing made default?

Posted by "Chetan Narsude (cnarsude)" <cn...@cisco.com>.
Yes - a few, but I cannot share the details as they are protected under NDA.
Ping me in private and I can probably give you more generic details on
similar cooked-up examples.

The part that follows “e.g.” below is an example that probably is
sufficient to infer the use case logically, I think. I shared that to
exemplify how changing the semantics will break semver.

—
Chetan











On 11/25/15, 3:51 PM, "Thomas Weise" <th...@datatorrent.com> wrote:

>Do you have a specific example?
>
>I see this happening in committed(), but not in checkpointed() where the
>checkpoint remains intermediate, whether it was copied to HDFS or not.
>
>
>On Wed, Nov 25, 2015 at 3:42 PM, Chetan Narsude (cnarsude) <
>cnarsude@cisco.com> wrote:
>
>> >
>> >Until we have this, how about we restore the previous behavior
>> >temporarily?
>> >Calling checkpointed() immediately does not seem to pose any practical
>> >issue but ensures that the code that was written under this assumption
>>is
>> >not broken.
>>
>> We can't do it. It would be incorrect. It breaks all the other code that
>> (unassumingly) correctly complied to the semantics. e.g. an operator
>>which
>> informs interesting parties that the checkpointed data is available for
>> immediate consumption from storage.
>>
>> --
>> Chetan
>>
>>


Re: Why is Async checkpointing made default?

Posted by Thomas Weise <th...@datatorrent.com>.
Do you have a specific example?

I see this happening in committed(), but not in checkpointed() where the
checkpoint remains intermediate, whether it was copied to HDFS or not.


On Wed, Nov 25, 2015 at 3:42 PM, Chetan Narsude (cnarsude) <
cnarsude@cisco.com> wrote:

> >
> >Until we have this, how about we restore the previous behavior
> >temporarily?
> >Calling checkpointed() immediately does not seem to pose any practical
> >issue but ensures that the code that was written under this assumption is
> >not broken.
>
> We can't do it. It would be incorrect. It breaks all the other code that
> (unassumingly) correctly complied to the semantics. e.g. an operator which
> informs interesting parties that the checkpointed data is available for
> immediate consumption from storage.
>
> --
> Chetan
>
>

Re: Why is Async checkpointing made default?

Posted by "Chetan Narsude (cnarsude)" <cn...@cisco.com>.
>
>Until we have this, how about we restore the previous behavior
>temporarily?
>Calling checkpointed() immediately does not seem to pose any practical
>issue but ensures that the code that was written under this assumption is
>not broken.

We can't do it. It would be incorrect. It breaks all the other code that
(unassumingly) correctly complied with the semantics, e.g. an operator which
informs interested parties that the checkpointed data is available for
immediate consumption from storage.

--
Chetan


Re: Why is Async checkpointing made default?

Posted by Thomas Weise <th...@datatorrent.com>.
Discarding state that was checkpointed is a good example of how the callback
can be used. I don't see a reason to wait until the asynchronous copy to
HDFS has completed to implement this. In fact, with the guarantee that the
call occurs before the next beginWindow, it is easier to implement in the
operator, because there is no need to track which portion of state belongs
to which window.
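
The difference can be sketched as follows. This is an illustration only: the
WindowedOperator interface and both operator classes are hypothetical
stand-ins written for this example, not the Apex operator API. The first
operator assumes checkpointed(x) arrives before beginWindow(x + 1); the second
tolerates a delayed callback by tracking its state per window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical stand-in for the callbacks discussed in this thread (NOT the Apex API). */
interface WindowedOperator {
  void beginWindow(long windowId);
  void process(String tuple);
  void endWindow();
  void checkpointed(long windowId);
}

/** Correct only if checkpointed(x) is called before beginWindow(x + 1). */
class SimpleDiscardOperator implements WindowedOperator {
  final List<String> pending = new ArrayList<>();

  public void beginWindow(long windowId) {}
  public void process(String tuple) { pending.add(tuple); }
  public void endWindow() {}

  // Assumes everything buffered so far belongs to windows <= windowId.
  public void checkpointed(long windowId) { pending.clear(); }
}

/** Tolerates a delayed checkpointed(x) at the cost of per-window bookkeeping. */
class WindowTrackingOperator implements WindowedOperator {
  final TreeMap<Long, List<String>> pendingByWindow = new TreeMap<>();
  private long currentWindow;

  public void beginWindow(long windowId) { currentWindow = windowId; }

  public void process(String tuple) {
    pendingByWindow.computeIfAbsent(currentWindow, w -> new ArrayList<>()).add(tuple);
  }

  public void endWindow() {}

  // Discards only the state of windows up to and including the checkpointed one.
  public void checkpointed(long windowId) {
    pendingByWindow.headMap(windowId, true).clear();
  }
}
```

If both operators are driven through window 1 (tuple "a"), then window 2
(tuple "b"), and only then receive checkpointed(1), the simple operator has
also dropped "b", which was not part of checkpoint 1, while the tracking
operator still holds the entry for window 2.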

Perhaps we can define new checkpointing related callbacks with actual use
cases in mind (now we have a few). Then we can extend the existing listener
interface so they remain optional for existing listener code but those
operators that require different semantics can take advantage of the
support.
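
One possible shape for such an optional callback, as a sketch only: the
interface below is hypothetical (it is not the existing Apex listener
interface), checkpointPersisted is an invented name, and the sketch relies on
Java 8 default methods, which is an assumption about the language level.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical listener used for illustration (NOT the existing Apex interface). */
interface CheckpointCallbacks {
  void checkpointed(long windowId);

  void committed(long windowId);

  /**
   * Optional callback (invented name): the checkpoint for windowId has reached
   * durable storage. The default is a no-op, so existing listeners keep compiling.
   */
  default void checkpointPersisted(long windowId) {}
}

/** Existing-style listener: unaware of the new callback, needs no change. */
class LegacyListener implements CheckpointCallbacks {
  final List<String> events = new ArrayList<>();

  public void checkpointed(long windowId) { events.add("checkpointed:" + windowId); }
  public void committed(long windowId) { events.add("committed:" + windowId); }
}

/** Listener that opts in to the different semantics. */
class NotifyingListener extends LegacyListener {
  @Override
  public void checkpointPersisted(long windowId) { events.add("persisted:" + windowId); }
}
```

A legacy listener that receives checkpointPersisted(5) simply ignores it,
while the opt-in listener records it; neither changes what checkpointed()
means for code written against the earlier behavior.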

Until we have this, how about we restore the previous behavior temporarily?
Calling checkpointed() immediately does not seem to pose any practical
issue but ensures that the code that was written under this assumption is
not broken.

Thomas





On Mon, Nov 23, 2015 at 12:12 PM, Pramod Immaneni <pr...@datatorrent.com>
wrote:

> Tim,
>
> If the operator is such that I can discard the state from the previous
> checkpoint to the current checkpoint on the checkpointed callback then the
> state can be reconstructed by the incoming data on replay and I don't need
> to wait till committed to remove the unwanted state. It's more efficient
> memory-wise.
>
> Thanks
>
> On Mon, Nov 23, 2015 at 11:17 AM, Timothy Farkas <ti...@datatorrent.com>
> wrote:
>
> > Pramod,
> >
> > Very good point about checkpointed being called within a window boundary.
> > But I still don't understand the purpose of calling checkpointed after
> the
> > asynchronous copy to HDFS is completed. It makes checkpointed
> > implementations more complex, and it does not provide any benefits. The
> > checkpointed implementation logic could be called multiple times for the
> > same window if the operator fails and is restored to an earlier
> checkpoint,
> > this fact invalidates any atomicity guarantees that you are trying to
> > preserve. I would like a concrete example of when it is necessary to call
> > checkpointed after the asynchronous copy to hdfs. The only use case I can
> > think of is if the operator reads its own checkpointed state from hdfs in
> > the checkpointed call back, but this use case is not a valid use case.
> >
> > Tim
> >
> > On Mon, Nov 23, 2015 at 10:56 AM, Pramod Immaneni <
> pramod@datatorrent.com>
> > wrote:
> >
> > > There is a problem I see with the older way in which checkpointed was
> > > called as well which is relevant to this. It is called outside of
> window
> > > boundary between end window and begin window which I don't think is the
> > > correct as platform should not run operator business logic outside of
> > > window boundaries. The operator could end up sending tuples
> accidentally
> > in
> > > this call for example.
> > >
> > > I think we need to fix that and also not assume that checkpointed
> > callback
> > > will be called immediately after end window of the window being
> > > checkpointed.
> > >
> > > Thanks
> > >
> > > On Mon, Nov 23, 2015 at 10:38 AM, Chandni Singh <
> chandni@datatorrent.com
> > >
> > > wrote:
> > >
> > > > :-) I did not write it that semantics. Looks like someone made that
> > > > assumption along the way. Fix the code which assumes that.
> > > >
> > > > What  do you mean?
> > > > We go on fixing all the code which was written because the behavior
> > with
> > > > Synchronous checkpointing was
> > > > beginWindow(x) -> endWindow -> checkpointWindow(x)   ( when
> > checkpointing
> > > > is aligned with application window boundary)
> > > >
> > > > This was broken recently with async checkpointing but instead of
> fixing
> > > > that, we go on fixing all the existing solutions.
> > > > I guess I find this ridiculous.
> > > >
> > > > And why should we make it harder and harder to write operators?
> > > > I think majority of people who participated in this discussion do
> > believe
> > > > that we need to fix and I think this is critical.
> > > >
> > > > Chandni
> > > >
> > > >
> > > >
> > > > On Mon, Nov 23, 2015 at 10:16 AM, Chetan Narsude (cnarsude) <
> > > > cnarsude@cisco.com> wrote:
> > > >
> > > > >
> > > > >
> > > > > On 11/23/15, 8:44 AM, "Chandni Singh" <ch...@datatorrent.com>
> > wrote:
> > > > >
> > > > > >I think in the API, there is windowId in the checkpointed callback
> > for
> > > > > >cases when checkpointing could happen within application windows.
> > > > > >
> > > > > >IMHO backward compatibility is broken. For 3 years since the
> > platform
> > > > was
> > > > > >created the semantics of checkpointed were
> > > > > >beginWindow(x) -> endwindow -> checkpointWindow(x) when
> > checkpointing
> > > > was
> > > > > >aligned with application window boundaries.
> > > > > >
> > > > >
> > > > > :-) I did not write it that semantics. Looks like someone made that
> > > > > assumption along the way. Fix the code which assumes that.
> > > > >
> > > > > ‹
> > > > > Chetan
> > > > >
> > > > > > >Please note that aligning Checkpointing with Application Window
> > > boundary
> > > > is
> > > > > > >the DEFAULT behavior and I think most of us agree that aligning the
> > > > > >checkpointing with application window boundaries is what we see in
> > > every
> > > > > >use case.
> > > > > >
> > > > > >Code was written with that assumption by committers of Apex (not
> > even
> > > by
> > > > > > >people who are new to Apex). It is broken by a change
> introduced
> > > > couple
> > > > > >of months back (August 2015).
> > > > > >
> > > > > >Moreover, what do we achieve by NOT guaranteeing that semantics-
> > > > delegate
> > > > > >the complexity to operators to handle it. Even a simple scenario
> > that
> > > I
> > > > > >mentioned in my first mail, is very complicated.
> > > > > >
> > > > > >I think this is a big issue and we need to fix it.
> > > > > >
> > > > > >Chandni
> > > > > >
> > > > > >On Mon, Nov 23, 2015 at 7:12 AM, Munagala Ramanath <
> > > ram@datatorrent.com
> > > > >
> > > > > >wrote:
> > > > > >
> > > > > >> I too find Gaurav's argument cogent: endWindow() does not take a
> > > > > >> windowId parameter
> > > > > >> leading to a natural guarantee that it matches the immediately
> > > > > >> preceding beginWindow().
> > > > > >>
> > > > > >> Both committed() and checkpointed() take that parameter which
> > > suggests
> > > > > >>it
> > > > > >> may
> > > > > >> lag behind the current window. The comments on those methods say
> > > > nothing
> > > > > > >> that can be interpreted as a guarantee that the windowId will
> > > match
> > > > > >>the
> > > > > >> window just processed. So, from the viewpoint of "original
> intent"
> > > --
> > > > > >> a phrase that
> > > > > >> has a long and storied history in jurisprudence
> > > > > >> (https://en.wikipedia.org/wiki/Original_intent) --
> > > > > >> it seems that any code that assumed a guarantee when none
> existed
> > > must
> > > > > >> be regarded
> > > > > >> as erroneous.
> > > > > >>
> > > > > >> Having said that, we are all aware that in software it is not at
> > all
> > > > > >> unusual to preserve behavior
> > > > > >> in the interests of backward compatibility, regardless of many
> > other
> > > > > >> reasons for that behavior
> > > > > >> to be considered objectionable. So, if the cost of fixing all
> the
> > > code
> > > > > >> that makes that
> > > > > >> assumption is too high, we should seriously consider reverting
> it.
> > > In
> > > > > >> this context, the
> > > > > >> guarantee that checkpointed() simply means that the operator
> state
> > > has
> > > > > >> been serialized
> > > > > >> seems adequate.
> > > > > >>
> > > > > >> We have a roughly analogous situation with respect to OS write()
> > > > calls:
> > > > > >> When the call returns, we _do not_ have an assurance that the
> data
> > > has
> > > > > >> gone to disk.
> > > > > >> All we know is that the OS has copied the data to its buffers
> for
> > > > > >> flushing to disk. If we
> > > > > >> want to know when the actual write to disk is done, we need to
> > use a
> > > > > >> different call -- fsync().
> > > > > >>
> > > > > >> Ram
> > > > > >>
> > > > > >> On Mon, Nov 23, 2015 at 6:19 AM, Pramod Immaneni
> > > > > >><pr...@datatorrent.com>
> > > > > >> wrote:
> > > > > >> > I think the original intent for checkpointed callback was that
> > it
> > > > can
> > > > > >>be
> > > > > >> > called anytime after checkpoint and not necessarily
> immediately
> > > > after
> > > > > >>the
> > > > > >> > window prior of checkpoint. As Gaurav mentioned the API bears
> it
> > > > out.
> > > > > >> > Furthermore the callback has to be called within the lifecycle
> > > > > >>methods of
> > > > > >> > the operator so it will be called inside a window so you
> anyway
> > > have
> > > > > >>to
> > > > > >> > deal with new data anyway before the callback is called. Even
> > > though
> > > > > >> > checkpointed is not committed shouldn't it at least guarantee
> > that
> > > > the
> > > > > >> > operator is recoverable to that state in which case it should
> be
> > > > > >>called
> > > > > >> > after the save to HDFS actually finishes.
> > > > > >> >
> > > > > >> > Thanks
> > > > > >> >
> > > > > >> > On Mon, Nov 23, 2015 at 5:38 AM, Gaurav Gupta <
> > > > gaurav@datatorrent.com
> > > > > >
> > > > > >> > wrote:
> > > > > >> >
> > > > > > >> >> IMO, I don't think there is any backward incompatibility wrt
> > > > > >> Checkpointing
> > > > > >> >> call back semantics because
> > > > > >> >>
> > > > > >> >> 1. The checkpointed call is only made once the operator state
> > is
> > > > > >> preserved.
> > > > > >> >> 2. The window ids being passed to checkpointed are in
> > increasing
> > > > > >>order.
> > > > > >> >> 3. The window ids being passed are still the same ids as were
> > > > passed
> > > > > >> >> earlier.
> > > > > >> >> 4. The sequence is still
> > > > > beginWindow()->endWindow()->checkpointed().
> > > > > >> >>
> > > > > >> >> Thanks
> > > > > >> >> - Gaurav
> > > > > >> >>
> > > > > >> >> > On Nov 23, 2015, at 1:03 AM, Gaurav Gupta <
> > > > gaurav@datatorrent.com>
> > > > > >> >> wrote:
> > > > > >> >> >
> > > > > >> >> > If the requirement is that the order is always
> > > > > >> >> beginWindow()->endWindow()->checkpointed(), why to pass
> > windowId
> > > > in
> > > > > >>the
> > > > > >> >> checkpointed() call back?
> > > > > >> >> >
> > > > > >> >> > Thanks
> > > > > >> >> > - Gaurav
> > > > > >> >> >
> > > > > >> >> >> On Nov 22, 2015, at 11:22 PM, Chandni Singh
> > > > > >><chandni@datatorrent.com
> > > > > >> >> <ma...@datatorrent.com>> wrote:
> > > > > >> >> >>
> > > > > >> >> >> FYI,
> > > > > >> >> >>
> > > > > >> >> >> HDHTWriter implementation is dependent on the older
> > semantics
> > > > and
> > > > > >> seems
> > > > > >> >> to
> > > > > >> >> >> be broken now.
> > > > > >> >> >> startWindow(x) -> endWindow(x) -> checkpointed(x)
> > > > > >> >> >> In the checkpointed implementation, it copies certain
> state
> > > > > >> (transient)
> > > > > >> >> and
> > > > > >> >> >> transfers it to a checkpointedWriteCache with respect to
> > > window
> > > > > >>'x'.
> > > > > >> >> >>
> > > > > >> >> >> With Async checkpointing it, the state that is transferred
> > is
> > > > much
> > > > > >> more
> > > > > >> >> >> recent than window 'x'.
> > > > > >> >> >>
> > > > > >> >> >> Chandni
> > > > > >> >> >>
> > > > > >> >> >>
> > > > > >> >> >> On Sun, Nov 22, 2015 at 11:04 PM, Chandni Singh <
> > > > > >> >> chandni@datatorrent.com <ma...@datatorrent.com>>
> > > > > >> >> >> wrote:
> > > > > >> >> >>
> > > > > >> >> >>> Agreed. Thomas's solution fixes the backward
> > > incompatibility. I
> > > > > >> think
> > > > > >> >> we
> > > > > >> >> >>> really need to fix this.
> > > > > >> >> >>>
> > > > > >> >> >>> On Sun, Nov 22, 2015 at 10:23 PM, Timothy Farkas <
> > > > > >> tim@datatorrent.com
> > > > > >> >> <ma...@datatorrent.com>>
> > > > > >> >> >>> wrote:
> > > > > >> >> >>>
> > > > > >> >> >>>> Gaurav,
> > > > > >> >> >>>>
> > > > > >> >> >>>> I think if the state copy fails then STRAM should roll
> > back
> > > > the
> > > > > >> >> operator
> > > > > >> >> >>>> to
> > > > > >> >> >>>> a checkpoint that is further back than the last
> > checkpoint.
> > > If
> > > > > >>you
> > > > > >> are
> > > > > >> >> >>>> saying that you want to preserve the semantic that
> > > > checkpointed
> > > > > >>is
> > > > > >> >> only
> > > > > >> >> >>>> called after a checkpoint is completed, I would argue
> that
> > > > that
> > > > > >> >> guarantee
> > > > > >> >> >>>> is already pointless in the current implementation since
> > it
> > > is
> > > > > >> always
> > > > > >> >> >>>> possible for an operator to be rolled back to a
> checkpoint
> > > > > >>before
> > > > > >> it's
> > > > > >> >> >>>> last
> > > > > >> >> >>>> completed checkpoint. So, it is already currently
> possible
> > > for
> > > > > >>some
> > > > > >> >> >>>> database or file operation performed after a completed
> > > > > >>checkpoint
> > > > > >> to
> > > > > >> >> be
> > > > > >> >> >>>> redone after a failure. Because of this I think Thomas's
> > > > > >>solution
> > > > > >> >> makes
> > > > > >> >> >>>> the
> > > > > >> >> >>>> most sense. Thomas's solution would also address
> Chandni's
> > > > > >>original
> > > > > >> >> point
> > > > > >> >> >>>> that the semantics for the checkpointed call back have
> > been
> > > > > >> violated.
> > > > > >> >> >>>> There
> > > > > >> >> >>>> are operators in our libraries that have depended on the
> > > > > >> >> beginWindow(x),
> > > > > >> >> >>>> endWindow(x), and checkpointed(x) call sequence, which
> is
> > > now
> > > > > >> broken.
> > > > > >> >> We
> > > > > >> >> >>>> should probably fix that.
> > > > > >> >> >>>>
> > > > > >> >> >>>> Tim
> > > > > >> >> >>>>
> > > > > >> >> >>>> On Sun, Nov 22, 2015 at 10:02 PM, Gaurav Gupta <
> > > > > >> >> gaurav@datatorrent.com <ma...@datatorrent.com>>
> > > > > >> >> >>>> wrote:
> > > > > >> >> >>>>
> > > > > >> >> >>>>> Thomas,
> > > > > >> >> >>>>>
> > > > > >> >> >>>>> This was done to preserve checkpointing semantics that
> is
> > > to
> > > > > >>tell
> > > > > >> the
> > > > > >> >> >>>>> operator that its state is preserved. Say if database
> is
> > > > > >>updated
> > > > > >> or
> > > > > >> >> >>>> files
> > > > > >> >> >>>>> are moved in checkpointed call but the state copy
> fails,
> > > how
> > > > to
> > > > > >> >> address
> > > > > >> >> >>>>> such scenarios?
> > > > > >> >> >>>>>
> > > > > >> >> >>>>> Thanks
> > > > > >> >> >>>>> - Gaurav
> > > > > >> >> >>>>>
> > > > > >> >> >>>>>> On Nov 22, 2015, at 9:44 PM, Thomas Weise <
> > > > > >> thomas@datatorrent.com
> > > > > >> >> <ma...@datatorrent.com>>
> > > > > >> >> >>>>> wrote:
> > > > > >> >> >>>>>>
> > > > > >> >> >>>>>> Alternatively I would ask why the checkpointed
> callback
> > > > needs
> > > > > >>to
> > > > > >> >> wait
> > > > > >> >> >>>>> until
> > > > > >> >> >>>>>> the data was copied to HDFS instead upon completion of
> > the
> > > > > >>state
> > > > > >> >> >>>>>> serialization.
> > > > > >> >> >>>>>>
> > > > > >> >> >>>>>> Thomas
> > > > > >> >> >>>>>>
> > > > > >> >> >>>>>>
> > > > > >> >> >>>>>> On Sun, Nov 22, 2015 at 9:41 PM, Chandni Singh <
> > > > > >> >> >>>> chandni@datatorrent.com <mailto:chandni@datatorrent.com
> >>
> > > > > >> >> >>>>>> wrote:
> > > > > >> >> >>>>>>
> > > > > >> >> >>>>>>> Gaurav,
> > > > > >> >> >>>>>>>
> > > > > >> >> >>>>>>> My question is about why Async was made the default
> > when
> > > it
> > > > > >> changed
> > > > > >> >> >>>> the
> > > > > >> >> >>>>>>> semantics of operator callbacks. Your response
> doesn't
> > > > answer
> > > > > >> that.
> > > > > >> >> >>>>>>>
> > > > > >> >> >>>>>>> In a way we broke backward compatibility.
> > > > > >> >> >>>>>>>
> > > > > >> >> >>>>>>> Chandni
> > > > > >> >> >>>>>>>
> > > > > >> >> >>>>>>> On Sun, Nov 22, 2015 at 9:22 PM, Gaurav Gupta <
> > > > > >> >> >>>> gaurav@datatorrent.com <ma...@datatorrent.com>>
> > > > > >> >> >>>>>>> wrote:
> > > > > >> >> >>>>>>>
> > > > > >> >> >>>>>>>> The idea behind Async checkpointing is to unblock
> > > operator
> > > > > >> while
> > > > > >> >> the
> > > > > >> >> >>>>>>> state
> > > > > >> >> >>>>>>>> is getting transferred to HDFS.
> > > > > >> >> >>>>>>>> Just to clarify that this beginWindow (x) ->
> > > endWindow(x)
> > > > ->
> > > > > >> >> >>>>> checkpointed
> > > > > >> >> >>>>>>>> (x-1 ) should be an ideal sequence, but if the HDFS
> is
> > > > slow
> > > > > >>or
> > > > > >> for
> > > > > >> >> >>>> some
> > > > > >> >> >>>>>>>> other reason transferring the state to HDFS is slow
> > this
> > > > > >> sequence
> > > > > >> >> >>>> may
> > > > > >> >> >>>>> not
> > > > > >> >> >>>>>>>> hold true.
> > > > > >> >> >>>>>>>>
> > > > > >> >> >>>>>>>> Can your use case be addressed by
> > > > > >> >> >>>>>>>> https://malhar.atlassian.net/browse/APEX-78 <
> > > > > >> >> https://malhar.atlassian.net/browse/APEX-78> <
> > > > > >> >> >>>>>>>> https://malhar.atlassian.net/browse/APEX-78 <
> > > > > >> >> https://malhar.atlassian.net/browse/APEX-78>>?
> > > > > >> >> >>>>>>>>
> > > > > >> >> >>>>>>>> Thanks
> > > > > >> >> >>>>>>>> - Gaurav
> > > > > >> >> >>>>>>>>
> > > > > >> >> >>>>>>>>> On Nov 22, 2015, at 3:56 PM, Chandni Singh <
> > > > > >> >> >>>> chandni@datatorrent.com <mailto:chandni@datatorrent.com
> >>
> > > > > >> >> >>>>>>>> wrote:
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> With Async checkpointing the checkpoint callback in
> > > > > >> >> CheckpointPoint
> > > > > >> >> >>>>>>>>> listener is called for a previous window, that is,
> > > > > >> >> >>>>>>>>> beginWindow (x) -> endWindow(x) -> checkpointed
> (x-1
> > )
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> This feature was newly introduced. With synchronous
> > > > > >> >> checkpointing,
> > > > > >> >> >>>> the
> > > > > >> >> >>>>>>>>> behavior was always
> > > > > >> >> >>>>>>>>> beginWindow(x) -> endWindow(x) -> checkpointed (x)
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> A lot of operators were written before asynchronous
> > > > > >> checkpointing
> > > > > >> >> >>>> was
> > > > > >> >> >>>>>>>>> introduced and few of them can rely on the
> sequencing
> > > > > >> guaranteed
> > > > > >> >> by
> > > > > >> >> >>>>>>>>> synchronous checkpointing.
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> So why was Async Checkpointed made default?
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> With how Async checkpoint is today, the complexity
> to
> > > > > >>handle
> > > > > >> >> >>>> transient
> > > > > >> >> >>>>>>>>> state in checkpointed callback falls on every
> > operator.
> > > > For
> > > > > >> eg,
> > > > > >> >> >>>> lets
> > > > > >> >> >>>>>>> say
> > > > > >> >> >>>>>>>>> earlier I had a transient map which I cleared every
> > > time
> > > > > >>the
> > > > > >> >> >>>>>>> checkpointed
> > > > > >> >> >>>>>>>>> was called, with async checkpointing this simple
> task
> > > > will
> > > > > >>be
> > > > > >> a
> > > > > >> >> lot
> > > > > >> >> >>>>>>> more
> > > > > >> >> >>>>>>>>> complicated.
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> I think Async checkpointing broke the semantics of
> > > > operator
> > > > > >> >> >>>> callbacks
> > > > > >> >> >>>>>>> and
> > > > > >> >> >>>>>>>>> should NOT be the default.
> > > > > >> >> >>>>>>>>
> > > > > >> >> >>>>>>>>
> > > > > >> >> >>>>>>>
> > > > > >> >> >>>>>
> > > > > >> >> >>>>>
> > > > > >> >> >>>>
> > > > > >> >> >>>
> > > > > >> >> >>>
> > > > > >> >> >
> > > > > >> >>
> > > > > >> >>
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>

Re: Why is Async checkpointing made default?

Posted by Pramod Immaneni <pr...@datatorrent.com>.
Tim,

If the operator is such that I can discard the state accumulated between the
previous checkpoint and the current checkpoint in the checkpointed callback,
then that state can be reconstructed from the incoming data on replay, and I
don't need to wait until committed to remove the unwanted state. It's more
memory-efficient.
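
A minimal sketch of this idea, under stated assumptions: the nested
CheckpointListener interface below is a hypothetical stand-in that only
mirrors the checkpointed(windowId)/committed(windowId) callbacks discussed in
this thread (it is not the Apex class itself), and WindowedCache is an
invented operator. State is keyed by window id, so a checkpointed call that
arrives late still trims only what is covered by that checkpoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Illustrative operator (hypothetical, not Apex code): buffers tuples per
// window and trims the buffer by window id on checkpointed().
class WindowedCache {

  // Hypothetical stand-in for the callbacks discussed in this thread.
  interface CheckpointListener {
    void checkpointed(long windowId);
    void committed(long windowId);
  }

  private final TreeMap<Long, List<String>> perWindow = new TreeMap<>();
  private long currentWindow;

  void beginWindow(long windowId) {
    currentWindow = windowId;
  }

  void process(String tuple) {
    perWindow.computeIfAbsent(currentWindow, w -> new ArrayList<>()).add(tuple);
  }

  void endWindow() {
    // Nothing to do in this sketch.
  }

  // Drops only windows <= the checkpointed id; newer windows stay buffered,
  // so it does not matter how many windows have passed since the checkpoint.
  void checkpointed(long windowId) {
    perWindow.headMap(windowId, true).clear();
  }

  void committed(long windowId) {
    // Unused here: the buffer was already trimmed in checkpointed().
  }

  // Exposes the callbacks through the stand-in interface.
  CheckpointListener asListener() {
    return new CheckpointListener() {
      @Override
      public void checkpointed(long windowId) {
        WindowedCache.this.checkpointed(windowId);
      }

      @Override
      public void committed(long windowId) {
        WindowedCache.this.committed(windowId);
      }
    };
  }

  int bufferedWindows() {
    return perWindow.size();
  }
}
```

With this shape, a checkpointed(1) call delivered while window 3 is being
processed removes only the entries for window 1 and leaves windows 2 and 3
in memory.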

Thanks

On Mon, Nov 23, 2015 at 11:17 AM, Timothy Farkas <ti...@datatorrent.com>
wrote:

> Pramod,
>
> Very good point about checkpointed being called within a window boundary.
> But I still don't understand the purpose of calling checkpointed after the
> asynchronous copy to HDFS is completed. It makes checkpointed
> implementations more complex, and it does not provide any benefits. The
> checkpointed implementation logic could be called multiple times for the
> same window if the operator fails and is restored to an earlier checkpoint,
> this fact invalidates any atomicity guarantees that you are trying to
> preserve. I would like a concrete example of when it is necessary to call
> checkpointed after the asynchronous copy to hdfs. The only use case I can
> think of is if the operator reads its own checkpointed state from hdfs in
> the checkpointed call back, but this use case is not a valid use case.
>
> Tim
>
> On Mon, Nov 23, 2015 at 10:56 AM, Pramod Immaneni <pr...@datatorrent.com>
> wrote:
>
> > There is a problem I see with the older way in which checkpointed was
> > called as well which is relevant to this. It is called outside of window
> > boundary between end window and begin window which I don't think is the
> > correct as platform should not run operator business logic outside of
> > window boundaries. The operator could end up sending tuples accidentally
> in
> > this call for example.
> >
> > I think we need to fix that and also not assume that checkpointed
> callback
> > will be called immediately after end window of the window being
> > checkpointed.
> >
> > Thanks
> >
> > On Mon, Nov 23, 2015 at 10:38 AM, Chandni Singh <chandni@datatorrent.com
> >
> > wrote:
> >
> > > :-) I did not write it that semantics. Looks like someone made that
> > > assumption along the way. Fix the code which assumes that.
> > >
> > > What  do you mean?
> > > We go on fixing all the code which was written because the behavior
> with
> > > Synchronous checkpointing was
> > > beginWindow(x) -> endWindow -> checkpointWindow(x)   ( when
> checkpointing
> > > is aligned with application window boundary)
> > >
> > > This was broken recently with async checkpointing but instead of fixing
> > > that, we go on fixing all the existing solutions.
> > > I guess I find this ridiculous.
> > >
> > > And why should we make it harder and harder to write operators?
> > > I think majority of people who participated in this discussion do
> believe
> > > that we need to fix and I think this is critical.
> > >
> > > Chandni
> > >
> > >
> > >
> > > On Mon, Nov 23, 2015 at 10:16 AM, Chetan Narsude (cnarsude) <
> > > cnarsude@cisco.com> wrote:
> > >
> > > >
> > > >
> > > > On 11/23/15, 8:44 AM, "Chandni Singh" <ch...@datatorrent.com>
> wrote:
> > > >
> > > > >I think in the API, there is windowId in the checkpointed callback
> for
> > > > >cases when checkpointing could happen within application windows.
> > > > >
> > > > >IMHO backward compatibility is broken. For 3 years since the
> platform
> > > was
> > > > >created the semantics of checkpointed were
> > > > >beginWindow(x) -> endwindow -> checkpointWindow(x) when
> checkpointing
> > > was
> > > > >aligned with application window boundaries.
> > > > >
> > > >
> > > > :-) I did not write it that semantics. Looks like someone made that
> > > > assumption along the way. Fix the code which assumes that.
> > > >
> > > > --
> > > > Chetan
> > > >
> > > > > >Please note that aligning Checkpointing with Application Window
> > boundary
> > > is
> > > > > >the DEFAULT behavior and I think most of us agree that aligning the
> > > > >checkpointing with application window boundaries is what we see in
> > every
> > > > >use case.
> > > > >
> > > > >Code was written with that assumption by committers of Apex (not
> even
> > by
> > > > > >people who are new to Apex). It is broken by a change introduced
> > > couple
> > > > >of months back (August 2015).
> > > > >
> > > > >Moreover, what do we achieve by NOT guaranteeing that semantics-
> > > delegate
> > > > >the complexity to operators to handle it. Even a simple scenario
> that
> > I
> > > > >mentioned in my first mail, is very complicated.
> > > > >
> > > > >I think this is a big issue and we need to fix it.
> > > > >
> > > > >Chandni
> > > > >
> > > > >On Mon, Nov 23, 2015 at 7:12 AM, Munagala Ramanath <
> > ram@datatorrent.com
> > > >
> > > > >wrote:
> > > > >
> > > > >> I too find Gaurav's argument cogent: endWindow() does not take a
> > > > >> windowId parameter
> > > > >> leading to a natural guarantee that it matches the immediately
> > > > >> preceding beginWindow().
> > > > >>
> > > > >> Both committed() and checkpointed() take that parameter which
> > suggests
> > > > >>it
> > > > >> may
> > > > >> lag behind the current window. The comments on those methods say
> > > nothing
> > > > > >> that can be interpreted as a guarantee that the windowId will
> > match
> > > > >>the
> > > > >> window just processed. So, from the viewpoint of "original intent"
> > --
> > > > >> a phrase that
> > > > >> has a long and storied history in jurisprudence
> > > > >> (https://en.wikipedia.org/wiki/Original_intent) --
> > > > >> it seems that any code that assumed a guarantee when none existed
> > must
> > > > >> be regarded
> > > > >> as erroneous.
> > > > >>
> > > > >> Having said that, we are all aware that in software it is not at
> all
> > > > >> unusual to preserve behavior
> > > > >> in the interests of backward compatibility, regardless of many
> other
> > > > >> reasons for that behavior
> > > > >> to be considered objectionable. So, if the cost of fixing all the
> > code
> > > > >> that makes that
> > > > >> assumption is too high, we should seriously consider reverting it.
> > In
> > > > >> this context, the
> > > > >> guarantee that checkpointed() simply means that the operator state
> > has
> > > > >> been serialized
> > > > >> seems adequate.
> > > > >>
> > > > >> We have a roughly analogous situation with respect to OS write()
> > > calls:
> > > > >> When the call returns, we _do not_ have an assurance that the data
> > has
> > > > >> gone to disk.
> > > > >> All we know is that the OS has copied the data to its buffers for
> > > > >> flushing to disk. If we
> > > > >> want to know when the actual write to disk is done, we need to
> use a
> > > > >> different call -- fsync().
> > > > >>
> > > > >> Ram
> > > > >>
> > > > >> On Mon, Nov 23, 2015 at 6:19 AM, Pramod Immaneni
> > > > >><pr...@datatorrent.com>
> > > > >> wrote:
> > > > >> > I think the original intent for checkpointed callback was that
> it
> > > can
> > > > >>be
> > > > >> > called anytime after checkpoint and not necessarily immediately
> > > after
> > > > >>the
> > > > >> > window prior of checkpoint. As Gaurav mentioned the API bears it
> > > out.
> > > > >> > Furthermore the callback has to be called within the lifecycle
> > > > >>methods of
> > > > >> > the operator so it will be called inside a window so you anyway
> > have
> > > > >>to
> > > > >> > deal with new data anyway before the callback is called. Even
> > though
> > > > >> > checkpointed is not committed shouldn't it at least guarantee
> that
> > > the
> > > > >> > operator is recoverable to that state in which case it should be
> > > > >>called
> > > > >> > after the save to HDFS actually finishes.
> > > > >> >
> > > > >> > Thanks
> > > > >> >
> > > > >> > On Mon, Nov 23, 2015 at 5:38 AM, Gaurav Gupta <
> > > gaurav@datatorrent.com
> > > > >
> > > > >> > wrote:
> > > > >> >
> > > > >> >> IMO, I don't think there is any backward incompatibility wrt
> > > > >> Checkpointing
> > > > >> >> call back semantics because
> > > > >> >>
> > > > >> >> 1. The checkpointed call is only made once the operator state
> is
> > > > >> preserved.
> > > > >> >> 2. The window ids being passed to checkpointed are in
> increasing
> > > > >>order.
> > > > >> >> 3. The window ids being passed are still the same ids as were
> > > passed
> > > > >> >> earlier.
> > > > >> >> 4. The sequence is still
> > > beginWindow()->endWindow()->checkpointed().
> > > > >> >>
> > > > >> >> Thanks
> > > > >> >> - Gaurav
> > > > >> >>
> > > > >> >> > On Nov 23, 2015, at 1:03 AM, Gaurav Gupta <
> > > gaurav@datatorrent.com>
> > > > >> >> wrote:
> > > > >> >> >
> > > > >> >> > If the requirement is that the order is always
> > > > >> >> beginWindow()->endWindow()->checkpointed(), why to pass
> windowId
> > > in
> > > > >>the
> > > > >> >> checkpointed() call back?
> > > > >> >> >
> > > > >> >> > Thanks
> > > > >> >> > - Gaurav
> > > > >> >> >
> > > > >> >> >> On Nov 22, 2015, at 11:22 PM, Chandni Singh
> > > > >><chandni@datatorrent.com
> > > > >> >> <ma...@datatorrent.com>> wrote:
> > > > >> >> >>
> > > > >> >> >> FYI,
> > > > >> >> >>
> > > > >> >> >> HDHTWriter implementation is dependent on the older
> semantics
> > > and
> > > > >> seems
> > > > >> >> to
> > > > >> >> >> be broken now.
> > > > >> >> >> startWindow(x) -> endWindow(x) -> checkpointed(x)
> > > > >> >> >> In the checkpointed implementation, it copies certain state
> > > > >> (transient)
> > > > >> >> and
> > > > >> >> >> transfers it to a checkpointedWriteCache with respect to
> > window
> > > > >>'x'.
> > > > >> >> >>
> > > > >> >> >> With Async checkpointing it, the state that is transferred
> is
> > > much
> > > > >> more
> > > > >> >> >> recent than window 'x'.
> > > > >> >> >>
> > > > >> >> >> Chandni
> > > > >> >> >>
> > > > >> >> >>
> > > > >> >> >> On Sun, Nov 22, 2015 at 11:04 PM, Chandni Singh <
> > > > >> >> chandni@datatorrent.com <ma...@datatorrent.com>>
> > > > >> >> >> wrote:
> > > > >> >> >>
> > > > >> >> >>> Agreed. Thomas's solution fixes the backward
> > incompatibility. I
> > > > >> think
> > > > >> >> we
> > > > >> >> >>> really need to fix this.
> > > > >> >> >>>
> > > > >> >> >>> On Sun, Nov 22, 2015 at 10:23 PM, Timothy Farkas <
> > > > >> tim@datatorrent.com
> > > > >> >> <ma...@datatorrent.com>>
> > > > >> >> >>> wrote:
> > > > >> >> >>>
> > > > >> >> >>>> Gaurav,
> > > > >> >> >>>>
> > > > >> >> >>>> I think if the state copy fails then STRAM should roll
> back
> > > the
> > > > >> >> operator
> > > > >> >> >>>> to
> > > > >> >> >>>> a checkpoint that is further back than the last
> checkpoint.
> > If
> > > > >>you
> > > > >> are
> > > > >> >> >>>> saying that you want to preserve the semantic that
> > > checkpointed
> > > > >>is
> > > > >> >> only
> > > > >> >> >>>> called after a checkpoint is completed, I would argue that
> > > that
> > > > >> >> guarantee
> > > > >> >> >>>> is already pointless in the current implementation since
> it
> > is
> > > > >> always
> > > > >> >> >>>> possible for an operator to be rolled back to a checkpoint
> > > > >>before
> > > > >> it's
> > > > >> >> >>>> last
> > > > >> >> >>>> completed checkpoint. So, it is already currently possible
> > for
> > > > >>some
> > > > >> >> >>>> database or file operation performed after a completed
> > > > >>checkpoint
> > > > >> to
> > > > >> >> be
> > > > >> >> >>>> redone after a failure. Because of this I think Thomas's
> > > > >>solution
> > > > >> >> makes
> > > > >> >> >>>> the
> > > > >> >> >>>> most sense. Thomas's solution would also address Chandni's
> > > > >>original
> > > > >> >> point
> > > > >> >> >>>> that the semantics for the checkpointed call back have
> been
> > > > >> violated.
> > > > >> >> >>>> There
> > > > >> >> >>>> are operators in our libraries that have depended on the
> > > > >> >> beginWindow(x),
> > > > >> >> >>>> endWindow(x), and checkpointed(x) call sequence, which is
> > now
> > > > >> broken.
> > > > >> >> We
> > > > >> >> >>>> should probably fix that.
> > > > >> >> >>>>
> > > > >> >> >>>> Tim
> > > > >> >> >>>>
> > > > >> >> >>>> On Sun, Nov 22, 2015 at 10:02 PM, Gaurav Gupta <
> > > > >> >> gaurav@datatorrent.com <ma...@datatorrent.com>>
> > > > >> >> >>>> wrote:
> > > > >> >> >>>>
> > > > >> >> >>>>> Thomas,
> > > > >> >> >>>>>
> > > > >> >> >>>>> This was done to preserve checkpointing semantics that is
> > to
> > > > >>tell
> > > > >> the
> > > > >> >> >>>>> operator that its state is preserved. Say if database is
> > > > >>updated
> > > > >> or
> > > > >> >> >>>> files
> > > > >> >> >>>>> are moved in checkpointed call but the state copy fails,
> > how
> > > to
> > > > >> >> address
> > > > >> >> >>>>> such scenarios?
> > > > >> >> >>>>>
> > > > >> >> >>>>> Thanks
> > > > >> >> >>>>> - Gaurav
> > > > >> >> >>>>>
> > > > >> >> >>>>>> On Nov 22, 2015, at 9:44 PM, Thomas Weise <
> > > > >> thomas@datatorrent.com
> > > > >> >> <ma...@datatorrent.com>>
> > > > >> >> >>>>> wrote:
> > > > >> >> >>>>>>
> > > > >> >> >>>>>> Alternatively I would ask why the checkpointed callback
> > > needs
> > > > >>to
> > > > >> >> wait
> > > > >> >> >>>>> until
> > > > >> >> >>>>>> the data was copied to HDFS instead upon completion of
> the
> > > > >>state
> > > > >> >> >>>>>> serialization.
> > > > >> >> >>>>>>
> > > > >> >> >>>>>> Thomas
> > > > >> >> >>>>>>
> > > > >> >> >>>>>>
> > > > >> >> >>>>>> On Sun, Nov 22, 2015 at 9:41 PM, Chandni Singh <
> > > > >> >> >>>> chandni@datatorrent.com <ma...@datatorrent.com>>
> > > > >> >> >>>>>> wrote:
> > > > >> >> >>>>>>
> > > > >> >> >>>>>>> Gaurav,
> > > > >> >> >>>>>>>
> > > > >> >> >>>>>>> My question is about why Async was made the default
> when
> > it
> > > > >> changed
> > > > >> >> >>>> the
> > > > >> >> >>>>>>> semantics of operator callbacks. Your response doesn't
> > > answer
> > > > >> that.
> > > > >> >> >>>>>>>
> > > > >> >> >>>>>>> In a way we broke backward compatibility.
> > > > >> >> >>>>>>>
> > > > >> >> >>>>>>> Chandni
> > > > >> >> >>>>>>>
> > > > >> >> >>>>>>> On Sun, Nov 22, 2015 at 9:22 PM, Gaurav Gupta <
> > > > >> >> >>>> gaurav@datatorrent.com <ma...@datatorrent.com>>
> > > > >> >> >>>>>>> wrote:
> > > > >> >> >>>>>>>
> > > > >> >> >>>>>>>> The idea behind Async checkpointing is to unblock
> > operator
> > > > >> while
> > > > >> >> the
> > > > >> >> >>>>>>> state
> > > > >> >> >>>>>>>> is getting transferred to HDFS.
> > > > >> >> >>>>>>>> Just to clarify that this beginWindow (x) ->
> > endWindow(x)
> > > ->
> > > > >> >> >>>>> checkpointed
> > > > >> >> >>>>>>>> (x-1 ) should be an ideal sequence, but if the HDFS is
> > > slow
> > > > >>or
> > > > >> for
> > > > >> >> >>>> some
> > > > >> >> >>>>>>>> other reason transferring the state to HDFS is slow
> this
> > > > >> sequence
> > > > >> >> >>>> may
> > > > >> >> >>>>> not
> > > > >> >> >>>>>>>> hold true.
> > > > >> >> >>>>>>>>
> > > > >> >> >>>>>>>> Can your use case be addressed by
> > > > >> >> >>>>>>>> https://malhar.atlassian.net/browse/APEX-78 <
> > > > >> >> https://malhar.atlassian.net/browse/APEX-78> <
> > > > >> >> >>>>>>>> https://malhar.atlassian.net/browse/APEX-78 <
> > > > >> >> https://malhar.atlassian.net/browse/APEX-78>>?
> > > > >> >> >>>>>>>>
> > > > >> >> >>>>>>>> Thanks
> > > > >> >> >>>>>>>> - Gaurav
> > > > >> >> >>>>>>>>
> > > > >> >> >>>>>>>>> On Nov 22, 2015, at 3:56 PM, Chandni Singh <
> > > > >> >> >>>> chandni@datatorrent.com <ma...@datatorrent.com>>
> > > > >> >> >>>>>>>> wrote:
> > > > >> >> >>>>>>>>>
> > > > >> >> >>>>>>>>> With Async checkpointing the checkpoint callback in
> > > > >> >> CheckpointPoint
> > > > >> >> >>>>>>>>> listener is called for a previous window, that is,
> > > > >> >> >>>>>>>>> beginWindow (x) -> endWindow(x) -> checkpointed (x-1
> )
> > > > >> >> >>>>>>>>>
> > > > >> >> >>>>>>>>> This feature was newly introduced. With synchronous
> > > > >> >> checkpointing,
> > > > >> >> >>>> the
> > > > >> >> >>>>>>>>> behavior was always
> > > > >> >> >>>>>>>>> beginWindow(x) -> endWindow(x) -> checkpointed (x)
> > > > >> >> >>>>>>>>>
> > > > >> >> >>>>>>>>> A lot of operators were written before asynchronous
> > > > >> checkpointing
> > > > >> >> >>>> was
> > > > >> >> >>>>>>>>> introduced and few of them can rely on the sequencing
> > > > >> guaranteed
> > > > >> >> by
> > > > >> >> >>>>>>>>> synchronous checkpointing.
> > > > >> >> >>>>>>>>>
> > > > >> >> >>>>>>>>> So why was Async Checkpointed made default?
> > > > >> >> >>>>>>>>>
> > > > >> >> >>>>>>>>> With how Async checkpoint is today, the complexity to
> > > > >>handle
> > > > >> >> >>>> transient
> > > > >> >> >>>>>>>>> state in checkpointed callback falls on every
> operator.
> > > For
> > > > >> eg,
> > > > >> >> >>>> lets
> > > > >> >> >>>>>>> say
> > > > >> >> >>>>>>>>> earlier I had a transient map which I cleared every
> > time
> > > > >>the
> > > > >> >> >>>>>>> checkpointed
> > > > >> >> >>>>>>>>> was called, with async checkpointing this simple task
> > > will
> > > > >>be
> > > > >> a
> > > > >> >> lot
> > > > >> >> >>>>>>> more
> > > > >> >> >>>>>>>>> complicated.
> > > > >> >> >>>>>>>>>
> > > > >> >> >>>>>>>>> I think Async checkpointing broke the semantics of
> > > operator
> > > > >> >> >>>> callbacks
> > > > >> >> >>>>>>> and
> > > > >> >> >>>>>>>>> should NOT be the default.
> > > > >> >> >>>>>>>>
> > > > >> >> >>>>>>>>
> > > > >> >> >>>>>>>
> > > > >> >> >>>>>
> > > > >> >> >>>>>
> > > > >> >> >>>>
> > > > >> >> >>>
> > > > >> >> >>>
> > > > >> >> >
> > > > >> >>
> > > > >> >>
> > > > >>
> > > >
> > > >
> > >
> >
>

Re: Why is Async checkpointing made default?

Posted by Timothy Farkas <ti...@datatorrent.com>.
Pramod,

Very good point about checkpointed being called within a window boundary.
But I still don't understand the purpose of calling checkpointed after the
asynchronous copy to HDFS is completed. It makes checkpointed
implementations more complex, and it does not provide any benefits. The
checkpointed implementation logic could be called multiple times for the
same window if the operator fails and is restored to an earlier checkpoint;
this invalidates any atomicity guarantees that you are trying to
preserve. I would like a concrete example of when it is necessary to call
checkpointed after the asynchronous copy to HDFS. The only use case I can
think of is an operator that reads its own checkpointed state from HDFS in
the checkpointed callback, and I don't consider that a valid use case.
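
To make the repeated-call point concrete, here is a small sketch under
stated assumptions: ReplaySafeOperator, its fields, and the in-memory
externalWrites list (standing in for a database or file system) are all
invented for illustration, and only the callback names come from this
thread. The checkpointed logic is kept idempotent, so being invoked again
for the same window after a restore changes nothing, and the one-shot side
effect is left to committed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical sketch, not Apex code: callback names mirror the thread,
// everything else is made up for illustration.
class ReplaySafeOperator {

  // Stand-in for an external system (database rows, moved files, ...).
  final List<Long> externalWrites = new ArrayList<>();

  // Window ids this operator has been told are checkpointed.
  private final TreeSet<Long> checkpointedWindows = new TreeSet<>();

  void checkpointed(long windowId) {
    // Idempotent: a repeated call for the same window is a no-op,
    // because a set ignores duplicate insertions.
    checkpointedWindows.add(windowId);
  }

  void committed(long windowId) {
    // The one-shot external side effect is deferred to committed().
    externalWrites.add(windowId);
  }

  int checkpointedCount() {
    return checkpointedWindows.size();
  }
}
```

Calling checkpointed(5) twice, as could happen after a failure and replay,
leaves a single recorded window and no external writes; the write happens
only when committed(5) is delivered.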

Tim

On Mon, Nov 23, 2015 at 10:56 AM, Pramod Immaneni <pr...@datatorrent.com>
wrote:

> There is a problem I see with the older way in which checkpointed was
> called as well which is relevant to this. It is called outside of window
> boundary between end window and begin window which I don't think is the
> correct as platform should not run operator business logic outside of
> window boundaries. The operator could end up sending tuples accidentally in
> this call for example.
>
> I think we need to fix that and also not assume that checkpointed callback
> will be called immediately after end window of the window being
> checkpointed.
>
> Thanks
>
> On Mon, Nov 23, 2015 at 10:38 AM, Chandni Singh <ch...@datatorrent.com>
> wrote:
>
> > :-) I did not write it that semantics. Looks like someone made that
> > assumption along the way. Fix the code which assumes that.
> >
> > What  do you mean?
> > We go on fixing all the code which was written because the behavior with
> > Synchronous checkpointing was
> > beginWindow(x) -> endWindow -> checkpointWindow(x)   ( when checkpointing
> > is aligned with application window boundary)
> >
> > This was broken recently with async checkpointing but instead of fixing
> > that, we go on fixing all the existing solutions.
> > I guess I find this ridiculous.
> >
> > And why should we make it harder and harder to write operators?
> > I think majority of people who participated in this discussion do believe
> > that we need to fix and I think this is critical.
> >
> > Chandni
> >
> >
> >
> > On Mon, Nov 23, 2015 at 10:16 AM, Chetan Narsude (cnarsude) <
> > cnarsude@cisco.com> wrote:
> >
> > >
> > >
> > > On 11/23/15, 8:44 AM, "Chandni Singh" <ch...@datatorrent.com> wrote:
> > >
> > > >I think in the API, there is windowId in the checkpointed callback for
> > > >cases when checkpointing could happen within application windows.
> > > >
> > > >IMHO backward compatibility is broken. For 3 years since the platform
> > was
> > > >created the semantics of checkpointed were
> > > >beginWindow(x) -> endwindow -> checkpointWindow(x) when checkpointing
> > was
> > > >aligned with application window boundaries.
> > > >
> > >
> > > :-) I did not write it that semantics. Looks like someone made that
> > > assumption along the way. Fix the code which assumes that.
> > >
> > > --
> > > Chetan
> > >
> > > > >Please note that aligning Checkpointing with Application Window
> boundary
> > is
> > > > >the DEFAULT behavior and I think most of us agree that aligning the
> > > >checkpointing with application window boundaries is what we see in
> every
> > > >use case.
> > > >
> > > >Code was written with that assumption by committers of Apex (not even
> by
> > > > >people who are new to Apex). It is broken by a change introduced
> > couple
> > > >of months back (August 2015).
> > > >
> > > >Moreover, what do we achieve by NOT guaranteeing that semantics-
> > delegate
> > > >the complexity to operators to handle it. Even a simple scenario that
> I
> > > >mentioned in my first mail, is very complicated.
> > > >
> > > >I think this is a big issue and we need to fix it.
> > > >
> > > >Chandni
> > > >
> > > >On Mon, Nov 23, 2015 at 7:12 AM, Munagala Ramanath <
> ram@datatorrent.com
> > >
> > > >wrote:
> > > >
> > > >> I too find Gaurav's argument cogent: endWindow() does not take a
> > > >> windowId parameter
> > > >> leading to a natural guarantee that it matches the immediately
> > > >> preceding beginWindow().
> > > >>
> > > >> Both committed() and checkpointed() take that parameter which
> suggests
> > > >>it
> > > >> may
> > > >> lag behind the current window. The comments on those methods say
> > nothing
> > > > >> that can be interpreted as a guarantee that the windowId will
> match
> > > >>the
> > > >> window just processed. So, from the viewpoint of "original intent"
> --
> > > >> a phrase that
> > > >> has a long and storied history in jurisprudence
> > > >> (https://en.wikipedia.org/wiki/Original_intent) --
> > > >> it seems that any code that assumed a guarantee when none existed
> must
> > > >> be regarded
> > > >> as erroneous.
> > > >>
> > > >> Having said that, we are all aware that in software it is not at all
> > > >> unusual to preserve behavior
> > > >> in the interests of backward compatibility, regardless of many other
> > > >> reasons for that behavior
> > > >> to be considered objectionable. So, if the cost of fixing all the
> code
> > > >> that makes that
> > > >> assumption is too high, we should seriously consider reverting it.
> In
> > > >> this context, the
> > > >> guarantee that checkpointed() simply means that the operator state
> has
> > > >> been serialized
> > > >> seems adequate.
> > > >>
> > > >> We have a roughly analogous situation with respect to OS write()
> > calls:
> > > >> When the call returns, we _do not_ have an assurance that the data
> has
> > > >> gone to disk.
> > > >> All we know is that the OS has copied the data to its buffers for
> > > >> flushing to disk. If we
> > > >> want to know when the actual write to disk is done, we need to use a
> > > >> different call -- fsync().
> > > >>
> > > >> Ram
> > > >>
> > > >> On Mon, Nov 23, 2015 at 6:19 AM, Pramod Immaneni
> > > >><pr...@datatorrent.com>
> > > >> wrote:
> > > >> > I think the original intent for checkpointed callback was that it
> > can
> > > >>be
> > > >> > called anytime after checkpoint and not necessarily immediately
> > after
> > > >>the
> > > >> > window prior of checkpoint. As Gaurav mentioned the API bears it
> > out.
> > > >> > Furthermore the callback has to be called within the lifecycle
> > > >>methods of
> > > >> > the operator so it will be called inside a window so you anyway
> have
> > > >>to
> > > >> > deal with new data anyway before the callback is called. Even
> though
> > > >> > checkpointed is not committed shouldn't it at least guarantee that
> > the
> > > >> > operator is recoverable to that state in which case it should be
> > > >>called
> > > >> > after the save to HDFS actually finishes.
> > > >> >
> > > >> > Thanks
> > > >> >
> > > >> > On Mon, Nov 23, 2015 at 5:38 AM, Gaurav Gupta <
> > gaurav@datatorrent.com
> > > >
> > > >> > wrote:
> > > >> >
> > > > >> >> IMO, I don't think there is any backward incompatibility wrt
> > > >> Checkpointing
> > > >> >> call back semantics because
> > > >> >>
> > > >> >> 1. The checkpointed call is only made once the operator state is
> > > >> preserved.
> > > >> >> 2. The window ids being passed to checkpointed are in increasing
> > > >>order.
> > > >> >> 3. The window ids being passed are still the same ids as were
> > passed
> > > >> >> earlier.
> > > >> >> 4. The sequence is still
> > beginWindow()->endWindow()->checkpointed().
> > > >> >>
> > > >> >> Thanks
> > > >> >> - Gaurav
> > > >> >>
> > > >> >> > On Nov 23, 2015, at 1:03 AM, Gaurav Gupta <
> > gaurav@datatorrent.com>
> > > >> >> wrote:
> > > >> >> >
> > > >> >> > If the requirement is that the order is always
> > > > >> >> beginWindow()->endWindow()->checkpointed(), why to pass windowId
> > in
> > > >>the
> > > >> >> checkpointed() call back?
> > > >> >> >
> > > >> >> > Thanks
> > > >> >> > - Gaurav
> > > >> >> >
> > > >> >> >> On Nov 22, 2015, at 11:22 PM, Chandni Singh
> > > >><chandni@datatorrent.com
> > > >> >> <ma...@datatorrent.com>> wrote:
> > > >> >> >>
> > > >> >> >> FYI,
> > > >> >> >>
> > > >> >> >> HDHTWriter implementation is dependent on the older semantics
> > and
> > > >> seems
> > > >> >> to
> > > >> >> >> be broken now.
> > > >> >> >> startWindow(x) -> endWindow(x) -> checkpointed(x)
> > > >> >> >> In the checkpointed implementation, it copies certain state
> > > >> (transient)
> > > >> >> and
> > > >> >> >> transfers it to a checkpointedWriteCache with respect to
> window
> > > >>'x'.
> > > >> >> >>
> > > >> >> >> With Async checkpointing it, the state that is transferred is
> > much
> > > >> more
> > > >> >> >> recent than window 'x'.
> > > >> >> >>
> > > >> >> >> Chandni
> > > >> >> >>
> > > >> >> >>
> > > >> >> >> On Sun, Nov 22, 2015 at 11:04 PM, Chandni Singh <
> > > >> >> chandni@datatorrent.com <ma...@datatorrent.com>>
> > > >> >> >> wrote:
> > > >> >> >>
> > > >> >> >>> Agreed. Thomas's solution fixes the backward
> incompatibility. I
> > > >> think
> > > >> >> we
> > > >> >> >>> really need to fix this.
> > > >> >> >>>
> > > >> >> >>> On Sun, Nov 22, 2015 at 10:23 PM, Timothy Farkas <
> > > >> tim@datatorrent.com
> > > >> >> <ma...@datatorrent.com>>
> > > >> >> >>> wrote:
> > > >> >> >>>
> > > >> >> >>>> Gaurav,
> > > >> >> >>>>
> > > >> >> >>>> I think if the state copy fails then STRAM should roll back
> > the
> > > >> >> operator
> > > >> >> >>>> to
> > > >> >> >>>> a checkpoint that is further back than the last checkpoint.
> If
> > > >>you
> > > >> are
> > > >> >> >>>> saying that you want to preserve the semantic that
> > checkpointed
> > > >>is
> > > >> >> only
> > > >> >> >>>> called after a checkpoint is completed, I would argue that
> > that
> > > >> >> guarantee
> > > >> >> >>>> is already pointless in the current implementation since it
> is
> > > >> always
> > > >> >> >>>> possible for an operator to be rolled back to a checkpoint
> > > >>before
> > > >> it's
> > > >> >> >>>> last
> > > >> >> >>>> completed checkpoint. So, it is already currently possible
> for
> > > >>some
> > > >> >> >>>> database or file operation performed after a completed
> > > >>checkpoint
> > > >> to
> > > >> >> be
> > > >> >> >>>> redone after a failure. Because of this I think Thomas's
> > > >>solution
> > > >> >> makes
> > > >> >> >>>> the
> > > >> >> >>>> most sense. Thomas's solution would also address Chandni's
> > > >>original
> > > >> >> point
> > > >> >> >>>> that the semantics for the checkpointed call back have been
> > > >> violated.
> > > >> >> >>>> There
> > > >> >> >>>> are operators in our libraries that have depended on the
> > > >> >> beginWindow(x),
> > > >> >> >>>> endWindow(x), and checkpointed(x) call sequence, which is
> now
> > > >> broken.
> > > >> >> We
> > > >> >> >>>> should probably fix that.
> > > >> >> >>>>
> > > >> >> >>>> Tim
> > > >> >> >>>>
> > > >> >> >>>> On Sun, Nov 22, 2015 at 10:02 PM, Gaurav Gupta <
> > > >> >> gaurav@datatorrent.com <ma...@datatorrent.com>>
> > > >> >> >>>> wrote:
> > > >> >> >>>>
> > > >> >> >>>>> Thomas,
> > > >> >> >>>>>
> > > >> >> >>>>> This was done to preserve checkpointing semantics that is
> to
> > > >>tell
> > > >> the
> > > >> >> >>>>> operator that its state is preserved. Say if database is
> > > >>updated
> > > >> or
> > > >> >> >>>> files
> > > >> >> >>>>> are moved in checkpointed call but the state copy fails,
> how
> > to
> > > >> >> address
> > > >> >> >>>>> such scenarios?
> > > >> >> >>>>>
> > > >> >> >>>>> Thanks
> > > >> >> >>>>> - Gaurav
> > > >> >> >>>>>
> > > >> >> >>>>>> On Nov 22, 2015, at 9:44 PM, Thomas Weise <
> > > >> thomas@datatorrent.com
> > > >> >> <ma...@datatorrent.com>>
> > > >> >> >>>>> wrote:
> > > >> >> >>>>>>
> > > >> >> >>>>>> Alternatively I would ask why the checkpointed callback
> > needs
> > > >>to
> > > >> >> wait
> > > >> >> >>>>> until
> > > >> >> >>>>>> the data was copied to HDFS instead upon completion of the
> > > >>state
> > > >> >> >>>>>> serialization.
> > > >> >> >>>>>>
> > > >> >> >>>>>> Thomas
> > > >> >> >>>>>>
> > > >> >> >>>>>>
> > > >> >> >>>>>> On Sun, Nov 22, 2015 at 9:41 PM, Chandni Singh <
> > > >> >> >>>> chandni@datatorrent.com <ma...@datatorrent.com>>
> > > >> >> >>>>>> wrote:
> > > >> >> >>>>>>
> > > >> >> >>>>>>> Gaurav,
> > > >> >> >>>>>>>
> > > >> >> >>>>>>> My question is about why Async was made the default when
> it
> > > >> changed
> > > >> >> >>>> the
> > > >> >> >>>>>>> semantics of operator callbacks. Your response doesn't
> > answer
> > > >> that.
> > > >> >> >>>>>>>
> > > >> >> >>>>>>> In a way we broke backward compatibility.
> > > >> >> >>>>>>>
> > > >> >> >>>>>>> Chandni
> > > >> >> >>>>>>>
> > > >> >> >>>>>>> On Sun, Nov 22, 2015 at 9:22 PM, Gaurav Gupta <
> > > >> >> >>>> gaurav@datatorrent.com <ma...@datatorrent.com>>
> > > >> >> >>>>>>> wrote:
> > > >> >> >>>>>>>
> > > >> >> >>>>>>>> The idea behind Async checkpointing is to unblock
> operator
> > > >> while
> > > >> >> the
> > > >> >> >>>>>>> state
> > > >> >> >>>>>>>> is getting transferred to HDFS.
> > > >> >> >>>>>>>> Just to clarify that this beginWindow (x) ->
> endWindow(x)
> > ->
> > > >> >> >>>>> checkpointed
> > > >> >> >>>>>>>> (x-1 ) should be an ideal sequence, but if the HDFS is
> > slow
> > > >>or
> > > >> for
> > > >> >> >>>> some
> > > >> >> >>>>>>>> other reason transferring the state to HDFS is slow this
> > > >> sequence
> > > >> >> >>>> may
> > > >> >> >>>>> not
> > > >> >> >>>>>>>> hold true.
> > > >> >> >>>>>>>>
> > > >> >> >>>>>>>> Can your use case be addressed by
> > > >> >> >>>>>>>> https://malhar.atlassian.net/browse/APEX-78 <
> > > >> >> https://malhar.atlassian.net/browse/APEX-78> <
> > > >> >> >>>>>>>> https://malhar.atlassian.net/browse/APEX-78 <
> > > >> >> https://malhar.atlassian.net/browse/APEX-78>>?
> > > >> >> >>>>>>>>
> > > >> >> >>>>>>>> Thanks
> > > >> >> >>>>>>>> - Gaurav
> > > >> >> >>>>>>>>
> > > >> >> >>>>>>>>> On Nov 22, 2015, at 3:56 PM, Chandni Singh <
> > > >> >> >>>> chandni@datatorrent.com <ma...@datatorrent.com>>
> > > >> >> >>>>>>>> wrote:
> > > >> >> >>>>>>>>>
> > > >> >> >>>>>>>>> With Async checkpointing the checkpoint callback in
> > > >> >> CheckpointPoint
> > > >> >> >>>>>>>>> listener is called for a previous window, that is,
> > > >> >> >>>>>>>>> beginWindow (x) -> endWindow(x) -> checkpointed (x-1 )
> > > >> >> >>>>>>>>>
> > > >> >> >>>>>>>>> This feature was newly introduced. With synchronous
> > > >> >> checkpointing,
> > > >> >> >>>> the
> > > >> >> >>>>>>>>> behavior was always
> > > >> >> >>>>>>>>> beginWindow(x) -> endWindow(x) -> checkpointed (x)
> > > >> >> >>>>>>>>>
> > > >> >> >>>>>>>>> A lot of operators were written before asynchronous
> > > >> checkpointing
> > > >> >> >>>> was
> > > >> >> >>>>>>>>> introduced and few of them can rely on the sequencing
> > > >> guaranteed
> > > >> >> by
> > > >> >> >>>>>>>>> synchronous checkpointing.
> > > >> >> >>>>>>>>>
> > > >> >> >>>>>>>>> So why was Async Checkpointed made default?
> > > >> >> >>>>>>>>>
> > > >> >> >>>>>>>>> With how Async checkpoint is today, the complexity to
> > > >>handle
> > > >> >> >>>> transient
> > > >> >> >>>>>>>>> state in checkpointed callback falls on every operator.
> > For
> > > >> eg,
> > > >> >> >>>> lets
> > > >> >> >>>>>>> say
> > > >> >> >>>>>>>>> earlier I had a transient map which I cleared every
> time
> > > >>the
> > > >> >> >>>>>>> checkpointed
> > > >> >> >>>>>>>>> was called, with async checkpointing this simple task
> > will
> > > >>be
> > > >> a
> > > >> >> lot
> > > >> >> >>>>>>> more
> > > >> >> >>>>>>>>> complicated.
> > > >> >> >>>>>>>>>
> > > >> >> >>>>>>>>> I think Async checkpointing broke the semantics of
> > operator
> > > >> >> >>>> callbacks
> > > >> >> >>>>>>> and
> > > >> >> >>>>>>>>> should NOT be the default.
> > > >> >> >>>>>>>>
> > > >> >> >>>>>>>>
> > > >> >> >>>>>>>
> > > >> >> >>>>>
> > > >> >> >>>>>
> > > >> >> >>>>
> > > >> >> >>>
> > > >> >> >>>
> > > >> >> >
> > > >> >>
> > > >> >>
> > > >>
> > >
> > >
> >
>

Re: Why is Async checkpointing made default?

Posted by Pramod Immaneni <pr...@datatorrent.com>.
There is also a problem with the older way in which checkpointed was
called, which is relevant here. It was called outside of the window
boundary, between end window and begin window, which I don't think is
correct, as the platform should not run operator business logic outside of
window boundaries. The operator could, for example, end up emitting tuples
accidentally in this call.

I think we need to fix that and also not assume that the checkpointed
callback will be called immediately after the end window of the window
being checkpointed.
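
As a rough illustration of not relying on that timing (a sketch only;
WindowedTransientState and its methods are hypothetical names, not Apex
API), an operator could bucket its transient state by window id and, when
checkpointed(windowId) arrives late, release only the buckets at or before
that id:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical helper, not part of the Apex API: buffers transient state per
// window so that a late checkpointed(windowId) only releases what it covers.
class WindowedTransientState {
  private final TreeMap<Long, List<String>> byWindow = new TreeMap<>();
  private long currentWindowId;

  void beginWindow(long windowId) {
    currentWindowId = windowId;
  }

  // Record an item under the window that is currently being processed.
  void add(String item) {
    byWindow.computeIfAbsent(currentWindowId, k -> new ArrayList<>()).add(item);
  }

  // May be invoked for a window older than the current one.
  void checkpointed(long windowId) {
    // Drop only the entries recorded at or before the checkpointed window.
    byWindow.headMap(windowId, true).clear();
  }

  int pendingWindows() {
    return byWindow.size();
  }
}
```

With this shape, state added in window x+1 survives a checkpointed(x) call
that arrives while window x+1 is already in progress.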

Thanks

On Mon, Nov 23, 2015 at 10:38 AM, Chandni Singh <ch...@datatorrent.com>
wrote:

> :-) I did not write it that semantics. Looks like someone made that
> assumption along the way. Fix the code which assumes that.
>
> What do you mean?
> We go on fixing all the code that was written because the behavior with
> synchronous checkpointing was
> beginWindow(x) -> endWindow(x) -> checkpointed(x)   (when checkpointing
> is aligned with the application window boundary)?
>
> This was broken recently by async checkpointing, but instead of fixing
> that, we go on fixing all the existing solutions.
> I find this ridiculous.
>
> And why should we make it harder and harder to write operators?
> I think the majority of people who participated in this discussion believe
> that we need to fix this, and I think it is critical.
>
> Chandni
>
>
>
> On Mon, Nov 23, 2015 at 10:16 AM, Chetan Narsude (cnarsude) <
> cnarsude@cisco.com> wrote:
>
> >
> >
> > On 11/23/15, 8:44 AM, "Chandni Singh" <ch...@datatorrent.com> wrote:
> >
> > >I think in the API, there is windowId in the checkpointed callback for
> > >cases when checkpointing could happen within application windows.
> > >
> > >IMHO backward compatibility is broken. For 3 years since the platform
> was
> > >created the semantics of checkpointed were
> > >beginWindow(x) -> endwindow -> checkpointWindow(x) when checkpointing
> was
> > >aligned with application window boundaries.
> > >
> >
> > :-) I did not write it that semantics. Looks like someone made that
> > assumption along the way. Fix the code which assumes that.
> >
> > --
> > Chetan
> >
> > >Please note that aligning checkpointing with the application window
> > >boundary is the DEFAULT behavior, and I think most of us agree that
> > >aligning checkpointing with application window boundaries is what we
> > >see in every use case.
> > >
> > >Code was written with that assumption by committers of Apex (not even
> > >by people who are new to Apex). It was broken by a change introduced a
> > >couple of months back (August 2015).
> > >
> > >Moreover, what do we achieve by NOT guaranteeing that semantics-
> delegate
> > >the complexity to operators to handle it. Even a simple scenario that I
> > >mentioned in my first mail, is very complicated.
> > >
> > >I think this is a big issue and we need to fix it.
> > >
> > >Chandni
> > >
> > >On Mon, Nov 23, 2015 at 7:12 AM, Munagala Ramanath <ram@datatorrent.com
> >
> > >wrote:
> > >
> > >> I too find Gaurav's argument cogent: endWindow() does not take a
> > >> windowId parameter
> > >> leading to a natural guarantee that it matches the immediately
> > >> preceding beginWindow().
> > >>
> > >> Both committed() and checkpointed() take that parameter which suggests
> > >>it
> > >> may
> > >> lag behind the current window. The comments on those methods say
> nothing
> > >> that can be interpreted as a guarantee that the windowId will match
> > >>the
> > >> window just processed. So, from the viewpoint of "original intent" --
> > >> a phrase that
> > >> has a long and storied history in jurisprudence
> > >> (https://en.wikipedia.org/wiki/Original_intent) --
> > >> it seems that any code that assumed a guarantee when none existed must
> > >> be regarded
> > >> as erroneous.
> > >>
> > >> Having said that, we are all aware that in software it is not at all
> > >> unusual to preserve behavior
> > >> in the interests of backward compatibility, regardless of many other
> > >> reasons for that behavior
> > >> to be considered objectionable. So, if the cost of fixing all the code
> > >> that makes that
> > >> assumption is too high, we should seriously consider reverting it. In
> > >> this context, the
> > >> guarantee that checkpointed() simply means that the operator state has
> > >> been serialized
> > >> seems adequate.
> > >>
> > >> We have a roughly analogous situation with respect to OS write()
> calls:
> > >> When the call returns, we _do not_ have an assurance that the data has
> > >> gone to disk.
> > >> All we know is that the OS has copied the data to its buffers for
> > >> flushing to disk. If we
> > >> want to know when the actual write to disk is done, we need to use a
> > >> different call -- fsync().
> > >>
> > >> Ram
> > >>
> > >> On Mon, Nov 23, 2015 at 6:19 AM, Pramod Immaneni
> > >><pr...@datatorrent.com>
> > >> wrote:
> > >> > I think the original intent for checkpointed callback was that it
> can
> > >>be
> > >> > called anytime after checkpoint and not necessarily immediately
> after
> > >>the
> > >> > window prior of checkpoint. As Gaurav mentioned the API bears it
> out.
> > >> > Furthermore the callback has to be called within the lifecycle
> > >>methods of
> > >> > the operator so it will be called inside a window so you anyway have
> > >>to
> > >> > deal with new data anyway before the callback is called. Even though
> > >> > checkpointed is not committed shouldn't it at least guarantee that
> the
> > >> > operator is recoverable to that state in which case it should be
> > >>called
> > >> > after the save to HDFS actually finishes.
> > >> >
> > >> > Thanks
> > >> >
> > >> > On Mon, Nov 23, 2015 at 5:38 AM, Gaurav Gupta <
> gaurav@datatorrent.com
> > >
> > >> > wrote:
> > >> >
> > >> >> IMO, I don't think there is any backward incompatibility wrt
> > >> Checkpointing
> > >> >> call back semantics because
> > >> >>
> > >> >> 1. The checkpointed call is only made once the operator state is
> > >> preserved.
> > >> >> 2. The window ids being passed to checkpointed are in increasing
> > >>order.
> > >> >> 3. The window ids being passed are still the same ids as were
> passed
> > >> >> earlier.
> > >> >> 4. The sequence is still
> beginWindow()->endWindow()->checkpointed().
> > >> >>
> > >> >> Thanks
> > >> >> - Gaurav
> > >> >>
> > >> >> > On Nov 23, 2015, at 1:03 AM, Gaurav Gupta <
> gaurav@datatorrent.com>
> > >> >> wrote:
> > >> >> >
> > >> >> > If the requirement is that the order is always
> > >> >> beginWindow()->endWindow()->checkpointed(), why to pass windowId
> in
> > >>the
> > >> >> checkpointed() call back?
> > >> >> >
> > >> >> > Thanks
> > >> >> > - Gaurav
> > >> >> >
> > >> >> >> On Nov 22, 2015, at 11:22 PM, Chandni Singh
> > >><chandni@datatorrent.com
> > >> >> <ma...@datatorrent.com>> wrote:
> > >> >> >>
> > >> >> >> FYI,
> > >> >> >>
> > >> >> >> HDHTWriter implementation is dependent on the older semantics
> and
> > >> seems
> > >> >> to
> > >> >> >> be broken now.
> > >> >> >> startWindow(x) -> endWindow(x) -> checkpointed(x)
> > >> >> >> In the checkpointed implementation, it copies certain state
> > >> (transient)
> > >> >> and
> > >> >> >> transfers it to a checkpointedWriteCache with respect to window
> > >>'x'.
> > >> >> >>
> > >> >> >> With Async checkpointing it, the state that is transferred is
> much
> > >> more
> > >> >> >> recent than window 'x'.
> > >> >> >>
> > >> >> >> Chandni
> > >> >> >>
> > >> >> >>
> > >> >> >> On Sun, Nov 22, 2015 at 11:04 PM, Chandni Singh <
> > >> >> chandni@datatorrent.com <ma...@datatorrent.com>>
> > >> >> >> wrote:
> > >> >> >>
> > >> >> >>> Agreed. Thomas's solution fixes the backward incompatibility. I
> > >> think
> > >> >> we
> > >> >> >>> really need to fix this.
> > >> >> >>>
> > >> >> >>> On Sun, Nov 22, 2015 at 10:23 PM, Timothy Farkas <
> > >> tim@datatorrent.com
> > >> >> <ma...@datatorrent.com>>
> > >> >> >>> wrote:
> > >> >> >>>
> > >> >> >>>> Gaurav,
> > >> >> >>>>
> > >> >> >>>> I think if the state copy fails then STRAM should roll back
> the
> > >> >> operator
> > >> >> >>>> to
> > >> >> >>>> a checkpoint that is further back than the last checkpoint. If
> > >>you
> > >> are
> > >> >> >>>> saying that you want to preserve the semantic that
> checkpointed
> > >>is
> > >> >> only
> > >> >> >>>> called after a checkpoint is completed, I would argue that
> that
> > >> >> guarantee
> > >> >> >>>> is already pointless in the current implementation since it is
> > >> always
> > >> >> >>>> possible for an operator to be rolled back to a checkpoint
> > >>before
> > >> it's
> > >> >> >>>> last
> > >> >> >>>> completed checkpoint. So, it is already currently possible for
> > >>some
> > >> >> >>>> database or file operation performed after a completed
> > >>checkpoint
> > >> to
> > >> >> be
> > >> >> >>>> redone after a failure. Because of this I think Thomas's
> > >>solution
> > >> >> makes
> > >> >> >>>> the
> > >> >> >>>> most sense. Thomas's solution would also address Chandni's
> > >>original
> > >> >> point
> > >> >> >>>> that the semantics for the checkpointed call back have been
> > >> violated.
> > >> >> >>>> There
> > >> >> >>>> are operators in our libraries that have depended on the
> > >> >> beginWindow(x),
> > >> >> >>>> endWindow(x), and checkpointed(x) call sequence, which is now
> > >> broken.
> > >> >> We
> > >> >> >>>> should probably fix that.
> > >> >> >>>>
> > >> >> >>>> Tim
> > >> >> >>>>
> > >> >> >>>> On Sun, Nov 22, 2015 at 10:02 PM, Gaurav Gupta <
> > >> >> gaurav@datatorrent.com <ma...@datatorrent.com>>
> > >> >> >>>> wrote:
> > >> >> >>>>
> > >> >> >>>>> Thomas,
> > >> >> >>>>>
> > >> >> >>>>> This was done to preserve checkpointing semantics that is to
> > >>tell
> > >> the
> > >> >> >>>>> operator that its state is preserved. Say if database is
> > >>updated
> > >> or
> > >> >> >>>> files
> > >> >> >>>>> are moved in checkpointed call but the state copy fails, how
> to
> > >> >> address
> > >> >> >>>>> such scenarios?
> > >> >> >>>>>
> > >> >> >>>>> Thanks
> > >> >> >>>>> - Gaurav
> > >> >> >>>>>
> > >> >> >>>>>> On Nov 22, 2015, at 9:44 PM, Thomas Weise <
> > >> thomas@datatorrent.com
> > >> >> <ma...@datatorrent.com>>
> > >> >> >>>>> wrote:
> > >> >> >>>>>>
> > >> >> >>>>>> Alternatively I would ask why the checkpointed callback
> needs
> > >>to
> > >> >> wait
> > >> >> >>>>> until
> > >> >> >>>>>> the data was copied to HDFS instead upon completion of the
> > >>state
> > >> >> >>>>>> serialization.
> > >> >> >>>>>>
> > >> >> >>>>>> Thomas
> > >> >> >>>>>>
> > >> >> >>>>>>
> > >> >> >>>>>> On Sun, Nov 22, 2015 at 9:41 PM, Chandni Singh <
> > >> >> >>>> chandni@datatorrent.com <ma...@datatorrent.com>>
> > >> >> >>>>>> wrote:
> > >> >> >>>>>>
> > >> >> >>>>>>> Gaurav,
> > >> >> >>>>>>>
> > >> >> >>>>>>> My question is about why Async was made the default when it
> > >> changed
> > >> >> >>>> the
> > >> >> >>>>>>> semantics of operator callbacks. Your response doesn't
> answer
> > >> that.
> > >> >> >>>>>>>
> > >> >> >>>>>>> In a way we broke backward compatibility.
> > >> >> >>>>>>>
> > >> >> >>>>>>> Chandni
> > >> >> >>>>>>>
> > >> >> >>>>>>> On Sun, Nov 22, 2015 at 9:22 PM, Gaurav Gupta <
> > >> >> >>>> gaurav@datatorrent.com <ma...@datatorrent.com>>
> > >> >> >>>>>>> wrote:
> > >> >> >>>>>>>
> > >> >> >>>>>>>> The idea behind Async checkpointing is to unblock operator
> > >> while
> > >> >> the
> > >> >> >>>>>>> state
> > >> >> >>>>>>>> is getting transferred to HDFS.
> > >> >> >>>>>>>> Just to clarify that this beginWindow (x) -> endWindow(x)
> ->
> > >> >> >>>>> checkpointed
> > >> >> >>>>>>>> (x-1 ) should be an ideal sequence, but if the HDFS is
> slow
> > >>or
> > >> for
> > >> >> >>>> some
> > >> >> >>>>>>>> other reason transferring the state to HDFS is slow this
> > >> sequence
> > >> >> >>>> may
> > >> >> >>>>> not
> > >> >> >>>>>>>> hold true.
> > >> >> >>>>>>>>
> > >> >> >>>>>>>> Can your use case be addressed by
> > >> >> >>>>>>>> https://malhar.atlassian.net/browse/APEX-78 <
> > >> >> https://malhar.atlassian.net/browse/APEX-78> <
> > >> >> >>>>>>>> https://malhar.atlassian.net/browse/APEX-78 <
> > >> >> https://malhar.atlassian.net/browse/APEX-78>>?
> > >> >> >>>>>>>>
> > >> >> >>>>>>>> Thanks
> > >> >> >>>>>>>> - Gaurav
> > >> >> >>>>>>>>
> > >> >> >>>>>>>>> On Nov 22, 2015, at 3:56 PM, Chandni Singh <
> > >> >> >>>> chandni@datatorrent.com <ma...@datatorrent.com>>
> > >> >> >>>>>>>> wrote:
> > >> >> >>>>>>>>>
> > >> >> >>>>>>>>> With Async checkpointing the checkpoint callback in
> > >> >> CheckpointPoint
> > >> >> >>>>>>>>> listener is called for a previous window, that is,
> > >> >> >>>>>>>>> beginWindow (x) -> endWindow(x) -> checkpointed (x-1 )
> > >> >> >>>>>>>>>
> > >> >> >>>>>>>>> This feature was newly introduced. With synchronous
> > >> >> checkpointing,
> > >> >> >>>> the
> > >> >> >>>>>>>>> behavior was always
> > >> >> >>>>>>>>> beginWindow(x) -> endWindow(x) -> checkpointed (x)
> > >> >> >>>>>>>>>
> > >> >> >>>>>>>>> A lot of operators were written before asynchronous
> > >> checkpointing
> > >> >> >>>> was
> > >> >> >>>>>>>>> introduced and few of them can rely on the sequencing
> > >> guaranteed
> > >> >> by
> > >> >> >>>>>>>>> synchronous checkpointing.
> > >> >> >>>>>>>>>
> > >> >> >>>>>>>>> So why was Async Checkpointed made default?
> > >> >> >>>>>>>>>
> > >> >> >>>>>>>>> With how Async checkpoint is today, the complexity to
> > >>handle
> > >> >> >>>> transient
> > >> >> >>>>>>>>> state in checkpointed callback falls on every operator.
> For
> > >> eg,
> > >> >> >>>> lets
> > >> >> >>>>>>> say
> > >> >> >>>>>>>>> earlier I had a transient map which I cleared every time
> > >>the
> > >> >> >>>>>>> checkpointed
> > >> >> >>>>>>>>> was called, with async checkpointing this simple task
> will
> > >>be
> > >> a
> > >> >> lot
> > >> >> >>>>>>> more
> > >> >> >>>>>>>>> complicated.
> > >> >> >>>>>>>>>
> > >> >> >>>>>>>>> I think Async checkpointing broke the semantics of
> operator
> > >> >> >>>> callbacks
> > >> >> >>>>>>> and
> > >> >> >>>>>>>>> should NOT be the default.
> > >> >> >>>>>>>>
> > >> >> >>>>>>>>
> > >> >> >>>>>>>
> > >> >> >>>>>
> > >> >> >>>>>
> > >> >> >>>>
> > >> >> >>>
> > >> >> >>>
> > >> >> >
> > >> >>
> > >> >>
> > >>
> >
> >
>

Re: Why is Async checkpointing made default?

Posted by Chandni Singh <ch...@datatorrent.com>.
> :-) I did not write it that semantics. Looks like someone made that
> assumption along the way. Fix the code which assumes that.

What do you mean?
We go on fixing all the code that was written because the behavior with
synchronous checkpointing was
beginWindow(x) -> endWindow(x) -> checkpointed(x)   (when checkpointing
is aligned with the application window boundary)?
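
To make the difference concrete, here is a small simulation (a sketch only;
CallbackOrderDemo is a made-up driver, not the Apex engine, and it assumes a
checkpoint after every window and a fixed lag). With lag 0 it reproduces the
synchronous order; with lag 1 it reproduces the async order described at the
start of this thread:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical driver, not the Apex engine: records the callback order an
// operator would observe when the checkpointed callback lags by `lag` windows.
class CallbackOrderDemo {
  static List<String> run(int windows, int lag) {
    List<String> calls = new ArrayList<>();
    for (long x = 1; x <= windows; x++) {
      calls.add("beginWindow(" + x + ")");
      calls.add("endWindow(" + x + ")");
      long done = x - lag; // window whose checkpoint has completed by now
      if (done >= 1) {
        calls.add("checkpointed(" + done + ")");
      }
    }
    return calls;
  }
}
```

Here run(2, 0) places checkpointed(1) right after endWindow(1), while
run(2, 1) delivers checkpointed(1) only after endWindow(2), by which time
any transient state already includes data from window 2.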

This was broken recently by async checkpointing, but instead of fixing
that, we go on fixing all the existing solutions.
I find this ridiculous.

And why should we make it harder and harder to write operators?
I think the majority of people who participated in this discussion believe
that we need to fix this, and I think it is critical.

Chandni



On Mon, Nov 23, 2015 at 10:16 AM, Chetan Narsude (cnarsude) <
cnarsude@cisco.com> wrote:

>
>
> On 11/23/15, 8:44 AM, "Chandni Singh" <ch...@datatorrent.com> wrote:
>
> >I think in the API, there is windowId in the checkpointed callback for
> >cases when checkpointing could happen within application windows.
> >
> >IMHO backward compatibility is broken. For 3 years since the platform was
> >created the semantics of checkpointed were
> >beginWindow(x) -> endwindow -> checkpointWindow(x) when checkpointing was
> >aligned with application window boundaries.
> >
>
> :-) I did not write it that semantics. Looks like someone made that
> assumption along the way. Fix the code which assumes that.
>
> --
> Chetan
>
> >Please note that aligning checkpointing with the application window
> >boundary is the DEFAULT behavior, and I think most of us agree that
> >aligning checkpointing with application window boundaries is what we see
> >in every use case.
> >
> >Code was written with that assumption by committers of Apex (not even by
> >people who are new to Apex). It was broken by a change introduced a couple
> >of months back (August 2015).
> >
> >Moreover, what do we achieve by NOT guaranteeing that semantics- delegate
> >the complexity to operators to handle it. Even a simple scenario that I
> >mentioned in my first mail, is very complicated.
> >
> >I think this is a big issue and we need to fix it.
> >
> >Chandni
> >
> >On Mon, Nov 23, 2015 at 7:12 AM, Munagala Ramanath <ra...@datatorrent.com>
> >wrote:
> >
> >> I too find Gaurav's argument cogent: endWindow() does not take a
> >> windowId parameter
> >> leading to a natural guarantee that it matches the immediately
> >> preceding beginWindow().
> >>
> >> Both committed() and checkpointed() take that parameter which suggests
> >>it
> >> may
> >> lag behind the current window. The comments on those methods say nothing
> >> that can be interpreted as a guarantee that the windowId will match
> >>the
> >> window just processed. So, from the viewpoint of "original intent" --
> >> a phrase that
> >> has a long and storied history in jurisprudence
> >> (https://en.wikipedia.org/wiki/Original_intent) --
> >> it seems that any code that assumed a guarantee when none existed must
> >> be regarded
> >> as erroneous.
> >>
> >> Having said that, we are all aware that in software it is not at all
> >> unusual to preserve behavior
> >> in the interests of backward compatibility, regardless of many other
> >> reasons for that behavior
> >> to be considered objectionable. So, if the cost of fixing all the code
> >> that makes that
> >> assumption is too high, we should seriously consider reverting it. In
> >> this context, the
> >> guarantee that checkpointed() simply means that the operator state has
> >> been serialized
> >> seems adequate.
> >>
> >> We have a roughly analogous situation with respect to OS write() calls:
> >> When the call returns, we _do not_ have an assurance that the data has
> >> gone to disk.
> >> All we know is that the OS has copied the data to its buffers for
> >> flushing to disk. If we
> >> want to know when the actual write to disk is done, we need to use a
> >> different call -- fsync().
> >>
> >> Ram
> >>
> >> On Mon, Nov 23, 2015 at 6:19 AM, Pramod Immaneni
> >><pr...@datatorrent.com>
> >> wrote:
> >> > I think the original intent for checkpointed callback was that it can
> >>be
> >> > called anytime after checkpoint and not necessarily immediately after
> >>the
> >> > window prior of checkpoint. As Gaurav mentioned the API bears it out.
> >> > Furthermore the callback has to be called within the lifecycle
> >>methods of
> >> > the operator so it will be called inside a window so you anyway have
> >>to
> >> > deal with new data anyway before the callback is called. Even though
> >> > checkpointed is not committed shouldn't it at least guarantee that the
> >> > operator is recoverable to that state in which case it should be
> >>called
> >> > after the save to HDFS actually finishes.
> >> >
> >> > Thanks
> >> >
> >> > On Mon, Nov 23, 2015 at 5:38 AM, Gaurav Gupta <gaurav@datatorrent.com
> >
> >> > wrote:
> >> >
> >> >> IMO, I don't think there is any backward incompatibility wrt
> >> Checkpointing
> >> >> call back semantics because
> >> >>
> >> >> 1. The checkpointed call is only made once the operator state is
> >> preserved.
> >> >> 2. The window ids being passed to checkpointed are in increasing
> >>order.
> >> >> 3. The window ids being passed are still the same ids as were passed
> >> >> earlier.
> >> >> 4. The sequence is still beginWindow()->endWindow()->checkpointed().
> >> >>
> >> >> Thanks
> >> >> - Gaurav
> >> >>
> >> >> > On Nov 23, 2015, at 1:03 AM, Gaurav Gupta <ga...@datatorrent.com>
> >> >> wrote:
> >> >> >
> >> >> > If the requirement is that the order is always
> >> >> beginWindow()->endWindow()->checkpointed(), why to pass windowId in
> >>the
> >> >> checkpointed() call back?
> >> >> >
> >> >> > Thanks
> >> >> > - Gaurav
> >> >> >
> >> >> >> On Nov 22, 2015, at 11:22 PM, Chandni Singh
> >><chandni@datatorrent.com
> >> >> <ma...@datatorrent.com>> wrote:
> >> >> >>
> >> >> >> FYI,
> >> >> >>
> >> >> >> HDHTWriter implementation is dependent on the older semantics and
> >> seems
> >> >> to
> >> >> >> be broken now.
> >> >> >> startWindow(x) -> endWindow(x) -> checkpointed(x)
> >> >> >> In the checkpointed implementation, it copies certain state
> >> (transient)
> >> >> and
> >> >> >> transfers it to a checkpointedWriteCache with respect to window
> >>'x'.
> >> >> >>
> >> >> >> With Async checkpointing it, the state that is transferred is much
> >> more
> >> >> >> recent than window 'x'.
> >> >> >>
> >> >> >> Chandni
> >> >> >>
> >> >> >>
> >> >> >> On Sun, Nov 22, 2015 at 11:04 PM, Chandni Singh <
> >> >> chandni@datatorrent.com <ma...@datatorrent.com>>
> >> >> >> wrote:
> >> >> >>
> >> >> >>> Agreed. Thomas's solution fixes the backward incompatibility. I
> >> think
> >> >> we
> >> >> >>> really need to fix this.
> >> >> >>>
> >> >> >>> On Sun, Nov 22, 2015 at 10:23 PM, Timothy Farkas <
> >> tim@datatorrent.com
> >> >> <ma...@datatorrent.com>>
> >> >> >>> wrote:
> >> >> >>>
> >> >> >>>> Gaurav,
> >> >> >>>>
> >> >> >>>> I think if the state copy fails then STRAM should roll back the
> >> >> operator
> >> >> >>>> to
> >> >> >>>> a checkpoint that is further back than the last checkpoint. If
> >>you
> >> are
> >> >> >>>> saying that you want to preserve the semantic that checkpointed
> >>is
> >> >> only
> >> >> >>>> called after a checkpoint is completed, I would argue that that
> >> >> guarantee
> >> >> >>>> is already pointless in the current implementation since it is
> >> always
> >> >> >>>> possible for an operator to be rolled back to a checkpoint
> >>before
> >> it's
> >> >> >>>> last
> >> >> >>>> completed checkpoint. So, it is already currently possible for
> >>some
> >> >> >>>> database or file operation performed after a completed
> >>checkpoint
> >> to
> >> >> be
> >> >> >>>> redone after a failure. Because of this I think Thomas's
> >>solution
> >> >> makes
> >> >> >>>> the
> >> >> >>>> most sense. Thomas's solution would also address Chandni's
> >>original
> >> >> point
> >> >> >>>> that the semantics for the checkpointed call back have been
> >> violated.
> >> >> >>>> There
> >> >> >>>> are operators in our libraries that have depended on the
> >> >> beginWindow(x),
> >> >> >>>> endWindow(x), and checkpointed(x) call sequence, which is now
> >> broken.
> >> >> We
> >> >> >>>> should probably fix that.
> >> >> >>>>
> >> >> >>>> Tim
> >> >> >>>>
> >> >> >>>> On Sun, Nov 22, 2015 at 10:02 PM, Gaurav Gupta <
> >> >> gaurav@datatorrent.com <ma...@datatorrent.com>>
> >> >> >>>> wrote:
> >> >> >>>>
> >> >> >>>>> Thomas,
> >> >> >>>>>
> >> >> >>>>> This was done to preserve checkpointing semantics that is to
> >>tell
> >> the
> >> >> >>>>> operator that its state is preserved. Say if database is
> >>updated
> >> or
> >> >> >>>> files
> >> >> >>>>> are moved in checkpointed call but the state copy fails, how to
> >> >> address
> >> >> >>>>> such scenarios?
> >> >> >>>>>
> >> >> >>>>> Thanks
> >> >> >>>>> - Gaurav
> >> >> >>>>>
> >> >> >>>>>> On Nov 22, 2015, at 9:44 PM, Thomas Weise <
> >> thomas@datatorrent.com
> >> >> <ma...@datatorrent.com>>
> >> >> >>>>> wrote:
> >> >> >>>>>>
> >> >> >>>>>> Alternatively I would ask why the checkpointed callback needs
> >>to
> >> >> wait
> >> >> >>>>> until
> >> >> >>>>>> the data was copied to HDFS instead upon completion of the
> >>state
> >> >> >>>>>> serialization.
> >> >> >>>>>>
> >> >> >>>>>> Thomas
> >> >> >>>>>>
> >> >> >>>>>>
> >> >> >>>>>> On Sun, Nov 22, 2015 at 9:41 PM, Chandni Singh <
> >> >> >>>> chandni@datatorrent.com <ma...@datatorrent.com>>
> >> >> >>>>>> wrote:
> >> >> >>>>>>
> >> >> >>>>>>> Gaurav,
> >> >> >>>>>>>
> >> >> >>>>>>> My question is about why Async was made the default when it
> >> changed
> >> >> >>>> the
> >> >> >>>>>>> semantics of operator callbacks. Your response doesn't answer
> >> that.
> >> >> >>>>>>>
> >> >> >>>>>>> In a way we broke backward compatibility.
> >> >> >>>>>>>
> >> >> >>>>>>> Chandni
> >> >> >>>>>>>
> >> >> >>>>>>> On Sun, Nov 22, 2015 at 9:22 PM, Gaurav Gupta <
> >> >> >>>> gaurav@datatorrent.com <ma...@datatorrent.com>>
> >> >> >>>>>>> wrote:
> >> >> >>>>>>>
> >> >> >>>>>>>> The idea behind Async checkpointing is to unblock operator
> >> while
> >> >> the
> >> >> >>>>>>> state
> >> >> >>>>>>>> is getting transferred to HDFS.
> >> >> >>>>>>>> Just to clarify that this beginWindow (x) -> endWindow(x) ->
> >> >> >>>>> checkpointed
> >> >> >>>>>>>> (x-1 ) should be an ideal sequence, but if the HDFS is slow
> >>or
> >> for
> >> >> >>>> some
> >> >> >>>>>>>> other reason transferring the state to HDFS is slow this
> >> sequence
> >> >> >>>> may
> >> >> >>>>> not
> >> >> >>>>>>>> hold true.
> >> >> >>>>>>>>
> >> >> >>>>>>>> Can your use case be addressed by
> >> >> >>>>>>>> https://malhar.atlassian.net/browse/APEX-78 <
> >> >> https://malhar.atlassian.net/browse/APEX-78> <
> >> >> >>>>>>>> https://malhar.atlassian.net/browse/APEX-78 <
> >> >> https://malhar.atlassian.net/browse/APEX-78>>?
> >> >> >>>>>>>>
> >> >> >>>>>>>> Thanks
> >> >> >>>>>>>> - Gaurav
> >> >> >>>>>>>>
> >> >> >>>>>>>>> On Nov 22, 2015, at 3:56 PM, Chandni Singh <
> >> >> >>>> chandni@datatorrent.com <ma...@datatorrent.com>>
> >> >> >>>>>>>> wrote:
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> With Async checkpointing the checkpoint callback in
> >> >> CheckpointPoint
> >> >> >>>>>>>>> listener is called for a previous window, that is,
> >> >> >>>>>>>>> beginWindow (x) -> endWindow(x) -> checkpointed (x-1 )
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> This feature was newly introduced. With synchronous
> >> >> checkpointing,
> >> >> >>>> the
> >> >> >>>>>>>>> behavior was always
> >> >> >>>>>>>>> beginWindow(x) -> endWindow(x) -> checkpointed (x)
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> A lot of operators were written before asynchronous
> >> checkpointing
> >> >> >>>> was
> >> >> >>>>>>>>> introduced and few of them can rely on the sequencing
> >> guaranteed
> >> >> by
> >> >> >>>>>>>>> synchronous checkpointing.
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> So why was Async Checkpointed made default?
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> With how Async checkpoint is today, the complexity to
> >>handle
> >> >> >>>> transient
> >> >> >>>>>>>>> state in checkpointed callback falls on every operator. For
> >> eg,
> >> >> >>>> lets
> >> >> >>>>>>> say
> >> >> >>>>>>>>> earlier I had a transient map which I cleared every time
> >>the
> >> >> >>>>>>> checkpointed
> >> >> >>>>>>>>> was called, with async checkpointing this simple task will
> >>be
> >> a
> >> >> lot
> >> >> >>>>>>> more
> >> >> >>>>>>>>> complicated.
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> I think Async checkpointing broke the semantics of operator
> >> >> >>>> callbacks
> >> >> >>>>>>> and
> >> >> >>>>>>>>> should NOT be the default.
> >> >> >>>>>>>>
> >> >> >>>>>>>>
> >> >> >>>>>>>
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>
> >> >> >>>
> >> >> >>>
> >> >> >
> >> >>
> >> >>
> >>
>
>

Re: Why is Async checkpointing made default?

Posted by "Chetan Narsude (cnarsude)" <cn...@cisco.com>.

On 11/23/15, 8:44 AM, "Chandni Singh" <ch...@datatorrent.com> wrote:

>I think in the API, there is windowId in the checkpointed callback for
>cases when checkpointing could happen within application windows.
>
>IMHO backward compatibility is broken. For 3 years since the platform was
>created the semantics of checkpointed were
>beginWindow(x) -> endwindow -> checkpointWindow(x) when checkpointing was
>aligned with application window boundaries.
>

:-) I did not write it with those semantics. Looks like someone made that
assumption along the way. Fix the code that assumes it.

--
Chetan

>Please note that aligning Checkpointing with Application Window boundary is
>the DEFAULT behavior and I think most of us agree that aligning the
>checkpointing with application window boundaries is what we see in every
>use case.
>
>Code was written with that assumption by committers of Apex (not even by
>people who are new to Apex). It is broken by a change introduced couple
>of months back (August 2015).
>
>Moreover, what do we achieve by NOT guaranteeing that semantics- delegate
>the complexity to operators to handle it. Even a simple scenario that I
>mentioned in my first mail, is very complicated.
>
>I think this is a big issue and we need to fix it.
>
>Chandni
>


Re: Why is Async checkpointing made default?

Posted by Siyuan Hua <si...@datatorrent.com>.
I agree with Chandni, Thomas and Tim, simply because Async may break some
existing code, whereas Sync won't break anything, whatever your assumption
or interpretation of the API is.

I think we need to document this: if your checkpoint logic is independent
of the window id, or you have special logic to handle async checkpointing,
you can set checkpointing to Async. And as a unified API, the window id
could be used for Async checkpointing.
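
As a minimal sketch of how the window id could be used: an operator might
key its transient state by window id and, in the callback, clear only the
entries up to the checkpointed window. The class and method names below are
hypothetical stand-ins that mirror the callback names in this thread; this
is not the Apex API and not how any existing operator is written.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical operator-like class; it only mirrors the callback names
// discussed in this thread and does not use the Apex API.
class WindowKeyedBuffer {
    // Transient entries, grouped by the id of the window that produced them.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    private long currentWindowId;

    void beginWindow(long windowId) {
        currentWindowId = windowId;
    }

    void process(String tuple) {
        pending.computeIfAbsent(currentWindowId, id -> new ArrayList<>()).add(tuple);
    }

    void endWindow() {
        // Nothing to do in this sketch.
    }

    // Drops only the entries that belong to windows up to and including the
    // checkpointed one, so a late callback leaves newer entries in place.
    void checkpointed(long windowId) {
        pending.headMap(windowId, true).clear();
    }

    // Window ids that still hold transient entries, in ascending order.
    List<Long> pendingWindowIds() {
        return new ArrayList<>(pending.keySet());
    }

    public static void main(String[] args) {
        WindowKeyedBuffer op = new WindowKeyedBuffer();
        op.beginWindow(1); op.process("a"); op.endWindow();
        op.beginWindow(2); op.process("b"); op.endWindow();
        op.checkpointed(1); // the callback lags one window behind
        System.out.println(op.pendingWindowIds());
    }
}
```

With this shape the callback no longer cares whether it arrives right after
endWindow(x) or some windows later, at the cost of tracking the window id
for every transient entry.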

Siyuan

On Mon, Nov 23, 2015 at 8:44 AM, Chandni Singh <ch...@datatorrent.com>
wrote:

> I think in the API, there is windowId in the checkpointed callback for
> cases when checkpointing could happen within application windows.
>
> IMHO backward compatibility is broken. For 3 years since the platform was
> created the semantics of checkpointed were
> beginWindow(x) -> endwindow -> checkpointWindow(x) when checkpointing was
> aligned with application window boundaries.
>
> Please note that aligning Checkpointing with Application Window boundary is
> the DEFAULT behavior and I think most of us agree that aligning the
> checkpointing with application window boundaries is what we see in every
> use case.
>
> Code was written with that assumption by committers of Apex (not even by
> people who are new to Apex). It is broken by a change introduced couple
> of months back (August 2015).
>
> Moreover, what do we achieve by NOT guaranteeing that semantics- delegate
> the complexity to operators to handle it. Even a simple scenario that I
> mentioned in my first mail, is very complicated.
>
> I think this is a big issue and we need to fix it.
>
> Chandni
>

Re: Why is Async checkpointing made default?

Posted by Chandni Singh <ch...@datatorrent.com>.
I think in the API, there is windowId in the checkpointed callback for
cases when checkpointing could happen within application windows.

IMHO backward compatibility is broken. For 3 years since the platform was
created, the semantics of checkpointed were
beginWindow(x) -> endWindow(x) -> checkpointed(x) when checkpointing was
aligned with application window boundaries.
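
To make the two orderings concrete, here is a small sketch that prints the
callback trace for each mode. It assumes a checkpoint at every window and a
fixed lag of one window for the async case; in practice the lag depends on
how long the state copy takes. The class name is hypothetical and nothing
here is Apex code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trace generator; it only models the order of callbacks.
class CallbackOrderDemo {
    // Builds the callback trace for the given number of windows.
    // lag == 0 models the synchronous ordering, where checkpointed(x)
    // follows endWindow(x). lag == 1 models the async ordering described
    // in this thread, where the callback reports the previous window.
    static List<String> trace(int windows, int lag) {
        List<String> events = new ArrayList<>();
        for (long x = 1; x <= windows; x++) {
            events.add("beginWindow(" + x + ")");
            events.add("endWindow(" + x + ")");
            long done = x - lag;
            if (done >= 1) {
                events.add("checkpointed(" + done + ")");
            }
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println("sync : " + trace(2, 0));
        System.out.println("async: " + trace(2, 1));
    }
}
```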

Please note that aligning checkpointing with the application window boundary
is the DEFAULT behavior, and I think most of us agree that aligning
checkpointing with application window boundaries is what we see in every
use case.

Code was written with that assumption by committers of Apex (not even by
people who are new to Apex). It was broken by a change introduced a couple
of months back (August 2015).

Moreover, what do we achieve by NOT guaranteeing those semantics? We only
delegate the complexity to operators. Even the simple scenario that I
mentioned in my first mail becomes very complicated.

I think this is a big issue and we need to fix it.

Chandni

On Mon, Nov 23, 2015 at 7:12 AM, Munagala Ramanath <ra...@datatorrent.com>
wrote:

> I too find Gaurav's argument cogent: endWindow() does not take a
> windowId parameter
> leading to a natural guarantee that it matches the immediately
> preceding beginWindow().
>
> Both committed() and checkpointed() take that parameter which suggests it
> may
> lag behind the current window. The comments on those methods say nothing
> that can be interpreted as a guarantee that the windowId will match the
> window just processed. So, from the viewpoint of "original intent" --
> a phrase that
> has a long and storied history in jurisprudence
> (https://en.wikipedia.org/wiki/Original_intent) --
> it seems that any code that assumed a guarantee when none existed must
> be regarded
> as erroneous.
>
> Having said that, we are all aware that in software it is not at all
> unusual to preserve behavior
> in the interests of backward compatibility, regardless of many other
> reasons for that behavior
> to be considered objectionable. So, if the cost of fixing all the code
> that makes that
> assumption is too high, we should seriously consider reverting it. In
> this context, the
> guarantee that checkpointed() simply means that the operator state has
> been serialized
> seems adequate.
>
> We have a roughly analogous situation with respect to OS write() calls:
> When the call returns, we _do not_ have an assurance that the data has
> gone to disk.
> All we know is that the OS has copied the data to its buffers for
> flushing to disk. If we
> want to know when the actual write to disk is done, we need to use a
> different call -- fsync().
>
> Ram
>

Re: Why is Async checkpointing made default?

Posted by Munagala Ramanath <ra...@datatorrent.com>.
I too find Gaurav's argument cogent: endWindow() does not take a windowId
parameter, leading to a natural guarantee that it matches the immediately
preceding beginWindow().

Both committed() and checkpointed() take that parameter, which suggests it
may lag behind the current window. The comments on those methods say nothing
that can be interpreted as a guarantee that the windowId will match the
window just processed. So, from the viewpoint of "original intent" -- a
phrase that has a long and storied history in jurisprudence
(https://en.wikipedia.org/wiki/Original_intent) -- it seems that any code
that assumed a guarantee when none existed must be regarded as erroneous.

Having said that, we are all aware that in software it is not at all unusual
to preserve behavior in the interests of backward compatibility, regardless
of many other reasons for that behavior to be considered objectionable. So,
if the cost of fixing all the code that makes that assumption is too high,
we should seriously consider reverting it. In this context, the guarantee
that checkpointed() simply means that the operator state has been serialized
seems adequate.

We have a roughly analogous situation with respect to OS write() calls:
when the call returns, we _do not_ have an assurance that the data has gone
to disk. All we know is that the OS has copied the data to its buffers for
flushing to disk. If we want to know when the actual write to disk is done,
we need to use a different call -- fsync().
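
The write()/fsync() distinction can be sketched in Java, where
FileChannel.write() hands bytes to the OS and FileChannel.force(true) asks
for them to be flushed to the storage device. DurableWrite and its methods
are names made up for this illustration, and mapping the two calls onto
"state serialized" versus "state saved" is only the analogy drawn above, not
a description of how Apex stores checkpoints.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper for illustration only.
class DurableWrite {
    // Writes the text, then forces it to the device; returns the file size.
    static long writeAndSync(Path file, String text) {
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buf = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
            while (buf.hasRemaining()) {
                ch.write(buf);      // data may still sit in OS buffers here
            }
            ch.force(true);         // analogous to fsync(): flush data and metadata
            return ch.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Runs the helper against a temporary file and cleans up afterwards.
    static long demo() {
        try {
            Path tmp = Files.createTempFile("checkpoint-sketch", ".bin");
            try {
                return writeAndSync(tmp, "hello");
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A caller that only needs to know the bytes were handed off can stop after
write(); a caller that needs durability has to wait for force(), which is
the slower step.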

Ram

On Mon, Nov 23, 2015 at 6:19 AM, Pramod Immaneni <pr...@datatorrent.com> wrote:
> I think the original intent for the checkpointed callback was that it can
> be called anytime after the checkpoint, and not necessarily immediately
> after the window preceding the checkpoint. As Gaurav mentioned, the API
> bears this out. Furthermore, the callback has to be called within the
> lifecycle methods of the operator, so it will be called inside a window and
> you have to deal with new data before the callback is called anyway. Even
> though checkpointed is not committed, shouldn't it at least guarantee that
> the operator is recoverable to that state? In that case it should be called
> after the save to HDFS actually finishes.
>
> Thanks
>

Re: Why is Async checkpointing made default?

Posted by Pramod Immaneni <pr...@datatorrent.com>.
I think the original intent for the checkpointed callback was that it can
be called anytime after the checkpoint, and not necessarily immediately
after the window preceding the checkpoint. As Gaurav mentioned, the API
bears this out. Furthermore, the callback has to be called within the
lifecycle methods of the operator, so it will be called inside a window and
you have to deal with new data before the callback is called anyway. Even
though checkpointed is not committed, shouldn't it at least guarantee that
the operator is recoverable to that state? In that case it should be called
after the save to HDFS actually finishes.

Thanks

On Mon, Nov 23, 2015 at 5:38 AM, Gaurav Gupta <ga...@datatorrent.com>
wrote:

> IMO, I don’t think there is any backward incompatibility wrt Checkpointing
> callback semantics because
>
> 1. The checkpointed call is only made once the operator state is preserved.
> 2. The window ids being passed to checkpointed are in increasing order.
> 3. The window ids being passed are still the same ids as were passed
> earlier.
> 4. The sequence is still beginWindow()->endWindow()->checkpointed().
>
> Thanks
> - Gaurav
>

Re: Why is Async checkpointing made default?

Posted by Gaurav Gupta <ga...@datatorrent.com>.
IMO, I don’t think there is any backward incompatibility wrt Checkpointing callback semantics because

1. The checkpointed call is only made once the operator state is preserved.
2. The window ids being passed to checkpointed are in increasing order.
3. The window ids being passed are still the same ids as were passed earlier.
4. The sequence is still beginWindow()->endWindow()->checkpointed().
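
The ordering properties in points 2 to 4 can be written down as a small
checker, sketched here under the assumption that window ids are plain
increasing longs. CallbackOrderChecker is a hypothetical class for
illustration and is not part of Apex. It accepts a checkpointed() call for
any window that has already ended, and it reports how far that call lags
behind the most recently ended window.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical checker; class and method names are illustrative, not Apex API.
class CallbackOrderChecker {
    private final Set<Long> endedWindows = new HashSet<>();
    private long openWindow = -1;        // -1 means no window is currently open
    private long lastEnded = -1;
    private long lastCheckpointed = -1;

    void beginWindow(long windowId) {
        if (openWindow != -1) {
            throw new IllegalStateException("window " + openWindow + " is still open");
        }
        openWindow = windowId;
    }

    void endWindow() {
        if (openWindow == -1) {
            throw new IllegalStateException("endWindow() without beginWindow()");
        }
        endedWindows.add(openWindow);
        lastEnded = openWindow;
        openWindow = -1;
    }

    // Returns how many ids the callback lags behind the last ended window.
    long checkpointed(long windowId) {
        if (!endedWindows.contains(windowId)) {
            throw new IllegalStateException("window " + windowId + " has not ended");
        }
        if (windowId <= lastCheckpointed) {
            throw new IllegalStateException("checkpointed ids must increase");
        }
        lastCheckpointed = windowId;
        return lastEnded - windowId;
    }
}
```

Under these checks both the synchronous and the asynchronous sequences are
valid; they differ only in the returned lag, which is zero in the
synchronous case and can be positive in the asynchronous one.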

Thanks
- Gaurav

> On Nov 23, 2015, at 1:03 AM, Gaurav Gupta <ga...@datatorrent.com> wrote:
> 
> If the requirement is that the order is always beginWindow()->endWindow()->checkpointed(), why pass windowId in the checkpointed() callback?
> 
> Thanks
> - Gaurav
> 


Re: Why is Async checkpointing made default?

Posted by Gaurav Gupta <ga...@datatorrent.com>.
If the requirement is that the order is always beginWindow()->endWindow()->checkpointed(), why pass windowId in the checkpointed() callback?

Thanks
- Gaurav

> On Nov 22, 2015, at 11:22 PM, Chandni Singh <ch...@datatorrent.com> wrote:
> 
> FYI,
> 
> HDHTWriter implementation is dependent on the older semantics and seems to
> be broken now.
> startWindow(x) -> endWindow(x) -> checkpointed(x)
> In the checkpointed implementation, it copies certain state (transient) and
> transfers it to a checkpointedWriteCache with respect to window 'x'.
> 
> With async checkpointing, the state that is transferred is much more
> recent than window 'x'.
> 
> Chandni


Re: Why is Async checkpointing made default?

Posted by Chandni Singh <ch...@datatorrent.com>.
FYI,

The HDHTWriter implementation depends on the older semantics and seems to
be broken now:
beginWindow(x) -> endWindow(x) -> checkpointed(x)
In its checkpointed implementation, it copies certain transient state and
transfers it to a checkpointedWriteCache with respect to window 'x'.

With async checkpointing, the state that is transferred is much more
recent than window 'x'.
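
To illustrate the problem, here is a minimal sketch of one way an operator
could stop depending on the callback ordering: freeze the transient state at
endWindow, keyed by window ID, and in checkpointed(windowId) transfer only the
snapshots for windows up to windowId. This is not the actual HDHTWriter code;
the class WindowedWriteCache, its String keys and values, and all method names
other than the three callbacks are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch; not the actual HDHTWriter implementation. */
class WindowedWriteCache {
  // Transient state accumulated during the window that is currently open.
  private Map<String, String> current = new HashMap<>();
  // Snapshots frozen at endWindow, keyed by window ID (sorted ascending).
  private final TreeMap<Long, Map<String, String>> pending = new TreeMap<>();
  // State handed over for windows that are known to be checkpointed.
  private final Map<String, String> checkpointedWriteCache = new HashMap<>();
  private long currentWindowId;

  void beginWindow(long windowId) {
    currentWindowId = windowId;
  }

  void put(String key, String value) {
    current.put(key, value);
  }

  void endWindow() {
    // Freeze this window's state so that later windows cannot leak into it.
    pending.put(currentWindowId, current);
    current = new HashMap<>();
  }

  void checkpointed(long windowId) {
    // Only snapshots for windows <= windowId are covered by this checkpoint.
    Map<Long, Map<String, String>> covered = pending.headMap(windowId, true);
    for (Map<String, String> snapshot : covered.values()) {
      checkpointedWriteCache.putAll(snapshot); // older windows are applied first
    }
    covered.clear(); // the view is backed by 'pending', so this removes them
  }

  Map<String, String> checkpointedWriteCache() {
    return checkpointedWriteCache;
  }

  int pendingWindows() {
    return pending.size();
  }
}
```

With this shape, a late checkpointed(x-1) call transfers only the state up to
window x-1, regardless of how many windows have been processed since.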

Chandni


On Sun, Nov 22, 2015 at 11:04 PM, Chandni Singh <ch...@datatorrent.com>
wrote:

> Agreed. Thomas's solution fixes the backward incompatibility. I think we
> really need to fix this.

Re: Why is Async checkpointing made default?

Posted by Chandni Singh <ch...@datatorrent.com>.
Agreed. Thomas's solution fixes the backward incompatibility. I think we
really need to fix this.

On Sun, Nov 22, 2015 at 10:23 PM, Timothy Farkas <ti...@datatorrent.com>
wrote:

> Gaurav,
>
> I think if the state copy fails then STRAM should roll back the operator to
> a checkpoint that is further back than the last checkpoint. If you are
> saying that you want to preserve the semantic that checkpointed is only
> called after a checkpoint is completed, I would argue that that guarantee
> is already pointless in the current implementation since it is always
> possible for an operator to be rolled back to a checkpoint before its last
> completed checkpoint. So, it is already currently possible for some
> database or file operation performed after a completed checkpoint to be
> redone after a failure. Because of this I think Thomas's solution makes the
> most sense. Thomas's solution would also address Chandni's original point
> that the semantics for the checkpointed callback have been violated. There
> are operators in our libraries that have depended on the beginWindow(x),
> endWindow(x), and checkpointed(x) call sequence, which is now broken. We
> should probably fix that.
>
> Tim

Re: Why is Async checkpointing made default?

Posted by Timothy Farkas <ti...@datatorrent.com>.
Gaurav,

I think if the state copy fails then STRAM should roll back the operator to
a checkpoint that is further back than the last checkpoint. If you are
saying that you want to preserve the semantic that checkpointed is only
called after a checkpoint is completed, I would argue that that guarantee
is already pointless in the current implementation since it is always
possible for an operator to be rolled back to a checkpoint before its last
completed checkpoint. So, it is already currently possible for some
database or file operation performed after a completed checkpoint to be
redone after a failure. Because of this I think Thomas's solution makes the
most sense. Thomas's solution would also address Chandni's original point
that the semantics for the checkpointed callback have been violated. There
are operators in our libraries that have depended on the beginWindow(x),
endWindow(x), and checkpointed(x) call sequence, which is now broken. We
should probably fix that.

Tim

On Sun, Nov 22, 2015 at 10:02 PM, Gaurav Gupta <ga...@datatorrent.com>
wrote:

> Thomas,
>
> This was done to preserve the checkpointing semantics, that is, to tell the
> operator that its state has been preserved. Say a database is updated or files
> are moved in the checkpointed call but the state copy fails; how would we
> address such scenarios?
>
> Thanks
> - Gaurav

Re: Why is Async checkpointing made default?

Posted by Gaurav Gupta <ga...@datatorrent.com>.
Thomas,

This was done to preserve the checkpointing semantics, that is, to tell the operator that its state has been preserved. Say a database is updated or files are moved in the checkpointed call but the state copy fails; how would we address such scenarios?

Thanks
- Gaurav

> On Nov 22, 2015, at 9:44 PM, Thomas Weise <th...@datatorrent.com> wrote:
> 
> Alternatively, I would ask why the checkpointed callback needs to wait until
> the data has been copied to HDFS, instead of being invoked upon completion of
> the state serialization.
> 
> Thomas


Re: Why is Async checkpointing made default?

Posted by Thomas Weise <th...@datatorrent.com>.
Alternatively, I would ask why the checkpointed callback needs to wait until
the data has been copied to HDFS, instead of being invoked upon completion of
the state serialization.
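
A rough sketch of what that could look like, under stated assumptions: the
state is serialized on the operator thread, checkpointed(x) is invoked right
away, and only the copy of the serialized bytes happens in the background. The
class SerializeThenNotify and the StateStore interface (a stand-in for the HDFS
copy) are hypothetical and are not part of the Apex API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch; not how the Apex engine is implemented. */
class SerializeThenNotify {
  interface CheckpointListener {
    void checkpointed(long windowId);
  }

  /** Stand-in for the slow copy of the serialized state to HDFS. */
  interface StateStore {
    void copy(long windowId, byte[] serializedState);
  }

  private final ExecutorService copier = Executors.newSingleThreadExecutor();

  void checkpoint(long windowId, Serializable state,
                  CheckpointListener listener, StateStore store) {
    // 1. Serialize synchronously, so the bytes reflect the state at window x.
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
      out.writeObject(state);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    byte[] bytes = buffer.toByteArray();

    // 2. Notify the operator immediately, preserving
    //    beginWindow(x) -> endWindow(x) -> checkpointed(x).
    listener.checkpointed(windowId);

    // 3. Copy in the background so the operator is not blocked.
    copier.submit(() -> store.copy(windowId, bytes));
  }

  /** Stops accepting work and waits briefly for outstanding copies. */
  void shutdownAndWait() {
    copier.shutdown();
    try {
      copier.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

The trade-off in this sketch is that checkpointed(x) no longer implies the
state is durable; it only implies that the state was captured.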

Thomas


On Sun, Nov 22, 2015 at 9:41 PM, Chandni Singh <ch...@datatorrent.com>
wrote:

> Gaurav,
>
> My question is about why Async was made the default when it changed the
> semantics of operator callbacks. Your response doesn't answer that.
>
> In a way we broke backward compatibility.
>
> Chandni

Re: Why is Async checkpointing made default?

Posted by Chandni Singh <ch...@datatorrent.com>.
Gaurav,

My question is about why Async was made the default when it changed the
semantics of operator callbacks. Your response doesn't answer that.

In a way we broke backward compatibility.

Chandni

On Sun, Nov 22, 2015 at 9:22 PM, Gaurav Gupta <ga...@datatorrent.com>
wrote:

> The idea behind async checkpointing is to unblock the operator while the state
> is being transferred to HDFS.
> Just to clarify: beginWindow(x) -> endWindow(x) -> checkpointed(x-1) is the
> ideal sequence, but if HDFS is slow, or transferring the state to HDFS is slow
> for some other reason, this sequence may not hold.
>
> Can your use case be addressed by
> https://malhar.atlassian.net/browse/APEX-78?
>
> Thanks
> - Gaurav

Re: Why is Async checkpointing made default?

Posted by Gaurav Gupta <ga...@datatorrent.com>.
The idea behind async checkpointing is to unblock the operator while the state is being transferred to HDFS.
Just to clarify: beginWindow(x) -> endWindow(x) -> checkpointed(x-1) is the ideal sequence, but if HDFS is slow, or transferring the state to HDFS is slow for some other reason, this sequence may not hold.
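
As a toy illustration of how the callback can lag, here is a small
deterministic simulation. It assumes a checkpoint is taken after every window
and that the state copy takes a fixed number of windows (copyLatency) to
finish; both are simplifications, and the class AsyncCheckpointTimeline is
hypothetical, not engine code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical simulation of the callback order seen by an operator. */
class AsyncCheckpointTimeline {
  /**
   * Returns the callback trace for windows 1..windows when the state copy
   * started at window x completes at window x + copyLatency.
   * copyLatency = 0 models synchronous checkpointing.
   */
  static List<String> run(int windows, int copyLatency) {
    List<String> trace = new ArrayList<>();
    // Each entry is {windowId, windowAtWhichTheCopyCompletes}.
    Deque<long[]> inFlight = new ArrayDeque<>();
    for (long x = 1; x <= windows; x++) {
      trace.add("beginWindow(" + x + ")");
      trace.add("endWindow(" + x + ")");
      // Start copying the state captured at the end of window x.
      inFlight.addLast(new long[] {x, x + copyLatency});
      // Deliver callbacks for every copy that has completed by now.
      while (!inFlight.isEmpty() && inFlight.peekFirst()[1] <= x) {
        trace.add("checkpointed(" + inFlight.pollFirst()[0] + ")");
      }
    }
    return trace;
  }
}
```

With copyLatency = 1 the trace shows checkpointed(x-1) after endWindow(x);
with a larger latency the callback falls further behind.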

Can your use case be addressed by https://malhar.atlassian.net/browse/APEX-78?

Thanks
- Gaurav

> On Nov 22, 2015, at 3:56 PM, Chandni Singh <ch...@datatorrent.com> wrote:
> 
> With async checkpointing, the checkpointed callback in CheckpointListener
> is called for a previous window, that is,
> beginWindow(x) -> endWindow(x) -> checkpointed(x-1)
> 
> This feature was newly introduced. With synchronous checkpointing, the
> behavior was always
> beginWindow(x) -> endWindow(x) -> checkpointed(x)
> 
> A lot of operators were written before asynchronous checkpointing was
> introduced, and a few of them rely on the sequencing guaranteed by
> synchronous checkpointing.
> 
> So why was async checkpointing made the default?
> 
> With how async checkpointing works today, the complexity of handling transient
> state in the checkpointed callback falls on every operator. For example, let's
> say earlier I had a transient map which I cleared every time checkpointed
> was called; with async checkpointing this simple task becomes a lot more
> complicated.
> 
> I think async checkpointing broke the semantics of operator callbacks and
> should NOT be the default.