Posted to dev@apex.apache.org by Tushar Gosavi <tu...@apache.org> on 2017/03/01 13:34:03 UTC

APEXCORE-619 Recovery windowId in future during application relaunch.

Help Needed for APEXCORE-619

Issue: When an application with stateless operators at the end of the DAG is
relaunched after a long time, the stateless operators start with a very
high windowId. In this case the stateless operator ignores all the data it
receives until the upstream operator catches up with it. This breaks the
*at-least-once* guarantee on relaunch of the operator, or when the master is
killed and the application is restarted.
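
A small worked example of the gap described above. This is only a sketch: window
ids are modeled as plain integers, and the function name and the numbers are
hypothetical, not taken from the Apex code base.

```python
def windows_ignored_by_leaf(upstream_recovery_window: int, leaf_start_window: int) -> range:
    """Windows replayed by upstream that a leaf starting later will drop."""
    # The leaf ignores every window older than its own start window.
    return range(upstream_recovery_window, max(upstream_recovery_window, leaf_start_window))


# Hypothetical numbers: upstream recovers from its checkpoint at window 100,
# while the stateless leaf is assigned window 5000, derived from the relaunch time.
lost = windows_ignored_by_leaf(100, 5000)
print(len(lost))  # prints 4900
```

Every window in that range is replayed by the upstream operator but never
processed by the leaf, which is why at-least-once no longer holds.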

Solutions:
- Fix the windowId for stateless leaf operators from the upstream operator. But
this has some issues when we have a join with two upstream operators at
different windowIds. If we set the windowId to min(upstream windowId), then
we need to recalculate the recovery window ids for the upstream paths
from this operator.

- Another solution is to create an empty file in the checkpoint directory for
stateless operators. This will help us identify the checkpoints of
stateless operators during relaunch instead of computing them from the latest
timestamp.

- Bring the entire DAG to committedWindowId. This could be achieved by
writing committedWindowId to a journal. We need to make sure that we do
not purge the checkpointed state until the committedWindowId is saved in the
journal.
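
The recalculation mentioned in the first option can be sketched as follows. This
is an illustration under stated assumptions, not the Stram implementation: the
function name and data layout are hypothetical, every non-leaf operator is
assumed to be stateful, and window ids are plain integers.

```python
from bisect import bisect_right


def recovery_windows(checkpoints, downstream, stateless_leaves):
    """
    checkpoints:      operator -> sorted list of checkpointed window ids
    downstream:       operator -> list of directly downstream operators
    stateless_leaves: leaf operators that have no checkpoints of their own
    Returns operator -> recovery window id.
    """
    upstream = {}
    for op, outs in downstream.items():
        for out in outs:
            upstream.setdefault(out, []).append(op)

    # Stateful operators start from their latest checkpoint.
    recovery = {op: cps[-1] for op, cps in checkpoints.items()}

    # A stateless leaf takes the minimum of its upstream recovery windows.
    for leaf in stateless_leaves:
        recovery[leaf] = min(recovery[up] for up in upstream[leaf])

    # Pull upstream operators back to a checkpoint no later than what their
    # downstream operators need, repeating until nothing changes.
    changed = True
    while changed:
        changed = False
        for op, cps in checkpoints.items():
            limit = min((recovery[d] for d in downstream.get(op, [])), default=recovery[op])
            idx = bisect_right(cps, min(limit, recovery[op]))
            if idx == 0:
                raise ValueError(f"no checkpoint of {op} at or before window {limit}")
            if cps[idx - 1] != recovery[op]:
                recovery[op] = cps[idx - 1]
                changed = True
    return recovery


# Join J fed by A (checkpoints 10, 30) and B (checkpoints 20, 50).
print(recovery_windows({"A": [10, 30], "B": [20, 50]}, {"A": ["J"], "B": ["J"]}, ["J"]))
```

In this example the leaf is set to window 30, so B can no longer recover from
its latest checkpoint (50) and has to fall back to 20. That fallback is the extra
pass the first option requires.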

Let me know your thoughts on this and your preferred solution.

Regards,
-Tushar.

Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by Sandesh Hegde <sa...@datatorrent.com>.
1. Create an empty checkpoint file for the stateless operators.
2. Remove the logic to treat stateless operators as a special case.

Rest of the design remains as is.
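
A minimal sketch of the empty checkpoint file idea. The directory layout (one
directory per operator id, one file per window id in hex) and the function names
are assumptions made for illustration. This is not the Apex StorageAgent API.

```python
from pathlib import Path
import tempfile


def save_empty_checkpoint(checkpoint_dir: Path, operator_id: int, window_id: int) -> Path:
    """Write a zero-length marker file named after the window id."""
    op_dir = checkpoint_dir / str(operator_id)
    op_dir.mkdir(parents=True, exist_ok=True)
    marker = op_dir / format(window_id, "x")
    marker.touch()
    return marker


def latest_checkpoint(checkpoint_dir: Path, operator_id: int):
    """Return the newest checkpointed window id, or None if there is none."""
    op_dir = checkpoint_dir / str(operator_id)
    if not op_dir.is_dir():
        return None
    window_ids = [int(p.name, 16) for p in op_dir.iterdir() if p.is_file()]
    return max(window_ids, default=None)


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    save_empty_checkpoint(root, 7, 100)
    save_empty_checkpoint(root, 7, 160)
    print(latest_checkpoint(root, 7))  # prints 160
```

With such markers in place, a stateless operator can be recovered through the
same lookup as a stateful one, so no special case is needed during relaunch.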

On Wed, Mar 1, 2017 at 11:18 AM Amol Kekre <am...@datatorrent.com> wrote:

> The third option should be it.
> 1. On relaunch the DAG should start at commitWindowId
> 2. Pruning of checkpoints should only happen after committedWindowId is
> written by Stram state
>
> Thks
> Amol
>
>
>
> E:amol@datatorrent.com | M: 510-449-2606 | Twitter: @*amolhkekre*
>
> www.datatorrent.com  |  apex.apache.org
>
> *Join us at Apex Big Data World-San Jose
> <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
> [image: http://www.apexbigdata.com/san-jose-register.html]
> <http://www.apexbigdata.com/san-jose-register.html>

Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by Amol Kekre <am...@datatorrent.com>.
The third option should be it.
1. On relaunch, the DAG should start at commitWindowId.
2. Pruning of checkpoints should happen only after committedWindowId has been
written to the Stram state.
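
The ordering in the two points above can be sketched like this. The
`RecoveryJournal` class and `commit_then_purge` function are hypothetical
stand-ins, with an in-memory list in place of the real Stram journal.

```python
class RecoveryJournal:
    """In-memory stand-in for a durable journal (an assumption for this sketch)."""

    def __init__(self):
        self._entries = []

    def write_committed_window(self, window_id):
        self._entries.append(window_id)

    def last_committed_window(self):
        return self._entries[-1] if self._entries else None


def commit_then_purge(journal, checkpoints, committed_window_id):
    # Step 1: record the committed window before touching any checkpoint.
    journal.write_committed_window(committed_window_id)
    # Step 2: only now drop checkpoints older than the one each operator
    # would restore from at the committed window.
    for op, window_ids in checkpoints.items():
        restorable = [w for w in window_ids if w <= committed_window_id]
        if restorable:
            keep_from = max(restorable)
            checkpoints[op] = [w for w in window_ids if w >= keep_from]


journal = RecoveryJournal()
state = {"A": [10, 20, 30], "B": [10, 20]}
commit_then_purge(journal, state, 20)
print(journal.last_committed_window(), state)
```

On relaunch, every operator would restore from its latest checkpoint at or before
`journal.last_committed_window()`. Because the journal entry is written first,
that checkpoint cannot have been purged already.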

Thks
Amol




Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by "Herger, Brendan" <Br...@capitalone.com>.
unsubscribe

Thanks,
Brendan Herger
Machine Learning Engineer
Center for Machine Learning @ Capital One
415.582.7457 (cell)
 

On 3/20/17, 1:38 PM, "Tushar Gosavi" <tu...@datatorrent.com> wrote:

    I have opened a pull request #490 to handle this scenario by implementing
    option 3.
    
    -Tushar.
    
    
    On Tue, Mar 14, 2017 at 7:46 PM, Thomas Weise <th...@apache.org> wrote:
    
    > The JDBC operator was an example of a valid use case where no state needs
    > to be saved in a checkpoint (it is saved to the database instead). We
    > cannot break such use cases and neither should the application developer be
    > exposed to it, especially not in backward incompatible fashion.
    >
    > I think that the platform/operators need to handle this. Perhaps we can
    > look at all the "stateless" operators and see which ones really depend on
    > at-least-once processing (to produce exactly-once results) and then change
    > the remaining to have default processing mode at-most-once? That would
    > require separate annotation support to indicate a default processing mode
    > for an operator.
    >
    > Thanks,
    > Thomas
    >
    >
    >
    >
    >
    >
    >
    >
    > On Tue, Mar 14, 2017 at 4:26 AM, Tushar Gosavi <tu...@datatorrent.com>
    > wrote:
    >
    > > Hi Vlad,
    > >
    > > I am more worried about backward compatibility. Most new users will
    > > not be able to launch the simple applications that are the starting point
    > > for learning Apex, such as PiDemo and WordCount. We could provide a way for
    > > the user to not use the empty file storage agent for leaf stateless operators
    > > if they are more worried about name node operations.
    > >
    > > - Tushar.
    > >
    > >
    > > On Sat, Mar 11, 2017 at 6:10 AM, Vlad Rozov <v....@datatorrent.com>
    > > wrote:
    > >
    > >> I would prefer to go with option 2 (and maybe reuse the -force flag to allow
    > >> launching applications that do not validate due to the newly introduced
    > >> rule). I am not sure that it is OK to outsmart the application designer and
    > >> force a stateless operator to become stateful.
    > >>
    > >> Thank you,
    > >>
    > >> Vlad
    > >>
    > >> On 3/10/17 07:38, Thomas Weise wrote:
    > >>
    > >> +1
    > >>
    > >> But keep in mind it will cause unnecessary name node operations and
    > >> therefore it would be good to only use it when it is really needed (i.e.
    > >> the operator in reality isn't stateless, it stores its state somewhere
    > >> else).
    > >>
    > >> Can we look at optimizing the behavior for "stateless" operators that are
    > >> really stateless? For example, should the console operator be
    > >> AT_MOST_ONCE by default?
    > >>
    > >>
    > >> On Fri, Mar 10, 2017 at 1:45 AM, Bhupesh Chawda <bh...@datatorrent.com> wrote:
    > >>
    > >>
    > >> My preference is also for option 3. It looks clean and simple to implement.
    > >>
    > >> ~ Bhupesh
    > >>
    > >>
    > >> _______________________________________________________
    > >>
    > >> Bhupesh Chawda
    > >>
    > >> E: bhupesh@datatorrent.com | Twitter: @bhupeshsc
    > >> www.datatorrent.com  |  apex.apache.org
    > >>
    > >>
    > >>
    > >> On Fri, Mar 10, 2017 at 3:06 PM, Tushar Gosavi <tu...@datatorrent.com> wrote:
    > >>
    > >>
    > >> Can you please let me know your preference? My preference is for solution
    > >> 3, by adding a StorageAgent which creates an empty file, and using this
    > >> storage agent for leaf stateless operators.
    > >>
    > >> - Tushar.
    > >>
    > >>
    > >> On Tue, Mar 7, 2017 at 1:52 PM, Tushar Gosavi <tu...@datatorrent.com> wrote:
    > >>
    > >>
    > >> Thank you all for the feedback.
    > >>
    > >> Some useful output operators can be stateless: they push the data
    > >> received in a window to the output store, for example KafkaOutputOperator/
    > >> JDBCOutputOperator, or output stores where
    > >> writes are idempotent, which covers most of the key-value stores.
    > >>
    > >> I was going to use the existing logic to compute the committedWindowId,
    > >> with the addition of a few steps explained below.
    > >> solution-1
    > >> - Calculate committedWindow with leaf operator checkpoints set to the
    > >>   current timestamp (current behaviour).
    > >> - Update the leaf operators' recoveryWindowId to committedWindowId.
    > >> - Calculate committedWindow again. This step is required because the
    > >>   downstream operator's recoveryWindowId is reduced, and hence we may have
    > >>   to adjust the recoveryWindowId of upstream operators.
    > >>
    > >> This will prevent leaf stateless operators from starting at the current
    > >> timestamp, hence reducing the amount of data loss. But given the concern
    > >> raised by Bhupesh about the last stateless operator being slow, the
    > >> solution suggested by Vlad is sufficient.
    > >>
    > >> solution-1
    > >> - As explained above. If a little loss is acceptable, we could go with this
    > >>   approach.
    > >> solution-2
    > >> - Fail validation if the last operator is stateless in the AT_LEAST_ONCE
    > >>   scenario, as suggested by Vlad.
    > >>   This could break backward compatibility, as old applications will fail
    > >>   to launch.
    > >> solution-3
    > >> - Mark the last operator stateful in the AT_LEAST_ONCE scenario.
    > >>
    > >> Let me know about your preference.
    > >>
    > >> Regards,
    > >> - Tushar.
    > >>
    > >>
    > >> On Mon, Mar 6, 2017 at 8:31 PM, Vlad Rozov <v....@datatorrent.com> wrote:
    > >>
    > >>
    > >> For a long chain of stateless operators at the end of a DAG, it is
    > >> possible that the time to propagate the end window to a leaf operator is
    > >> greater than the time for a checkpoint to be persisted in HDFS.
    > >>
    > >> If an at-least-once processing guarantee is necessary, the leaf operators
    > >> should not be STATELESS. Will invalidating a DAG that has one or more leaf
    > >> operators marked as STATELESS with AT_LEAST_ONCE processing solve
    > >> APEXCORE-619? It is not the best solution, but I think it is sufficient
    > >> for the described scenario.
    > >>
    > >> Thank you,
    > >>
    > >> Vlad
    > >>
    > >>
    > >> On 3/2/17 08:43, Thomas Weise wrote:
    > >>
    > >>
    > >> Good point, that's correct for a stateless leaf operator (an operator that
    > >> does not have downstream operators). The minimum of upstream checkpoints
    > >> can be higher than the last windowId seen by the leaf operator, although
    > >> that has a low probability, because it would mean the time it took for the
    > >> checkpoint to become visible in HDFS is less than the propagation of
    > >> endWindow downstream.
    > >>
    > >> It's also not a problem for an intermediate stateless operator, because
    > >> the downstream checkpoint will inform the recovery windowId. Most of the
    > >> time stateless operators are intermediate.
    > >>
    > >> Leaf operators are the output operators. I suspect in the original
    > >> scenario it was a console output operator? Useful output operators usually
    > >> won't be stateless; they have to track state to interact with the external
    > >> system correctly. I'm bringing this up for adequate cost/benefit analysis.
    > >>
    > >> In the absence of a stateful downstream operator, you only have the
    > >> committed windowId, which is essentially a checkpointing watermark. On
    > >> application restart it has to be recomputed from the checkpoints available,
    > >> and does not cover the scenario Tushar reported originally.
    > >>
    > >> Saving the committed windowId comes at a cost: it would have to be written
    > >> to the journal before operators are notified. Care has been taken not to
    > >> write unnecessarily to the journal, as it is blocking I/O, and in this case
    > >> the frequency depends on the order of arrival of checkpoint notifications
    > >> from operators. We also don't want to delay the committedWindow
    > >> notification, as that would introduce latency.
    > >>
    > >> Thomas
    > >>
    > >>
    > >> On Thu, Mar 2, 2017 at 2:10 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
    > >>
    > >> What if all operators complete their first checkpoints, but the stateless
    > >> operator could not cross the first checkpoint window, and the DAG crashed?
    > >> If we try to figure out the recovery checkpoint now, we might conclude
    > >> that checkpoint 1 is the point to start from, and we may miss some data
    > >> being processed by the stateless operator. Probably in this case
    > >> at-least-once is also not guaranteed?
    > >>
    > >> ~ Bhupesh
    > >>
    > >>
    > >> _______________________________________________________
    > >>
    > >> Bhupesh Chawda
    > >>
    > >> E: bhupesh@datatorrent.com | Twitter: @bhupeshsc
    > >> www.datatorrent.com  |  apex.apache.org
    > >>
    > >>
    > >>
    > >> On Thu, Mar 2, 2017 at 8:06 AM, Thomas Weise <th...@apache.org> wrote:
    > >>
    > >> Dummy checkpoints, continuously writing committed window id and the like
    > >> all introduce overhead that is probably not needed.
    > >>
    > >> All the information to derive what we need is likely available and IMO
    > >> the discussion should be on what is the correct way of using it. I will
    > >> have a look when I get to it as well.
    > >>
    > >> Thanks,
    > >> Thomas
    > >>
    > >>
    > >> On Wed, Mar 1, 2017 at 6:29 PM, Sandesh Hegde <sandesh@datatorrent.com> wrote:
    > >>
    > >> Instead of treating the stateless operator in a special way and missing
    > >> corner cases, just have a dummy checkpoint; then there is no need to
    > >> handle corner cases.
    > >>
    > >> There is a name for this solution: https://en.wikipedia.org/wiki/Null_Object_pattern
    > >>
    > >>
    > >>
    > >> On Wed, Mar 1, 2017 at 2:52 PM, Pramod Immaneni <pramod@datatorrent.com> wrote:
    > >>
    > >> There is code in various places that deals with stateless operators in a
    > >> special way even though a physical checkpoint does not exist on the disk.
    > >> It is probably a matter of applying a similar thought process/logic
    > >> correctly here.
    > >>
    > >> On Wed, Mar 1, 2017 at 2:27 PM, Amol Kekre <amol@datatorrent.com> wrote:
    > >>
    > >> Hmm! The fact that commitWindowId has moved up (right now in the memory of
    > >> Stram) should mean that a complete set of checkpoints is available, i.e.
    > >> commitWindowId can be derived. Let's say that the next checkpoint window
    > >> also gets checkpointed across the app, and commitWindowId is in memory but
    > >> not written to stram-state yet; then upon relaunch the latest
    > >> commitWindowId should get computed correctly.
    > >>
    > >> This may be just about setting stateless operators to commitWindowId on
    > >> re-launch? aka bug/feature?
    > >>
    > >> Thks
    > >> Amol
    > >>
    > >>
    > >>
    > >> On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni <pramod@datatorrent.com> wrote:
    > >>
    > >> Do we need to save committedWindowId? Can't it be computed from the
    > >> existing checkpoints by walking through the DAG? We probably do this
    > >> anyway, and I suspect there is a minor bug somewhere in there. If an
    > >> operator is stateless, you could assume its checkpoint to be long max for
    > >> the sake of computation and compute the committed window to be the lowest
    > >> common checkpoint. If they are all stateless and you end up with long max,
    > >> you can start with a window id that reflects the current timestamp.
    > >>
    > >> Thanks
    > >>
    > >> On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre <amol@datatorrent.com> wrote:
    > >>
    > >> CommitWindowId could be computed from the existing checkpoints. That
    > >> solution still needs the purge to be done after commitWindowId is confirmed
    > >> to be saved in Stram state. Without this, the commitWindowId computed from
    > >> the checkpoints may have some checkpoints missing.
    > >>
    > >> Thks
    > >> Amol
    > >>
    > >>
    > >>
    > >> On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni <pramod@datatorrent.com> wrote:
    > >>
    > >> Can't the committedWindowId be calculated by looking at the physical plan
    > >> and the existing checkpoints?
    > >>
    > >
    >
    


Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by Tushar Gosavi <tu...@datatorrent.com>.
I have opened a pull request #490 to handle this scenario by implementing
option 3.

-Tushar.


On Tue, Mar 14, 2017 at 7:46 PM, Thomas Weise <th...@apache.org> wrote:

> The JDBC operator was an example of a valid use case where no state needs
> to be saved in a checkpoint (it is saved to the database instead). We
> cannot break such use cases and neither should the application developer be
> exposed to it, especially not in backward incompatible fashion.
>
> I think that the platform/operators need to handle this. Perhaps we can
> look at all the "stateless" operators and see which ones really depend on
> at-least-once processing (to produce exactly-once results) and then change
> the remaining to have default processing mode at-most-once? That would
> require separate annotation support to indicate a default processing mode
> for an operator.
>
> Thanks,
> Thomas
>
>
>
>
>
>
>
>
> On Tue, Mar 14, 2017 at 4:26 AM, Tushar Gosavi <tu...@datatorrent.com>
> wrote:
>
> > Hi Vlad,
> >
> > I am more worried about backward compatibility. Most new users will
> > not be able to launch the simple applications that are the starting point
> > for learning Apex, such as PiDemo and WordCount. We could provide a way for
> > the user to not use the empty file storage agent for leaf stateless operators
> > if they are more worried about name node operations.
> >
> > - Tushar.
> >
> >
> > On Sat, Mar 11, 2017 at 6:10 AM, Vlad Rozov <v....@datatorrent.com>
> > wrote:
> >
> >> I would prefer to go with option 2 (and maybe reuse the -force flag to allow
> >> launching applications that do not validate due to the newly introduced
> >> rule). I am not sure that it is OK to outsmart the application designer and
> >> force a stateless operator to become stateful.
> >>
> >> Thank you,
> >>
> >> Vlad
> >>
> >> On 3/10/17 07:38, Thomas Weise wrote:
> >>
> >> +1
> >>
> >> But keep in mind it will cause unnecessary name node operations and
> >> therefore it would be good to only use it when it is really needed (i.e.
> >> the operator in reality isn't stateless, it stores its state somewhere
> >> else).
> >>
> >> Can we look at optimizing the behavior for "stateless" operators that are
> >> really stateless? For example, should the console operator be
> >> AT_MOST_ONCE by default?
> >>
> >>
> >> On Fri, Mar 10, 2017 at 1:45 AM, Bhupesh Chawda <bh...@datatorrent.com> wrote:
> >>
> >>
> >> My preference is also for option 3. It looks clean and simple to implement.
> >>
> >> ~ Bhupesh
> >>
> >>
> >> _______________________________________________________
> >>
> >> Bhupesh Chawda
> >>
> >> E: bhupesh@datatorrent.com | Twitter: @bhupeshsc
> >> www.datatorrent.com  |  apex.apache.org
> >>
> >>
> >>
> >> On Fri, Mar 10, 2017 at 3:06 PM, Tushar Gosavi <tu...@datatorrent.com> wrote:
> >>
> >>
> >> Can you please let me know your preference? My preference is for solution
> >> 3, by adding a StorageAgent which creates an empty file, and using this
> >> storage agent for leaf stateless operators.
> >>
> >> - Tushar.
> >>
> >>
> >> On Tue, Mar 7, 2017 at 1:52 PM, Tushar Gosavi <tu...@datatorrent.com> wrote:
> >>
> >>
> >> Thank you all for the feedback.
> >>
> >> Some useful output operators can be stateless: they push the data
> >> received in a window to the output store, for example KafkaOutputOperator/
> >> JDBCOutputOperator, or output stores where
> >> writes are idempotent, which covers most of the key-value stores.
> >>
> >> I was going to use the existing logic to compute the committedWindowId,
> >> with the addition of a few steps explained below.
> >> solution-1
> >> - Calculate committedWindow with leaf operator checkpoints set to the
> >>   current timestamp (current behaviour).
> >> - Update the leaf operators' recoveryWindowId to committedWindowId.
> >> - Calculate committedWindow again. This step is required because the
> >>   downstream operator's recoveryWindowId is reduced, and hence we may have
> >>   to adjust the recoveryWindowId of upstream operators.
> >>
> >> This will prevent leaf stateless operators from starting at the current
> >> timestamp, hence reducing the amount of data loss. But given the concern
> >> raised by Bhupesh about the last stateless operator being slow, the
> >> solution suggested by Vlad is sufficient.
> >>
> >> solution-1
> >> - As explained above. If a little loss is acceptable, we could go with this
> >>   approach.
> >> solution-2
> >> - Fail validation if the last operator is stateless in the AT_LEAST_ONCE
> >>   scenario, as suggested by Vlad.
> >>   This could break backward compatibility, as old applications will fail
> >>   to launch.
> >> solution-3
> >> - Mark the last operator stateful in the AT_LEAST_ONCE scenario.
> >>
> >> Let me know about your preference.
> >>
> >> Regards,
> >> - Tushar.
> >>
> >>
> >> On Mon, Mar 6, 2017 at 8:31 PM, Vlad Rozov <v....@datatorrent.com> wrote:
> >>
> >>
> >> For a long chain of stateless operators at the end of a DAG, it is
> >> possible that the time to propagate the end window to a leaf operator is
> >> greater than the time for a checkpoint to be persisted in HDFS.
> >>
> >> If an at-least-once processing guarantee is necessary, the leaf operators
> >> should not be STATELESS. Will invalidating a DAG that has one or more leaf
> >> operators marked as STATELESS with AT_LEAST_ONCE processing solve
> >> APEXCORE-619? It is not the best solution, but I think it is sufficient
> >> for the described scenario.
> >>
> >> Thank you,
> >>
> >> Vlad
> >>
> >>
> >> On 3/2/17 08:43, Thomas Weise wrote:
> >>
> >> Good point, that's correct for a stateless leaf operator (an operator that
> >> does not have downstream operators). The minimum of the upstream
> >> checkpoints can be higher than the last windowId seen by the leaf
> >> operator, although the probability of that is low, because it would mean
> >> the time it took for the checkpoint to become visible in HDFS is less than
> >> the propagation of endWindow downstream.
> >>
> >> It's also not a problem for an intermediate stateless operator, because
> >> the downstream checkpoint will inform the recovery windowId. Most of the
> >> time stateless operators are intermediate.
> >>
> >> Leaf operators are the output operators. I suspect in the original
> >> scenario it was a console output operator? Useful output operators usually
> >> won't be stateless; they have to track state to interact with the external
> >> system correctly. I'm bringing this up for adequate cost/benefit analysis.
> >>
> >> In the absence of a stateful downstream operator, you only have the
> >> committed windowId, which is essentially a checkpointing watermark. On
> >> application restart it has to be recomputed from the checkpoints
> >> available, and does not cover the scenario Tushar reported originally.
> >>
> >> Saving the committed windowId comes at a cost: it would have to be written
> >> to the journal before operators are notified. Care has been taken to not
> >> write unnecessarily to the journal, as it is blocking I/O, and in this
> >> case the frequency depends on the order of arrival of checkpoint
> >> notifications from operators. We also don't want to delay the
> >> committedWindow notification, as that would introduce latency.
> >>
> >> Thomas
> >>
> >>
> >> On Thu, Mar 2, 2017 at 2:10 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> >>
> >> What if all operators complete their first checkpoints but the stateless
> >> operator could not cross the first checkpoint window, and the DAG crashed?
> >> If we try to figure out the recovery checkpoint now, we might conclude
> >> that checkpoint 1 is the point to start from, and we may miss some data
> >> that should have been processed by the stateless operator. Probably in
> >> this case at-least-once is also not guaranteed?
> >>
> >> ~ Bhupesh
> >>
> >> _______________________________________________________
> >>
> >> Bhupesh Chawda
> >>
> >> E: bhupesh@datatorrent.com | Twitter: @bhupeshsc
> >> www.datatorrent.com  |  apex.apache.org
> >>
> >>
> >>
> >> On Thu, Mar 2, 2017 at 8:06 AM, Thomas Weise <th...@apache.org> wrote:
> >>
> >> Dummy checkpoints, continuously writing committed window id and the like
> >> all introduce overhead that is probably not needed.
> >>
> >> All the information to derive what we need is likely available and IMO
> >> the discussion should be on what is the correct way of using it. I will
> >> have a look when I get to it as well.
> >>
> >> Thanks,
> >> Thomas
> >>
> >>
> >> On Wed, Mar 1, 2017 at 6:29 PM, Sandesh Hegde <sandesh@datatorrent.com> wrote:
> >>
> >> Instead of treating the stateless operator in a special way and missing
> >> corner cases, just have a dummy checkpoint, then there is no need to
> >> handle corner cases.
> >>
> >> There is a name for this solution:
> >> https://en.wikipedia.org/wiki/Null_Object_pattern
> >>
> >>
> >>
> >> On Wed, Mar 1, 2017 at 2:52 PM Pramod Immaneni <pramod@datatorrent.com> wrote:
> >>
> >> There is code in various places that deals with stateless operators in a
> >> special way even though a physical checkpoint does not exist on the disk.
> >> It is probably a matter of applying a similar thought process/logic
> >> correctly here.
> >>
> >> On Wed, Mar 1, 2017 at 2:27 PM, Amol Kekre <amol@datatorrent.com> wrote:
> >>
> >> Hmm! The fact that commitWindowId has moved up (right now in the memory
> >> of Stram) should mean that a complete set of checkpoints is available,
> >> i.e. commitWindowId can be derived. Let's say that the next checkpoint
> >> window also gets checkpointed across the app, and commitWindowId is in
> >> memory but not written to stram-state yet; then upon relaunch the latest
> >> commitWindowId should get computed correctly.
> >>
> >> This may be just about setting stateless operators to commitWindowId on
> >> re-launch? aka bug/feature?
> >>
> >> Thks
> >> Amol
> >>
> >> E:amol@datatorrent.com | M: 510-449-2606 | Twitter: @amolhkekre
> >>
> >> www.datatorrent.com  |  apex.apache.org
> >>
> >> *Join us at Apex Big Data World-San Jose <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
> >> [image: http://www.apexbigdata.com/san-jose-register.html] <http://www.apexbigdata.com/san-jose-register.html>
> >>
> >> On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni <pramod@datatorrent.com> wrote:
> >>
> >> Do we need to save committedWindowId? Can't it be computed from the
> >> existing checkpoints by walking through the DAG? We probably do this
> >> anyway and I suspect there is a minor bug somewhere in there. If an
> >> operator is stateless you could assume its checkpoint is long max for the
> >> sake of computation and compute the committed window to be the lowest
> >> common checkpoint. If they are all stateless and you end up with long max
> >> you can start with a window id that reflects the current timestamp.
> >>
> >> Thanks
> >>
> >> On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre <amol@datatorrent.com> wrote:
> >>
> >> CommitWindowId could be computed from the existing checkpoints. That
> >> solution still needs the purge to be done only after commitWindowId is
> >> confirmed to be saved in Stram state. Without this, the commitWindowId
> >> computed from the checkpoints may have some checkpoints missing.
> >>
> >> Thks
> >> Amol
> >>
> >> On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni <pramod@datatorrent.com> wrote:
> >>
> >> Can't the committedWindowId be calculated by looking at the physical plan
> >> and the existing checkpoints?
> >>
> >> On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi <tushar@apache.org> wrote:
> >>
> >> Help Needed for APEXCORE-619
> >>
> >> Issue: When an application is relaunched after a long time with stateless
> >> operators at the end of the DAG, the stateless operators start with a
> >> very high windowId. In this case the stateless operator ignores all the
> >> data received until the upstream operator catches up with it. This breaks
> >> the *at-least-once* guarantee on relaunch of the operator, or when the
> >> master is killed and the application is restarted.
> >>
> >> Solutions:
> >> - Fix the windowId for stateless leaf operators from the upstream
> >>   operator. But this has some issues when we have a join with two
> >>   upstream operators at different windowIds. If we set the windowId to
> >>   min(upstream windowId), then we need to recalculate the new recovery
> >>   window ids for the upstream paths from this operator.
> >>
> >> - Another solution is to create an empty file in the checkpoint directory
> >>   for stateless operators. This will help us to identify the checkpoints
> >>   of stateless operators during relaunch instead of computing them from
> >>   the latest timestamp.
> >>
> >> - Bring the entire DAG to committedWindowId. This could be achieved by
> >>   writing committedWindowId in a journal. We need to make sure that we
> >>   are not purging the checkpointed state until the committedWindowId is
> >>   saved in the journal.
> >>
> >> Let me know your thoughts on this and preferred solution.
> >>
> >> Regards,
> >> -Tushar.
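
The third option quoted above (write committedWindowId to a journal, and purge checkpoints only once it has been saved) can be illustrated with a small in-memory sketch. This is not Apex code: CommitJournalSketch, its journal list, and the advance/recover methods are hypothetical names, and the list merely stands in for a durable journal. The only point shown is the ordering: journal first, purge second.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical illustration only; none of these names exist in Apex.
final class CommitJournalSketch {
    // Stand-in for a durable journal (a real one would do blocking I/O).
    final List<Long> journal = new ArrayList<>();
    // Window ids for which a checkpoint is currently retained.
    final TreeSet<Long> checkpoints = new TreeSet<>();
    private long committedWindowId = -1L;

    // Advance the committed window: journal first, purge afterwards.
    void advance(long newCommitted) {
        if (newCommitted <= committedWindowId) {
            return; // nothing new to record
        }
        journal.add(newCommitted);                 // 1. persist the new value
        committedWindowId = newCommitted;
        checkpoints.headSet(newCommitted).clear(); // 2. purge strictly older checkpoints
    }

    // On relaunch, the last journaled value is the window to restart from.
    long recover() {
        return journal.isEmpty() ? -1L : journal.get(journal.size() - 1);
    }
}
```

Because the purge only removes checkpoints strictly older than the journaled value, the window returned by recover() always still has its checkpoint available. The cost raised elsewhere in the thread (one extra blocking journal write per advance) is visible as step 1.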

Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by Thomas Weise <th...@apache.org>.
The JDBC operator was an example of a valid use case where no state needs
to be saved in a checkpoint (it is saved to the database instead). We
cannot break such use cases, nor should the application developer be
exposed to this, especially not in a backward-incompatible fashion.

I think that the platform/operators need to handle this. Perhaps we can
look at all the "stateless" operators and see which ones really depend on
at-least-once processing (to produce exactly-once results) and then change
the remaining ones to have a default processing mode of at-most-once? That
would require separate annotation support to indicate a default processing
mode for an operator.

Thanks,
Thomas
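
As an illustration of the annotation idea in the message above, here is a minimal Java sketch. Everything in it is an assumption made for the example: the Mode enum, the DefaultProcessingMode annotation, the two placeholder operator classes, and the effectiveMode helper are hypothetical and are not part of the Apex API. It also assumes the platform-wide default stays at-least-once.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for the platform's processing modes.
enum Mode { AT_LEAST_ONCE, AT_MOST_ONCE, EXACTLY_ONCE }

// Hypothetical annotation: lets an operator class declare its own default.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DefaultProcessingMode {
    Mode value();
}

// A truly stateless sink opts in to at-most-once by default.
@DefaultProcessingMode(Mode.AT_MOST_ONCE)
class ConsoleOutputSketch { }

// An operator that keeps its state externally declares nothing.
class JdbcOutputSketch { }

final class ProcessingModes {
    // Resolution order: explicit setting, then annotation, then platform default.
    static Mode effectiveMode(Class<?> operatorClass, Mode explicit) {
        if (explicit != null) {
            return explicit;
        }
        DefaultProcessingMode declared =
            operatorClass.getAnnotation(DefaultProcessingMode.class);
        return declared != null ? declared.value() : Mode.AT_LEAST_ONCE;
    }
}
```

With this resolution order an application that sets a mode explicitly keeps its current behaviour, so the change would stay backward compatible for existing applications.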








On Tue, Mar 14, 2017 at 4:26 AM, Tushar Gosavi <tu...@datatorrent.com> wrote:

> Hi Vlad,
>
> I am more worried about backward compatibility. Most new users would not
> be able to launch the simple applications that are the starting point for
> learning Apex, such as PiDemo and WordCount. We could provide a way for
> users to opt out of the empty-file storage agent for leaf stateless
> operators if they are more worried about name node operations.
>
> - Tushar.
>
>
> On Sat, Mar 11, 2017 at 6:10 AM, Vlad Rozov <v....@datatorrent.com> wrote:
>
>> I would prefer to go with option 2 (and maybe reuse the -force flag to
>> allow launching applications that do not validate due to the newly
>> introduced rule). I am not sure that it is OK to outsmart the application
>> designer and force a stateless operator to become stateful.
>>
>> Thank you,
>>
>> Vlad
>>
>> On 3/10/17 07:38, Thomas Weise wrote:
>>
>> +1
>>
>> But keep in mind it will cause unnecessary name node operations and
>> therefore it would be good to only use it when it is really needed (i.e.
>> the operator in reality isn't stateless, it stores its state somewhere
>> else).
>>
>> Can we look at optimizing the behavior for "stateless" operators that are
>> really stateless? For example, should the console operator be
>> AT_MOST_ONCE by default?
>>
>>
>> On Fri, Mar 10, 2017 at 1:45 AM, Bhupesh Chawda <bh...@datatorrent.com> wrote:
>>
>>
>> My preference is also for option 3. It looks clean and simple to implement.
>>
>> ~ Bhupesh
>>
>>
>> On Fri, Mar 10, 2017 at 3:06 PM, Tushar Gosavi <tu...@datatorrent.com> wrote:
>>
>>
>> Can you please let me know your preference? My preference is for solution
>> 3, by adding a StorageAgent which creates an empty file, and using this
>> storage agent for leaf stateless operators.
>>
>> - Tushar.
>>
>>
>> On Tue, Mar 7, 2017 at 1:52 PM, Tushar Gosavi <tu...@datatorrent.com> wrote:
>>
>> Thank you all for the feedback.
>>
>> Some useful output operators can be stateless: they push the data
>> received in a window to the output store, for example
>> KafkaOutputOperator/JDBCOutputOperator, or output stores where writes
>> are idempotent, which covers most key-value stores.
>>
>> I was going to use the existing logic to compute the committedWindowId,
>> with the addition of a few steps explained below.
>> solution-1
>> - Calculate committedWindow with leaf operator checkpoints set to the
>>   current timestamp (current behaviour).
>> - Update the leaf operators' recoveryWindowId to committedWindowId.
>> - Calculate committedWindow again. This step is required because the
>>   downstream operator's recoveryWindowId has been reduced, so we may have
>>   to adjust the recoveryWindowId of upstream operators.
>>
>> This will prevent stateless leaf operators from starting at the current
>> timestamp, hence reducing the amount of data loss. But given the concern
>> raised by Bhupesh about the last stateless operator being slow, the
>> solution suggested by Vlad is sufficient.
>>
>> solution-1
>> - As explained above. If little loss is expected we could go with this
>>   approach.
>> solution-2
>> - Fail validation if the last operator is stateless in the AT_LEAST_ONCE
>>   scenario, as suggested by Vlad.
>>   This could break backward compatibility, as old applications will fail
>>   to launch.
>> solution-3
>> - Mark the last operator stateful in the AT_LEAST_ONCE scenario.
>>
>> Let me know about your preference.
>>
>> Regards,
>> - Tushar.
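
The first two steps of solution-1 quoted above can be sketched in a few lines of Java. This is a simplified model under stated assumptions, not the Apex recovery logic: RecoverySketch and its methods are hypothetical names, a stateless operator is represented by Long.MAX_VALUE (the "long max" convention suggested in this thread), and the DAG topology is ignored, so the third step (re-adjusting upstream operators per path) is not modelled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model; it ignores DAG topology on purpose.
final class RecoverySketch {
    // Marker for "no checkpoint on disk", i.e. a stateless operator.
    static final long NO_CHECKPOINT = Long.MAX_VALUE;

    // Lowest checkpoint across all operators; stateless ones never lower it.
    static long committedWindow(Map<String, Long> latestCheckpoint, long currentWindow) {
        long min = NO_CHECKPOINT;
        for (long windowId : latestCheckpoint.values()) {
            min = Math.min(min, windowId);
        }
        // If every operator is stateless there is nothing to recover from.
        return min == NO_CHECKPOINT ? currentWindow : min;
    }

    // Recovery window per operator: its own checkpoint, or the committed
    // window when it is stateless (instead of the current timestamp).
    static Map<String, Long> recoveryWindows(Map<String, Long> latestCheckpoint, long currentWindow) {
        long committed = committedWindow(latestCheckpoint, currentWindow);
        Map<String, Long> recovery = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : latestCheckpoint.entrySet()) {
            long own = entry.getValue();
            recovery.put(entry.getKey(), own == NO_CHECKPOINT ? committed : own);
        }
        return recovery;
    }
}
```

For example, with checkpoints at windows 100 and 90 upstream and a stateless console operator at the end, the console operator would restart at window 90 rather than at a window derived from the current time. The sketch does not address the case raised by Bhupesh, where the stateless leaf had not yet processed up to the committed window before the crash.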

Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by Tushar Gosavi <tu...@datatorrent.com>.
Hi Vlad,

I am more worried about backward compatibility. Most new users would not
be able to launch the simple applications that are the starting point for
learning Apex, such as PiDemo and WordCount. We could provide a way for
users to opt out of the empty-file storage agent for leaf stateless
operators if they are more worried about name node operations.

- Tushar.
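
A rough sketch of the empty-file idea with the opt-out mentioned above, for illustration only. MarkerCheckpointStore is a hypothetical class and does not implement the Apex StorageAgent interface; the local filesystem (java.nio.file) stands in for HDFS, and the writeMarkers flag is an assumed name for the opt-out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch; not the Apex StorageAgent API.
final class MarkerCheckpointStore {
    private final Path root;
    private final boolean writeMarkers; // false = opt out, no files are created

    MarkerCheckpointStore(Path root, boolean writeMarkers) {
        this.root = root;
        this.writeMarkers = writeMarkers;
    }

    // Record that the operator reached windowId by creating an empty file.
    void save(int operatorId, long windowId) {
        if (!writeMarkers) {
            return;
        }
        try {
            Path dir = Files.createDirectories(root.resolve(Integer.toString(operatorId)));
            Path marker = dir.resolve(Long.toString(windowId));
            if (Files.notExists(marker)) {
                Files.createFile(marker);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Window ids with a marker, in ascending order; empty if none exist.
    List<Long> windowIds(int operatorId) {
        Path dir = root.resolve(Integer.toString(operatorId));
        List<Long> ids = new ArrayList<>();
        if (!Files.isDirectory(dir)) {
            return ids;
        }
        try (DirectoryStream<Path> markers = Files.newDirectoryStream(dir)) {
            for (Path marker : markers) {
                ids.add(Long.parseLong(marker.getFileName().toString()));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Collections.sort(ids);
        return ids;
    }
}
```

On relaunch, the recovery logic could treat the highest marker of a stateless leaf operator like any other checkpoint window id. Each marker costs one file-creation call, which is the name node overhead discussed in this thread; with writeMarkers set to false the store creates nothing.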


On Sat, Mar 11, 2017 at 6:10 AM, Vlad Rozov <v....@datatorrent.com> wrote:

> I would prefer to go with option 2 (and maybe reuse the -force flag to
> allow launching applications that do not validate due to the newly
> introduced rule). I am not sure that it is OK to outsmart the application
> designer and force a stateless operator to become stateful.
>
> Thank you,
>
> Vlad
>
> On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni <
>
>
> pramod@datatorrent.com>
>
> wrote:
>
> Do we need to save committedWindowId? Can't it be computed from
>
> existing
>
> checkpoints by walking through the DAG. We probably do this
>
> anyway
>
> and
>
> I
>
>
> suspect there is a minor bug somewhere in there. If an operator
>
> is
>
> stateless you could assume checkpoint as long max for sake of
>
> computation
>
> and compute the committed window to be the lowest common
>
> checkpoint.
>
> If
>
>
> they are all stateless and you end up with long max you can start
>
> with
>
> window id that reflects the current timestamp.
>
> Thanks
>
> On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre <
>
> amol@datatorrent.com
>
> wrote:
>
> CommitWindowId could be computed from the existing checkpoints.
>
> That
>
> solution still needs purge to be done after commitWindowId is
>
> confirmed
>
> to
>
> be saved in Stram state. Without ths the commitWindowId
>
>
> computed
>
> from
>
> the
>
> checkpoints may have some checkpoints missing.
>
> Thks
> Amol
>
>
> E:amol@datatorrent.com | M: 510-449-2606 <(510)%20449-2606> |
>
>
> Twitter: @*amolhkekre*
>
> www.datatorrent.com  |  apex.apache.org
>
> *Join us at Apex Big Data World-San Jose<http://www.apexbigdata.com/san-jose.html> <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
> [image: http://www.apexbigdata.com/san-jose-register.html]<http://www.apexbigdata.com/san-jose-register.html> <http://www.apexbigdata.com/san-jose-register.html>
>
> On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni <
>
>
> pramod@datatorrent.com
>
> wrote:
>
> Can't the commitedWindowId be calculated by looking at the
>
> physical
>
> plan
>
> and the existing checkpoints?
>
> On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi <
>
>
> tushar@apache.org
>
> wrote:
>
> Help Needed for APEXCORE-619
>
> Issue : When application is relaunched after long time with
>
>
> stateless
>
> opeartors at the end of the DAG, the stateless operators
>
> starts
>
> with
>
> a
>
>
> very
>
> high windowId. In this case the stateless operator ignors
>
>
> all
>
> the
>
> data
>
> received till upstream operator catches up with it. This
>
> breaks
>
> the
>
> *at-least-once* gaurantee while relaunch of the opeartor or
>
> when
>
> master
>
> is
>
> killed and application is restarted.
>
> Solutions:
> - Fix windowId for stateless leaf operators from upstream
>
>
> opeartor.
>
> But
>
> it
>
> has some issues when we have a join with two upstrams
>
>
> operators
>
> at
>
> different windowId. If we set the windowID to min(upstream
>
> windowId),
>
> then
>
> we need to again recalulate the new recovery window ids for
>
>
> upstream
>
> paths
>
> from this operators.
>
> - Other solution is to create a empty file in checkpoint
>
>
> directory
>
> for
>
> stateless operators. This will help us to identify the
>
> checkpoints
>
> of
>
>
> stateless operators during relaunch instead of computing
>
> from
>
> latest
>
> timestamp.
>
> - Bring the entire DAG to committedWindowId. This could be
>
>
> achived
>
> using
>
> writing committedWindowId in a journal. we need to make
>
> sure
>
> that
>
> we
>
> are
>
> not puring the checkpointed state until the
>
> committedWundowId
>
> is
>
> saved
>
> in
>
>
> journal.
>
> Let me know your thoughs on this and preferred solution.
>
> Regards,
> -Tushar.
>
> --
>
> *Join us at Apex Big Data World-San Jose<http://www.apexbigdata.com/san-jose.html> <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
> [image: http://www.apexbigdata.com/san-jose-register.html]
>
>
>
>
>

Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by Vlad Rozov <v....@datatorrent.com>.
I would prefer to go with option 2 (and maybe reuse the -force flag to allow
launching applications that do not validate due to the newly introduced
rule). I am not sure that it is OK to outsmart the application designer and
force a stateless operator to become stateful.
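
For illustration only, a minimal sketch of what the option 2 validation rule
could look like. OperatorInfo, ProcessingMode, findViolations and the boolean
force switch are hypothetical names invented for this sketch; they are not
the Apex logical plan API, and the real check would live wherever DAG
validation is implemented.

```java
import java.util.ArrayList;
import java.util.List;

public class LeafOperatorValidation {

    // Processing modes named after the ones mentioned in this thread.
    enum ProcessingMode { AT_LEAST_ONCE, AT_MOST_ONCE }

    // Hypothetical, simplified stand-in for an operator in the logical plan.
    static final class OperatorInfo {
        final String name;
        final boolean stateless;
        final ProcessingMode mode;
        final List<OperatorInfo> downstream = new ArrayList<>();

        OperatorInfo(String name, boolean stateless, ProcessingMode mode) {
            this.name = name;
            this.stateless = stateless;
            this.mode = mode;
        }

        // A leaf operator has no downstream operators.
        boolean isLeaf() {
            return downstream.isEmpty();
        }
    }

    // Collects the names of leaf operators that are stateless but are still
    // expected to honor AT_LEAST_ONCE.
    static List<String> findViolations(List<OperatorInfo> operators) {
        List<String> violations = new ArrayList<>();
        for (OperatorInfo op : operators) {
            if (op.isLeaf() && op.stateless && op.mode == ProcessingMode.AT_LEAST_ONCE) {
                violations.add(op.name);
            }
        }
        return violations;
    }

    // Rejects the DAG unless the (hypothetical) force switch is set.
    static void validate(List<OperatorInfo> operators, boolean force) {
        List<String> violations = findViolations(operators);
        if (!violations.isEmpty() && !force) {
            throw new IllegalStateException(
                "Stateless leaf operators with AT_LEAST_ONCE: " + violations);
        }
    }

    public static void main(String[] args) {
        OperatorInfo input = new OperatorInfo("input", false, ProcessingMode.AT_LEAST_ONCE);
        OperatorInfo console = new OperatorInfo("console", true, ProcessingMode.AT_LEAST_ONCE);
        input.downstream.add(console);

        List<OperatorInfo> dag = new ArrayList<>();
        dag.add(input);
        dag.add(console);

        System.out.println(findViolations(dag));
        validate(dag, true); // forced launch is accepted
        try {
            validate(dag, false);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The force switch keeps old applications launchable, which is the backward
compatibility concern raised against this option.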

Thank you,

Vlad

On 3/10/17 07:38, Thomas Weise wrote:
> +1
>
> But keep in mind it will cause unnecessary name node operations and
> therefore it would be good to only use it when it is really needed (i.e.
> the operator in reality isn't stateless, it stores its state somewhere
> else).
>
> Can we look at optimizing the behavior for "stateless" operators that are
> really stateless? For example, should the console operator be AT_MOST_ONCE
> by default?
>
>
> On Fri, Mar 10, 2017 at 1:45 AM, Bhupesh Chawda <bh...@datatorrent.com>
> wrote:
>
>> My preference is also for option 3. It looks clean and simple to implement.
>>
>> ~ Bhupesh
>>
>>
>> _______________________________________________________
>>
>> Bhupesh Chawda
>>
>> E: bhupesh@datatorrent.com | Twitter: @bhupeshsc
>>
>> www.datatorrent.com  |  apex.apache.org
>>
>>
>>
>> On Fri, Mar 10, 2017 at 3:06 PM, Tushar Gosavi <tu...@datatorrent.com>
>> wrote:
>>
>>> Can you please let me know your preference? My preference is for solution
>>> 3, by adding a StorageAgent which creates an empty file, and using this
>>> storage agent for leaf stateless operators.
>>>
>>> - Tushar.
>>>
>>>
>>> On Tue, Mar 7, 2017 at 1:52 PM, Tushar Gosavi <tu...@datatorrent.com>
>>> wrote:
>>>
>>>> Thank you all for the feedback.
>>>>
>>>> Some useful output operators can be stateless: they push the data
>>>> received in a window to the output store. Examples are
>>>> KafkaOutputOperator/JDBCOutputOperator, or output stores where writes
>>>> are idempotent, which covers most key-value stores.
>>>>
>>>> I was going to use the existing logic to compute the committedWindowId,
>>>> with the addition of a few steps explained below.
>>>> solution-1
>>>> - Calculate committedWindow with leaf operator checkpoints set to the
>>>>   current timestamp (current behaviour).
>>>> - Update the leaf operators' recoveryWindowId to committedWindowId.
>>>> - Calculate committedWindow again. This step is required because the
>>>>   downstream operator's recoveryWindowId is reduced, and hence we may
>>>>   have to adjust the recoveryWindowId of upstream operators.
>>>>
>>>> This will prevent stateless leaf operators from starting at the current
>>>> timestamp, hence reducing the amount of data loss. But as per the concern
>>>> raised by Bhupesh about the last stateless operator being slow, the
>>>> solution suggested by Vlad is sufficient.
>>>>
>>>> solution-1
>>>> - As explained above. If a little loss is acceptable, we could go with
>>>>   this approach.
>>>> solution-2
>>>> - Fail validation if the last operator is stateless in the AT_LEAST_ONCE
>>>>   scenario, as suggested by Vlad.
>>>>   This could break backward compatibility, as old applications will fail
>>>>   to launch.
>>>> solution-3
>>>> - Mark the last operator stateful in the AT_LEAST_ONCE scenario.
>>>>
>>>> Let me know about your preference.
>>>>
>>>> Regards,
>>>> - Tushar.
>>>>
>>>>
>>>> On Mon, Mar 6, 2017 at 8:31 PM, Vlad Rozov <v....@datatorrent.com>
>>>> wrote:
>>>>
>>>>> For a long chain of stateless operators at the end of a DAG, it is
>>>>> possible that the time to propagate the end window to a leaf operator is
>>>>> greater than the time for a checkpoint to be persisted in HDFS.
>>>>>
>>>>> If an at-least-once processing guarantee is necessary, the leaf operators
>>>>> should not be STATELESS. Will invalidating a DAG that has one or more
>>>>> leaf operators marked as STATELESS with AT_LEAST_ONCE processing solve
>>>>> APEXCORE-619? It is not the best solution, but I think it is sufficient
>>>>> for the described scenario.
>>>>>
>>>>> Thank you,
>>>>>
>>>>> Vlad
>>>>>
>>>>>
>>>>> On 3/2/17 08:43, Thomas Weise wrote:
>>>>>
>>>>>> Good point, that's correct for a stateless leaf operator (an operator
>>>>>> that does not have downstream operators). The minimum of the upstream
>>>>>> checkpoints can be higher than the last windowId seen by the leaf
>>>>>> operator, although the probability is low, because it would mean the
>>>>>> time it took for the checkpoint to become visible in HDFS is less than
>>>>>> the time to propagate endWindow downstream.
>>>>>>
>>>>>> It's also not a problem for an intermediate stateless operator, because
>>>>>> the downstream checkpoint will inform the recovery windowId. Most of the
>>>>>> time stateless operators are intermediate.
>>>>>>
>>>>>> Leaf operators are the output operators. I suspect that in the original
>>>>>> scenario it was a console output operator? Useful output operators
>>>>>> usually won't be stateless; they have to track state to interact with
>>>>>> the external system correctly. I'm bringing this up for adequate
>>>>>> cost/benefit analysis.
>>>>>>
>>>>>> In the absence of a stateful downstream operator, you only have the
>>>>>> committed windowId, which is essentially a checkpointing watermark. On
>>>>>> application restart it has to be recomputed from the checkpoints
>>>>>> available, and it does not cover the scenario Tushar reported
>>>>>> originally.
>>>>>>
>>>>>> Saving the committed windowId comes at a cost: it would have to be
>>>>>> written to the journal before operators are notified. Care has been
>>>>>> taken not to write unnecessarily to the journal, as it is blocking I/O,
>>>>>> and in this case the frequency depends on the order of arrival of
>>>>>> checkpoint notifications from operators. We also don't want to delay
>>>>>> the committedWindow notification, as that would introduce latency.
>>>>>>
>>>>>> Thomas
>>>>>>
>>>>>>


Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by Thomas Weise <th...@apache.org>.
+1

But keep in mind it will cause unnecessary name node operations and
therefore it would be good to only use it when it is really needed (i.e.
the operator in reality isn't stateless, it stores its state somewhere
else).

Can we look at optimizing the behavior for "stateless" operators that are
really stateless? For example, should the console operator be AT_MOST_ONCE
by default?


On Fri, Mar 10, 2017 at 1:45 AM, Bhupesh Chawda <bh...@datatorrent.com>
wrote:

> My preference is also for option 3. It looks clean and simple to implement.
>
> ~ Bhupesh
>
>
> _______________________________________________________
>
> Bhupesh Chawda
>
> E: bhupesh@datatorrent.com | Twitter: @bhupeshsc
>
> www.datatorrent.com  |  apex.apache.org
>
>
>
> On Fri, Mar 10, 2017 at 3:06 PM, Tushar Gosavi <tu...@datatorrent.com>
> wrote:
>
> > Can you please let me know your preference? My preference is for solution
> > 3, by adding a StorageAgent which creates an empty file, and using this
> > storage agent for leaf stateless operators.
> >
> > - Tushar.
> >
> >
> > On Tue, Mar 7, 2017 at 1:52 PM, Tushar Gosavi <tu...@datatorrent.com>
> > wrote:
> >
> > > > Thank you all for the feedback.
> > > >
> > > > Some useful output operators can be stateless: they push the data
> > > > received in a window to the output store. Examples are
> > > > KafkaOutputOperator/JDBCOutputOperator, or output stores where writes
> > > > are idempotent, which covers most key-value stores.
> > > >
> > > > I was going to use the existing logic to compute the committedWindowId,
> > > > with the addition of a few steps explained below.
> > > > solution-1
> > > > - Calculate committedWindow with leaf operator checkpoints set to the
> > > >   current timestamp (current behaviour).
> > > > - Update the leaf operators' recoveryWindowId to committedWindowId.
> > > > - Calculate committedWindow again. This step is required because the
> > > >   downstream operator's recoveryWindowId is reduced, and hence we may
> > > >   have to adjust the recoveryWindowId of upstream operators.
> > > >
> > > > This will prevent stateless leaf operators from starting at the current
> > > > timestamp, hence reducing the amount of data loss. But as per the
> > > > concern raised by Bhupesh about the last stateless operator being slow,
> > > > the solution suggested by Vlad is sufficient.
> > > >
> > > > solution-1
> > > > - As explained above. If a little loss is acceptable, we could go with
> > > >   this approach.
> > > > solution-2
> > > > - Fail validation if the last operator is stateless in the AT_LEAST_ONCE
> > > >   scenario, as suggested by Vlad.
> > > >   This could break backward compatibility, as old applications will fail
> > > >   to launch.
> > > > solution-3
> > > > - Mark the last operator stateful in the AT_LEAST_ONCE scenario.
> > > >
> > > > Let me know about your preference.
> > > >
> > > > Regards,
> > > > - Tushar.
> > >
> > >
> > > On Mon, Mar 6, 2017 at 8:31 PM, Vlad Rozov <v....@datatorrent.com>
> > > wrote:
> > >
> > > >> For a long chain of stateless operators at the end of a DAG, it is
> > > >> possible that the time to propagate the end window to a leaf operator
> > > >> is greater than the time for a checkpoint to be persisted in HDFS.
> > > >>
> > > >> If an at-least-once processing guarantee is necessary, the leaf
> > > >> operators should not be STATELESS. Will invalidating a DAG that has one
> > > >> or more leaf operators marked as STATELESS with AT_LEAST_ONCE
> > > >> processing solve APEXCORE-619? It is not the best solution, but I think
> > > >> it is sufficient for the described scenario.
> > >>
> > >> Thank you,
> > >>
> > >> Vlad
> > >>
> > >>
> > >> On 3/2/17 08:43, Thomas Weise wrote:
> > >>
> > > >>> Good point, that's correct for a stateless leaf operator (an operator
> > > >>> that does not have downstream operators). The minimum of the upstream
> > > >>> checkpoints can be higher than the last windowId seen by the leaf
> > > >>> operator, although the probability is low, because it would mean the
> > > >>> time it took for the checkpoint to become visible in HDFS is less than
> > > >>> the time to propagate endWindow downstream.
> > > >>>
> > > >>> It's also not a problem for an intermediate stateless operator,
> > > >>> because the downstream checkpoint will inform the recovery windowId.
> > > >>> Most of the time stateless operators are intermediate.
> > > >>>
> > > >>> Leaf operators are the output operators. I suspect that in the
> > > >>> original scenario it was a console output operator? Useful output
> > > >>> operators usually won't be stateless; they have to track state to
> > > >>> interact with the external system correctly. I'm bringing this up for
> > > >>> adequate cost/benefit analysis.
> > > >>>
> > > >>> In the absence of a stateful downstream operator, you only have the
> > > >>> committed windowId, which is essentially a checkpointing watermark. On
> > > >>> application restart it has to be recomputed from the checkpoints
> > > >>> available, and it does not cover the scenario Tushar reported
> > > >>> originally.
> > > >>>
> > > >>> Saving the committed windowId comes at a cost: it would have to be
> > > >>> written to the journal before operators are notified. Care has been
> > > >>> taken not to write unnecessarily to the journal, as it is blocking
> > > >>> I/O, and in this case the frequency depends on the order of arrival of
> > > >>> checkpoint notifications from operators. We also don't want to delay
> > > >>> the committedWindow notification, as that would introduce latency.
> > >>>
> > >>> Thomas
> > >>>
> > >>>
> > > >>> On Thu, Mar 2, 2017 at 2:10 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > > >>> wrote:
> > > >>>
> > > >>>> What if all operators complete their first checkpoints but the
> > > >>>> stateless operator could not cross the first checkpoint window, and
> > > >>>> the DAG crashed? If we try to figure out the recovery checkpoint now,
> > > >>>> we might conclude that checkpoint 1 is the point to start from, and we
> > > >>>> may miss some data getting processed by the stateless operator.
> > > >>>> Probably in this case at-least-once is also not guaranteed?
> > > >>>>
> > > >>>> ~ Bhupesh
> > > >>>>
> > > >>>> On Thu, Mar 2, 2017 at 8:06 AM, Thomas Weise <th...@apache.org> wrote:
> > > >>>>
> > > >>>>> Dummy checkpoints, continuously writing the committed window id and
> > > >>>>> the like all introduce overhead that is probably not needed.
> > > >>>>>
> > > >>>>> All the information to derive what we need is likely available, and
> > > >>>>> IMO the discussion should be on what is the correct way of using it.
> > > >>>>> I will have a look when I get to it as well.
> > > >>>>>
> > > >>>>> Thanks,
> > > >>>>> Thomas
> > > >>>>>
> > > >>>>> On Wed, Mar 1, 2017 at 6:29 PM, Sandesh Hegde <sandesh@datatorrent.com>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>>> Instead of treating the stateless operator in a special way and
> > > >>>>>> missing corner cases, just have a dummy checkpoint; then there is no
> > > >>>>>> need to handle corner cases.
> > > >>>>>>
> > > >>>>>> There is a name for this solution:
> > > >>>>>> https://en.wikipedia.org/wiki/Null_Object_pattern
> > > >>>>>>
> > > >>>>>> On Wed, Mar 1, 2017 at 2:52 PM Pramod Immaneni <pramod@datatorrent.com>
> > > >>>>>> wrote:
> > > >>>>>>
> > > >>>>>>> There is code in various places that deals with stateless operators
> > > >>>>>>> in a special way even though a physical checkpoint does not exist
> > > >>>>>>> on the disk. It is probably a matter of applying a similar thought
> > > >>>>>>> process/logic correctly here.
> > > >>>>>>>
> > > >>>>>>> On Wed, Mar 1, 2017 at 2:27 PM, Amol Kekre <amol@datatorrent.com>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hmm! The fact that commitWindowId has moved up (right now in the
> > > >>>>>>>> memory of Stram) should mean that a complete set of checkpoints is
> > > >>>>>>>> available, i.e. commitWindowId can be derived. Let's say that the
> > > >>>>>>>> next checkpoint window also gets checkpointed across the app, and
> > > >>>>>>>> commitWindowId is in memory but not written to stram-state yet;
> > > >>>>>>>> then upon relaunch the latest commitWindowId should get computed
> > > >>>>>>>> correctly.
> > > >>>>>>>>
> > > >>>>>>>> This may be just about setting stateless operators to
> > > >>>>>>>> commitWindowId on re-launch? aka bug/feature?
> > > >>>>>>>>
> > > >>>>>>>> Thks
> > > >>>>>>>> Amol
> > > >>>>>>>>
> > > >>>>>>>> On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni
> > > >>>>>>>> <pramod@datatorrent.com> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Do we need to save committedWindowId? Can't it be computed from
> > > >>>>>>>>> the existing checkpoints by walking through the DAG? We probably
> > > >>>>>>>>> do this anyway, and I suspect there is a minor bug somewhere in
> > > >>>>>>>>> there. If an operator is stateless you could assume its
> > > >>>>>>>>> checkpoint is long max for the sake of computation and compute
> > > >>>>>>>>> the committed window to be the lowest common checkpoint. If they
> > > >>>>>>>>> are all stateless and you end up with long max, you can start
> > > >>>>>>>>> with a window id that reflects the current timestamp.
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks
> > > >>>>>>>>>
> > > >>>>>>>>> On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre <amol@datatorrent.com>
> > > >>>>>>>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> CommitWindowId could be computed from the existing checkpoints.
> > > >>>>>>>>>> That solution still needs the purge to be done after
> > > >>>>>>>>>> commitWindowId is confirmed to be saved in Stram state. Without
> > > >>>>>>>>>> this, the commitWindowId computed from the checkpoints may have
> > > >>>>>>>>>> some checkpoints missing.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Thks
> > > >>>>>>>>>> Amol
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni
> > > >>>>>>>>>> <pramod@datatorrent.com> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Can't the committedWindowId be calculated by looking at the
> > > >>>>>>>>>>> physical plan and the existing checkpoints?
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi <tushar@apache.org>
> > > >>>>>>>>>>> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Help Needed for APEXCORE-619
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Issue: When an application is relaunched after a long time
> > > >>>>>>>>>>>> with stateless operators at the end of the DAG, the stateless
> > > >>>>>>>>>>>> operators start with a very high windowId. In this case the
> > > >>>>>>>>>>>> stateless operators ignore all the data received until the
> > > >>>>>>>>>>>> upstream operator catches up with them. This breaks the
> > > >>>>>>>>>>>> *at-least-once* guarantee on relaunch of the operator or when
> > > >>>>>>>>>>>> the master is killed and the application is restarted.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Solutions:
> > > >>>>>>>>>>>> - Fix the windowId for stateless leaf operators from the
> > > >>>>>>>>>>>>   upstream operator. But it has some issues when we have a
> > > >>>>>>>>>>>>   join with two upstream operators at different windowIds. If
> > > >>>>>>>>>>>>   we set the windowId to min(upstream windowId), then we need
> > > >>>>>>>>>>>>   to recalculate the new recovery window ids for the upstream
> > > >>>>>>>>>>>>   paths from this operator.
> > > >>>>>>>>>>>>
> > >>>>>>>>>>>> - Other solution is to create a empty file in checkpoint
> > >>>>>>>>>>>>
> > >>>>>>>>>>> directory
> > >>>>>>>
> > >>>>>>>> for
> > >>>>>>>>>
> > >>>>>>>>>> stateless operators. This will help us to identify the
> > >>>>>>>>>>>>
> > >>>>>>>>>>> checkpoints
> > >>>>>>>
> > >>>>>>>> of
> > >>>>>>>>
> > >>>>>>>>> stateless operators during relaunch instead of computing
> > >>>>>>>>>>>>
> > >>>>>>>>>>> from
> > >>>>
> > >>>>> latest
> > >>>>>>>>
> > >>>>>>>>> timestamp.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> - Bring the entire DAG to committedWindowId. This could be
> > >>>>>>>>>>>>
> > >>>>>>>>>>> achived
> > >>>>>>>
> > >>>>>>>> using
> > >>>>>>>>>>
> > >>>>>>>>>>> writing committedWindowId in a journal. we need to make
> > >>>>>>>>>>>>
> > >>>>>>>>>>> sure
> > >>>>
> > >>>>> that
> > >>>>>>
> > >>>>>>> we
> > >>>>>>>>
> > >>>>>>>>> are
> > >>>>>>>>>>
> > >>>>>>>>>>> not puring the checkpointed state until the
> > >>>>>>>>>>>>
> > >>>>>>>>>>> committedWundowId
> > >>>>
> > >>>>> is
> > >>>>>>
> > >>>>>>> saved
> > >>>>>>>>>
> > >>>>>>>>>> in
> > >>>>>>>>>>
> > >>>>>>>>>>> journal.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Let me know your thoughs on this and preferred solution.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Regards,
> > >>>>>>>>>>>> -Tushar.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> --
> > >>>>>> *Join us at Apex Big Data World-San Jose
> > >>>>>> <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
> > >>>>>> [image: http://www.apexbigdata.com/san-jose-register.html]
> > >>>>>>
> > >>>>>>
> > >>
> > >
> >
>

Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by Bhupesh Chawda <bh...@datatorrent.com>.
My preference is also for option 3. It looks clean and simple to implement.

~ Bhupesh


_______________________________________________________

Bhupesh Chawda

E: bhupesh@datatorrent.com | Twitter: @bhupeshsc

www.datatorrent.com  |  apex.apache.org



On Fri, Mar 10, 2017 at 3:06 PM, Tushar Gosavi <tu...@datatorrent.com>
wrote:

> Can you please let me know your preference? My preference is for solution
> 3, by adding a StorageAgent which creates an empty file, and using this
> storage agent for leaf stateless operators.
>
> - Tushar.
>
>

Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by Tushar Gosavi <tu...@datatorrent.com>.
Can you please let me know your preference? My preference is for solution
3, by adding a StorageAgent which creates an empty file, and using this
storage agent for leaf stateless operators.
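
A minimal sketch of this idea in Java. It does not use the real Apex
StorageAgent interface: SimpleStorageAgent below is a simplified, hypothetical
stand-in with a save/load/delete/getWindowIds shape, and the class name
EmptyFileStorageAgent and the on-disk layout (one directory per operator id,
one zero-length file per window id in hex) are assumptions made only for
illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

// Writes a zero-length marker file per checkpoint, so that the checkpoints of
// a stateless operator can be discovered on relaunch like any other operator's.
class EmptyFileStorageAgent implements SimpleStorageAgent {
  private final Path baseDir;

  EmptyFileStorageAgent(Path baseDir) {
    this.baseDir = baseDir;
  }

  // Assumed layout: <baseDir>/<operatorId>/<windowId in hex>
  private Path fileFor(int operatorId, long windowId) {
    return baseDir.resolve(String.valueOf(operatorId)).resolve(Long.toHexString(windowId));
  }

  @Override
  public void save(Object state, int operatorId, long windowId) throws IOException {
    Path file = fileFor(operatorId, windowId);
    Files.createDirectories(file.getParent());
    // The state is ignored on purpose: only the existence of the marker matters.
    Files.write(file, new byte[0]);
  }

  @Override
  public Object load(int operatorId, long windowId) throws IOException {
    if (!Files.exists(fileFor(operatorId, windowId))) {
      throw new IOException("no checkpoint marker for operator " + operatorId
          + ", window " + Long.toHexString(windowId));
    }
    return null; // stateless operator: nothing to restore
  }

  @Override
  public void delete(int operatorId, long windowId) throws IOException {
    Files.deleteIfExists(fileFor(operatorId, windowId));
  }

  @Override
  public List<Long> getWindowIds(int operatorId) throws IOException {
    Path dir = baseDir.resolve(String.valueOf(operatorId));
    List<Long> ids = new ArrayList<>();
    if (Files.isDirectory(dir)) {
      try (Stream<Path> files = Files.list(dir)) {
        files.forEach(p -> ids.add(Long.parseUnsignedLong(p.getFileName().toString(), 16)));
      }
    }
    Collections.sort(ids);
    return ids;
  }

  public static void main(String[] args) throws IOException {
    EmptyFileStorageAgent agent = new EmptyFileStorageAgent(Files.createTempDirectory("ckpt"));
    agent.save("ignored", 7, 42L);
    System.out.println("windows for operator 7: " + agent.getWindowIds(7));
  }
}

// Hypothetical, simplified stand-in for a checkpoint storage agent interface.
interface SimpleStorageAgent {
  void save(Object state, int operatorId, long windowId) throws IOException;

  Object load(int operatorId, long windowId) throws IOException;

  void delete(int operatorId, long windowId) throws IOException;

  List<Long> getWindowIds(int operatorId) throws IOException;
}
```

With markers like these on disk, the relaunch logic could read the window ids
of a stateless leaf operator from the checkpoint directory instead of deriving
them from the latest timestamp.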

- Tushar.


On Tue, Mar 7, 2017 at 1:52 PM, Tushar Gosavi <tu...@datatorrent.com>
wrote:

> Thank you all for the feedback.
>
> Some useful output operators can be stateless: they simply push the data
> received in a window to the output store. Examples are
> KafkaOutputOperator/JDBCOutputOperator, or operators writing to output stores
> where writes are idempotent, which covers most key-value stores.
>
> I was going to use the existing logic to compute the committedWindowId,
> with the addition of a few steps explained below.
> solution-1
> - Calculate committedWindow with leaf operator checkpoints set to the current
> timestamp (current behaviour).
> - Update the leaf operators' recoveryWindowId to committedWindowId.
> - Calculate committedWindow again. This step is required because the
> downstream operators' recoveryWindowId has been reduced, and hence we may
> have to adjust the recoveryWindowId of upstream operators.
>
> This will prevent stateless leaf operators from starting at the current
> timestamp, hence reducing the amount of data loss. But it does not fully
> address the concern raised by Bhupesh about the last stateless operator
> being slow; for that case, the solution suggested by Vlad is sufficient.
>
> solution-1
> - As explained above. If a little loss is acceptable we could go with this
> approach.
> solution-2
> - Fail validation if the last operator is stateless in the AT_LEAST_ONCE
> scenario, as suggested by Vlad.
>   This could break backward compatibility, as old applications will fail to
> launch.
> solution-3
> - Mark the last operator stateful in the AT_LEAST_ONCE scenario.
>
> Let me know about your preference.
>
> Regards,
> - Tushar.
>
>

Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by Tushar Gosavi <tu...@datatorrent.com>.
Thank you all for the feedback.

Some useful output operators can be stateless: they simply push the data
received in a window to the output store. Examples are
KafkaOutputOperator/JDBCOutputOperator, or operators writing to output stores
where writes are idempotent, which covers most key-value stores.

I was going to use the existing logic to compute the committedWindowId, with
the addition of a few steps explained below.
solution-1
- Calculate committedWindow with leaf operator checkpoints set to the current
timestamp (current behaviour).
- Update the leaf operators' recoveryWindowId to committedWindowId.
- Calculate committedWindow again. This step is required because the
downstream operators' recoveryWindowId has been reduced, and hence we may
have to adjust the recoveryWindowId of upstream operators.
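
The three steps can be sketched in Java as below. This is a toy model under
stated assumptions, not the Stram code: the Op class, the method names, the
small integer window ids and INITIAL_WINDOW are all hypothetical, and an
operator's recovery window is taken to be its latest checkpoint that is not
newer than the recovery window of any downstream operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RecoveryWindowSketch {
  // Assumed id meaning "no usable checkpoint, start from the beginning".
  static final long INITIAL_WINDOW = 0L;

  // Hypothetical operator model: checkpoints are window ids; none means stateless.
  static final class Op {
    final String name;
    final long[] checkpoints;
    final List<Op> downstream = new ArrayList<>();
    long recoveryWindowId = INITIAL_WINDOW;

    Op(String name, long... checkpoints) {
      this.name = name;
      this.checkpoints = checkpoints;
    }

    boolean isStatelessLeaf() {
      return checkpoints.length == 0 && downstream.isEmpty();
    }
  }

  // Latest checkpoint <= limit; a stateless operator just follows the limit.
  static long latestAtOrBefore(Op op, long limit) {
    if (op.checkpoints.length == 0) {
      return limit;
    }
    long best = INITIAL_WINDOW;
    for (long c : op.checkpoints) {
      if (c <= limit) {
        best = Math.max(best, c);
      }
    }
    return best;
  }

  // Assigns recovery windows depth-first; stateless leaves get the given window.
  static long assign(Op op, long statelessLeafWindow) {
    if (op.isStatelessLeaf()) {
      op.recoveryWindowId = statelessLeafWindow;
      return op.recoveryWindowId;
    }
    long limit = Long.MAX_VALUE;
    for (Op d : op.downstream) {
      limit = Math.min(limit, assign(d, statelessLeafWindow));
    }
    op.recoveryWindowId = latestAtOrBefore(op, limit);
    return op.recoveryWindowId;
  }

  static long committed(List<Op> all) {
    long min = Long.MAX_VALUE;
    for (Op op : all) {
      min = Math.min(min, op.recoveryWindowId);
    }
    return min;
  }

  static long recover(List<Op> roots, List<Op> all, long currentWindow) {
    // Step 1: stateless leaves at the current timestamp (current behaviour).
    for (Op r : roots) {
      assign(r, currentWindow);
    }
    long firstPass = committed(all);
    // Steps 2 and 3: move the leaves to the committed window and recompute,
    // which also pulls back upstream operators that were ahead of it.
    for (Op r : roots) {
      assign(r, firstPass);
    }
    return committed(all);
  }

  public static void main(String[] args) {
    Op fast = new Op("fast", 10, 20, 30);
    Op slow = new Op("slow", 10);
    Op sink = new Op("sink"); // stateless leaf fed by both branches
    fast.downstream.add(sink);
    slow.downstream.add(sink);
    List<Op> all = Arrays.asList(fast, slow, sink);
    long committed = recover(Arrays.asList(fast, slow), all, 1000L);
    System.out.println("committed=" + committed + " fast=" + fast.recoveryWindowId
        + " sink=" + sink.recoveryWindowId);
  }
}
```

In the example a stateless sink joins a branch checkpointed at 10, 20 and 30
with a branch checkpointed only at 10. After the first pass the sink sits at
1000 and the fast branch at 30, with a committed window of 10. The second pass
moves the sink to 10, and the fast branch has to fall back from 30 to 10,
which is the upstream adjustment the third step refers to.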

This will prevent stateless leaf operators from starting at the current
timestamp, hence reducing the amount of data loss. But it does not fully
address the concern raised by Bhupesh about the last stateless operator being
slow; for that case, the solution suggested by Vlad is sufficient.

solution-1
- As explained above. If a little loss is acceptable we could go with this
approach.
solution-2
- Fail validation if the last operator is stateless in the AT_LEAST_ONCE
scenario, as suggested by Vlad.
  This could break backward compatibility, as old applications will fail to
launch.
solution-3
- Mark the last operator stateful in the AT_LEAST_ONCE scenario.

Let me know about your preference.

Regards,
- Tushar.


On Mon, Mar 6, 2017 at 8:31 PM, Vlad Rozov <v....@datatorrent.com> wrote:

> For a long chain of stateless operators at the end of a DAG, it is
> possible that the time to propagate the end window to a leaf operator is
> greater than the time for a checkpoint to be persisted in HDFS.
>
> If an at-least-once processing guarantee is necessary, the leaf operators
> should not be STATELESS. Will invalidating a DAG that has one or more leaf
> operators marked as STATELESS with AT_LEAST_ONCE processing solve
> APEXCORE-619? It is not the best solution, but I think it is sufficient for
> the described scenario.
>
> Thank you,
>
> Vlad

Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by Vlad Rozov <v....@datatorrent.com>.
For a long chain of stateless operators at the end of a DAG, it is
possible that the time to propagate the end window to a leaf operator is
greater than the time for a checkpoint to be persisted in HDFS.

If an at-least-once processing guarantee is necessary, the leaf operators
should not be STATELESS. Will invalidating a DAG that has one or more leaf
operators marked as STATELESS with AT_LEAST_ONCE processing solve
APEXCORE-619? It is not the best solution, but I think it is sufficient
for the described scenario.
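
A rough sketch of such a validation check, in Python for brevity (Apex
itself is Java). The function name validate_leaf_operators, the dict-based
DAG and the way the stateless flag and the processing mode are passed in are
all hypothetical, chosen only to illustrate the rule; they are not the Apex
validation code.

```python
def validate_leaf_operators(downstream, stateless, processing_mode):
    """Raise ValueError if a stateless leaf runs with AT_LEAST_ONCE.

    downstream:      operator -> list of downstream operators
    stateless:       set of operators marked as stateless
    processing_mode: operator -> processing mode name
    """
    offenders = sorted(
        op for op, outs in downstream.items()
        if not outs and op in stateless
        and processing_mode[op] == "AT_LEAST_ONCE"
    )
    if offenders:
        raise ValueError("stateless leaf operators with AT_LEAST_ONCE: "
                         + ", ".join(offenders))


# Hypothetical two-operator DAG; the leaf is stateful here, so this passes.
dag = {"input": ["console"], "console": []}
modes = {"input": "AT_LEAST_ONCE", "console": "AT_LEAST_ONCE"}
validate_leaf_operators(dag, stateless=set(), processing_mode=modes)
```

Marking "console" as stateless in the same call would raise, which is the
launch failure that breaks backward compatibility for existing applications.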

Thank you,

Vlad

On 3/2/17 08:43, Thomas Weise wrote:
> Good point, that's correct for a stateless leaf operator (an operator that
> does not have downstream operators). The minimum of upstream checkpoints
> can be higher than the last windowId seen by the leaf operator, although
> that has a low probability, because it would mean the time it took for the
> checkpoint to become visible in HDFS is less than the propagation time of
> endWindow downstream.
>
> It's also not a problem for an intermediate stateless operator, because the
> downstream checkpoint will inform the recovery windowId. Most of the time
> stateless operators are intermediate.
>
> Leaf operators are the output operators. I suspect in the original scenario
> it was a console output operator? Useful output operators usually won't be
> stateless; they have to track state to interact with the external system
> correctly. I'm bringing this up for adequate cost/benefit analysis.
>
> In the absence of a stateful downstream operator, you only have the committed
> windowId, which is essentially a checkpointing watermark. On application
> restart it has to be recomputed from the checkpoints available, and does
> not cover the scenario Tushar reported originally.
>
> Saving the committed windowId comes at a cost: it would have to be written to
> the journal before operators are notified. Care has been taken not to write
> unnecessarily to the journal, as it is blocking I/O and in this case the
> frequency depends on the order of arrival of checkpoint notifications from
> operators. We also don't want to delay the committedWindow notification, as
> that would introduce latency.
>
> Thomas


Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by Thomas Weise <th...@apache.org>.
Good point, that's correct for a stateless leaf operator (an operator that
does not have downstream operators). The minimum of upstream checkpoints
can be higher than the last windowId seen by the leaf operator, although
that has a low probability, because it would mean the time it took for the
checkpoint to become visible in HDFS is less than the propagation time of
endWindow downstream.

It's also not a problem for an intermediate stateless operator, because the
downstream checkpoint will inform the recovery windowId. Most of the time
stateless operators are intermediate.

Leaf operators are the output operators. I suspect in the original scenario
it was a console output operator? Useful output operators usually won't be
stateless; they have to track state to interact with the external system
correctly. I'm bringing this up for adequate cost/benefit analysis.

In the absence of a stateful downstream operator, you only have the committed
windowId, which is essentially a checkpointing watermark. On application
restart it has to be recomputed from the checkpoints available, and does
not cover the scenario Tushar reported originally.

Saving the committed windowId comes at a cost: it would have to be written to
the journal before operators are notified. Care has been taken not to write
unnecessarily to the journal, as it is blocking I/O and in this case the
frequency depends on the order of arrival of checkpoint notifications from
operators. We also don't want to delay the committedWindow notification, as
that would introduce latency.
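
To make the ordering constraint concrete, here is a small sketch in Python
(Apex itself is Java). The class CommittedWindowTracker, its in-memory list
standing in for the journal, and the notify callback are hypothetical and
only illustrate "journal first, notify second"; they do not reflect the
actual Stram implementation.

```python
class CommittedWindowTracker:
    """Journal the committed windowId before notifying operators."""

    def __init__(self):
        self.journal = []  # stand-in for the durable, blocking journal

    def advance(self, window_id, notify):
        # Skip writes that would not move the committed window forward.
        if self.journal and window_id <= self.journal[-1]:
            return False
        self.journal.append(window_id)  # blocking I/O in a real system
        notify(window_id)               # operators hear about it afterwards
        return True

    def purge_limit(self):
        # Checkpoints may only be purged up to the journaled value.
        return self.journal[-1] if self.journal else None


events = []
tracker = CommittedWindowTracker()
tracker.advance(20, lambda w: events.append(("notified", w, list(tracker.journal))))
print(events, tracker.purge_limit())
```

Every call that advances the window pays for one journal write before the
notification goes out, which is the added latency described above.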

Thomas


On Thu, Mar 2, 2017 at 2:10 AM, Bhupesh Chawda <bh...@datatorrent.com>
wrote:

> What if all operators complete their first checkpoints, but the stateless
> operator could not cross the first checkpoint window, and the DAG crashed?
> If we try to figure out the recovery checkpoint now, we might conclude that
> checkpoint 1 is the point to start from, and some data may never get
> processed by the stateless operator. Probably at-least-once is also not
> guaranteed in this case?
>
> ~ Bhupesh

Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by Bhupesh Chawda <bh...@datatorrent.com>.
What if all operators complete their first checkpoints, but the stateless
operator could not cross the first checkpoint window, and the DAG crashed?
If we try to figure out the recovery checkpoint now, we might conclude that
checkpoint 1 is the point to start from, and some data may never get
processed by the stateless operator. Probably at-least-once is also not
guaranteed in this case?
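
A tiny worked example of this scenario, in Python. The function name
windows_missed_by_leaf and the window numbers are made up, and the sketch
assumes that upstream operators resume from the window right after the
recovery checkpoint, so earlier windows are not replayed.

```python
def windows_missed_by_leaf(recovery_window, leaf_last_window):
    """Windows the stateless leaf never processed before or after the crash.

    recovery_window:  checkpoint window the DAG is restored to
    leaf_last_window: last window the leaf fully processed before the crash
    """
    return list(range(leaf_last_window + 1, recovery_window + 1))


# Stateful operators checkpointed window 60; the leaf only reached window 57.
print(windows_missed_by_leaf(recovery_window=60, leaf_last_window=57))
```

If the leaf has already seen the checkpoint window the list is empty, which
is the common case when endWindow propagation is faster than the checkpoint
write.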

~ Bhupesh


_______________________________________________________

Bhupesh Chawda

E: bhupesh@datatorrent.com | Twitter: @bhupeshsc

www.datatorrent.com  |  apex.apache.org



On Thu, Mar 2, 2017 at 8:06 AM, Thomas Weise <th...@apache.org> wrote:

> Dummy checkpoints, continuously writing committed window id and the like
> all introduce overhead that is probably not needed.
>
> All the information to derive what we need is likely available and IMO the
> discussion should be on what is the correct way of using it. I will have a
> look when I get to it as well.
>
> Thanks,
> Thomas
>
>
> On Wed, Mar 1, 2017 at 6:29 PM, Sandesh Hegde <sa...@datatorrent.com>
> wrote:
>
> > Instead of treating the stateless operator in a special way and missing
> > corner cases, just have a dummy checkpoint, then there is no need to
> > handle corner cases.
> >
> > There is a name for this solution,
> > https://en.wikipedia.org/wiki/Null_Object_pattern
> >
> >
> >
> > On Wed, Mar 1, 2017 at 2:52 PM Pramod Immaneni <pr...@datatorrent.com>
> > wrote:
> >
> > > There is code in various places that deals with stateless operators in
> > > a special way even though a physical checkpoint does not exist on the
> > > disk. It is probably a matter of applying similar thought process/logic
> > > correctly here.
> > >
> > > On Wed, Mar 1, 2017 at 2:27 PM, Amol Kekre <am...@datatorrent.com> wrote:
> > >
> > > > Hmm! The fact that commitWindowId has moved up (right now in the memory
> > > > of Stram) should mean that a complete set of checkpoints is available,
> > > > i.e. commitWindowId can be derived. Let's say that the next checkpoint
> > > > window also gets checkpointed across the app, and commitWindowId is in
> > > > memory but not written to stram-state yet; then upon relaunch the latest
> > > > commitWindowId should get computed correctly.
> > > >
> > > > This may be just about setting stateless operators to commitWindowId on
> > > > re-launch? aka bug/feature?
> > > >
> > > > Thks
> > > > Amol
> > > >
> > > >
> > > >
> > > > E:amol@datatorrent.com | M: 510-449-2606 <(510)%20449-2606> |
> Twitter:
> > > @*amolhkekre*
> > > >
> > > > www.datatorrent.com  |  apex.apache.org
> > > >
> > > > *Join us at Apex Big Data World-San Jose
> > > > <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
> > > > [image: http://www.apexbigdata.com/san-jose-register.html]
> > > > <http://www.apexbigdata.com/san-jose-register.html>
> > > >
> > > > On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni <
> > pramod@datatorrent.com>
> > > > wrote:
> > > >
> > > > > Do we need to save committedWindowId? Can't it be computed from
> > > existing
> > > > > checkpoints by walking through the DAG. We probably do this anyway
> > and
> > > I
> > > > > suspect there is a minor bug somewhere in there. If an operator is
> > > > > stateless you could assume checkpoint as long max for sake of
> > > computation
> > > > > and compute the committed window to be the lowest common
> checkpoint.
> > If
> > > > > they are all stateless and you end up with long max you can start
> > with
> > > > > window id that reflects the current timestamp.
> > > > >
> > > > > Thanks
> > > > >
> > > > > On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre <am...@datatorrent.com>
> > > wrote:
> > > > >
> > > > > > CommitWindowId could be computed from the existing checkpoints.
> > That
> > > > > > solution still needs purge to be done after commitWindowId is
> > > confirmed
> > > > > to
> > > > > > be saved in Stram state. Without ths the commitWindowId computed
> > from
> > > > the
> > > > > > checkpoints may have some checkpoints missing.
> > > > > >
> > > > > > Thks
> > > > > > Amol
> > > > > >
> > > > > >
> > > > > >
> > > > > > E:amol@datatorrent.com | M: 510-449-2606 <(510)%20449-2606> |
> > > Twitter: @*amolhkekre*
> > > > > >
> > > > > > www.datatorrent.com  |  apex.apache.org
> > > > > >
> > > > > > *Join us at Apex Big Data World-San Jose
> > > > > > <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
> > > > > > [image: http://www.apexbigdata.com/san-jose-register.html]
> > > > > > <http://www.apexbigdata.com/san-jose-register.html>
> > > > > >
> > > > > > On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni <
> > > > pramod@datatorrent.com
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Can't the commitedWindowId be calculated by looking at the
> > physical
> > > > > plan
> > > > > > > and the existing checkpoints?
> > > > > > >
> > > > > > > On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi <
> tushar@apache.org
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Help Needed for APEXCORE-619
> > > > > > > >
> > > > > > > > Issue : When application is relaunched after long time with
> > > > stateless
> > > > > > > > opeartors at the end of the DAG, the stateless operators
> starts
> > > > with
> > > > > a
> > > > > > > very
> > > > > > > > high windowId. In this case the stateless operator ignors all
> > the
> > > > > data
> > > > > > > > received till upstream operator catches up with it. This
> breaks
> > > the
> > > > > > > > *at-least-once* gaurantee while relaunch of the opeartor or
> > when
> > > > > master
> > > > > > > is
> > > > > > > > killed and application is restarted.
> > > > > > > >
> > > > > > > > Solutions:
> > > > > > > > - Fix windowId for stateless leaf operators from upstream
> > > opeartor.
> > > > > But
> > > > > > > it
> > > > > > > > has some issues when we have a join with two upstrams
> operators
> > > at
> > > > > > > > different windowId. If we set the windowID to min(upstream
> > > > windowId),
> > > > > > > then
> > > > > > > > we need to again recalulate the new recovery window ids for
> > > > upstream
> > > > > > > paths
> > > > > > > > from this operators.
> > > > > > > >
> > > > > > > > - Other solution is to create a empty file in checkpoint
> > > directory
> > > > > for
> > > > > > > > stateless operators. This will help us to identify the
> > > checkpoints
> > > > of
> > > > > > > > stateless operators during relaunch instead of computing from
> > > > latest
> > > > > > > > timestamp.
> > > > > > > >
> > > > > > > > - Bring the entire DAG to committedWindowId. This could be
> > > achived
> > > > > > using
> > > > > > > > writing committedWindowId in a journal. we need to make sure
> > that
> > > > we
> > > > > > are
> > > > > > > > not puring the checkpointed state until the committedWundowId
> > is
> > > > > saved
> > > > > > in
> > > > > > > > journal.
> > > > > > > >
> > > > > > > > Let me know your thoughs on this and preferred solution.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > -Tushar.
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > --
> > *Join us at Apex Big Data World-San Jose
> > <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
> > [image: http://www.apexbigdata.com/san-jose-register.html]
> >
>

Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by Thomas Weise <th...@apache.org>.
Dummy checkpoints, continuously writing committed window id and the like
all introduce overhead that is probably not needed.

All the information to derive what we need is likely available and IMO the
discussion should be on what is the correct way of using it. I will have a
look when I get to it as well.

Thanks,
Thomas


On Wed, Mar 1, 2017 at 6:29 PM, Sandesh Hegde <sa...@datatorrent.com>
wrote:

> Instead of treating the stateless operator in a special way and missing
> corner cases, just have a dummy checkpoint, then there is no need to handle
> corner cases.
>
> There is a name for this solution,
> https://en.wikipedia.org/wiki/Null_Object_pattern

Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by Sandesh Hegde <sa...@datatorrent.com>.
Instead of treating the stateless operator in a special way and missing
corner cases, just have a dummy checkpoint, then there is no need to handle
corner cases.

There is a name for this solution,
https://en.wikipedia.org/wiki/Null_Object_pattern
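
To illustrate the dummy-checkpoint idea above, here is a minimal sketch. The
names CheckpointSketch, StatefulCheckpoint, EmptyCheckpoint and RecoverySketch
are hypothetical and are not Apex classes. The only point is that once a
stateless operator also reports a checkpoint window, recovery code can treat
every operator the same way.

```java
import java.util.List;

/** Hypothetical checkpoint abstraction; not an Apex interface. */
interface CheckpointSketch {
    long windowId();
    byte[] state();
}

/** Checkpoint of a stateful operator: window id plus serialized state. */
final class StatefulCheckpoint implements CheckpointSketch {
    private final long windowId;
    private final byte[] state;

    StatefulCheckpoint(long windowId, byte[] state) {
        this.windowId = windowId;
        this.state = state.clone();
    }

    @Override public long windowId() { return windowId; }
    @Override public byte[] state() { return state.clone(); }
}

/** Null object for a stateless operator: a window id and no state. */
final class EmptyCheckpoint implements CheckpointSketch {
    private final long windowId;

    EmptyCheckpoint(long windowId) { this.windowId = windowId; }

    @Override public long windowId() { return windowId; }
    @Override public byte[] state() { return new byte[0]; }
}

final class RecoverySketch {
    private RecoverySketch() {}

    /** Lowest window id among the latest checkpoints, with no stateless special case. */
    static long recoveryWindowId(List<CheckpointSketch> latestCheckpoints) {
        if (latestCheckpoints.isEmpty()) {
            throw new IllegalArgumentException("no checkpoints to recover from");
        }
        long min = Long.MAX_VALUE;
        for (CheckpointSketch checkpoint : latestCheckpoints) {
            min = Math.min(min, checkpoint.windowId());
        }
        return min;
    }
}
```

With a stateful operator checkpointed at window 120 and a stateless one with an
empty checkpoint at window 100, recoveryWindowId would return 100 without any
branch on operator type.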



On Wed, Mar 1, 2017 at 2:52 PM Pramod Immaneni <pr...@datatorrent.com>
wrote:

> There is code in various places that deals with stateless operators in a
> special way even though a physical checkpoint does not exist on the disk.
> It is probably a matter of applying similar thought process/logic correctly
> here.
-- 
*Join us at Apex Big Data World-San Jose
<http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
[image: http://www.apexbigdata.com/san-jose-register.html]

Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by Pramod Immaneni <pr...@datatorrent.com>.
There is code in various places that deals with stateless operators in a
special way even though a physical checkpoint does not exist on the disk.
It is probably a matter of applying similar thought process/logic correctly
here.

On Wed, Mar 1, 2017 at 2:27 PM, Amol Kekre <am...@datatorrent.com> wrote:

> Hmm! The fact that commitWindowId has moved up (right now in memory of
> Stram) should mean that a complete set of checkpoints is available, i.e.
> commitWindowId can be derived. Let's say that the next checkpoint window also
> gets checkpointed across the app and commitWindowId is in memory but not
> yet written to stram-state; then upon relaunch the latest commitWindowId
> should get computed correctly.
>
> This may be just about setting stateless operators to commitWindowId on
> relaunch? aka bug/feature?
>
> Thks
> Amol

Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by Amol Kekre <am...@datatorrent.com>.
Hmm! The fact that commitWindowId has moved up (right now in memory of
Stram) should mean that a complete set of checkpoints is available, i.e.
commitWindowId can be derived. Let's say that the next checkpoint window also
gets checkpointed across the app and commitWindowId is in memory but not
yet written to stram-state; then upon relaunch the latest commitWindowId
should get computed correctly.

This may be just about setting stateless operators to commitWindowId on
relaunch? aka bug/feature?

Thks
Amol



E:amol@datatorrent.com | M: 510-449-2606 | Twitter: @*amolhkekre*

www.datatorrent.com  |  apex.apache.org

On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni <pr...@datatorrent.com>
wrote:

> Do we need to save committedWindowId? Can't it be computed from existing
> checkpoints by walking through the DAG? We probably do this anyway, and I
> suspect there is a minor bug somewhere in there. If an operator is
> stateless, you could assume its checkpoint is long max for the sake of
> computation and compute the committed window to be the lowest common
> checkpoint. If they are all stateless and you end up with long max, you can
> start with a window id that reflects the current timestamp.
>
> Thanks

Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by Pramod Immaneni <pr...@datatorrent.com>.
Do we need to save committedWindowId? Can't it be computed from existing
checkpoints by walking through the DAG? We probably do this anyway, and I
suspect there is a minor bug somewhere in there. If an operator is
stateless, you could assume its checkpoint is long max for the sake of
computation and compute the committed window to be the lowest common
checkpoint. If they are all stateless and you end up with long max, you can
start with a window id that reflects the current timestamp.
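
A rough sketch of that computation, for illustration only. The class and
method names are hypothetical and not taken from the Apex code base. It also
simplifies in two ways that should be read as assumptions: it takes a flat map
of operators instead of walking the DAG, and it uses the minimum of each
operator's latest checkpoint, which presumes every operator still holds a
checkpoint at that window.

```java
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical helper; not part of the Apex code base. */
final class CommittedWindowSketch {
    private CommittedWindowSketch() {}

    /**
     * @param latestCheckpoint operator name to latest checkpoint window id;
     *        OptionalLong.empty() marks a stateless operator with no checkpoint on disk
     * @param currentTimeWindowId window id derived from the current timestamp,
     *        supplied by the caller
     */
    static long committedWindowId(Map<String, OptionalLong> latestCheckpoint,
                                  long currentTimeWindowId) {
        long committed = Long.MAX_VALUE;
        for (OptionalLong checkpoint : latestCheckpoint.values()) {
            // A stateless operator counts as Long.MAX_VALUE, so it never lowers the minimum.
            committed = Math.min(committed, checkpoint.orElse(Long.MAX_VALUE));
        }
        // Every operator was stateless: fall back to a window id based on the current time.
        return committed == Long.MAX_VALUE ? currentTimeWindowId : committed;
    }
}
```

The stateless leaf operator would then be started at the returned window id
rather than at a window id derived from the relaunch time.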

Thanks

On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre <am...@datatorrent.com> wrote:

> commitWindowId could be computed from the existing checkpoints. That
> solution still needs the purge to be done only after commitWindowId is
> confirmed to be saved in Stram state. Without this, the commitWindowId
> computed from the checkpoints may have some checkpoints missing.
>
> Thks
> Amol

Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by Amol Kekre <am...@datatorrent.com>.
commitWindowId could be computed from the existing checkpoints. That
solution still needs the purge to be done only after commitWindowId is
confirmed to be saved in Stram state. Without this, the commitWindowId
computed from the checkpoints may have some checkpoints missing.
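
A small sketch of that ordering constraint, under stated assumptions: the
journal and the checkpoint store are in-memory stand-ins, and
PurgeOrderingSketch is a hypothetical name, not an Apex class. It only shows
the order of the two steps, saving the committed window id first and purging
afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.TreeSet;

/** Hypothetical in-memory model; the journal and checkpoint store are stand-ins. */
final class PurgeOrderingSketch {
    private final List<Long> journal = new ArrayList<>();
    private final TreeSet<Long> checkpoints = new TreeSet<>();

    void addCheckpoint(long windowId) {
        checkpoints.add(windowId);
    }

    void commit(long committedWindowId) {
        // Step 1: record the committed window id before touching any checkpoint.
        journal.add(committedWindowId);
        // Step 2: only then purge checkpoints strictly older than the committed window.
        checkpoints.headSet(committedWindowId, false).clear();
    }

    /** Last committed window id that was saved, if any. */
    OptionalLong recoveryWindowId() {
        return journal.isEmpty()
            ? OptionalLong.empty()
            : OptionalLong.of(journal.get(journal.size() - 1));
    }

    boolean hasCheckpoint(long windowId) {
        return checkpoints.contains(windowId);
    }
}
```

If the steps were reversed and the process died between them, the checkpoints
needed to recompute the previous committed window could already be gone.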

Thks
Amol



E:amol@datatorrent.com | M: 510-449-2606 | Twitter: @*amolhkekre*

www.datatorrent.com  |  apex.apache.org

On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni <pr...@datatorrent.com>
wrote:

> Can't the committedWindowId be calculated by looking at the physical plan
> and the existing checkpoints?

Re: APEXCORE-619 Recovery windowId in future during application relaunch.

Posted by Pramod Immaneni <pr...@datatorrent.com>.
Can't the committedWindowId be calculated by looking at the physical plan
and the existing checkpoints?

On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi <tu...@apache.org> wrote:

> Help Needed for APEXCORE-619
>
> Issue: When an application is relaunched after a long time with stateless
> operators at the end of the DAG, the stateless operators start with a very
> high windowId. In this case the stateless operator ignores all the data
> received until the upstream operator catches up with it. This breaks the
> *at-least-once* guarantee on relaunch of the operator, or when the master is
> killed and the application is restarted.
>
> Solutions:
> - Fix the windowId for stateless leaf operators from the upstream operator.
> This has some issues when we have a join with two upstream operators at
> different windowIds. If we set the windowId to min(upstream windowId), then
> we need to recalculate the new recovery window ids for the upstream paths
> from this operator.
>
> - Another solution is to create an empty file in the checkpoint directory for
> stateless operators. This will help us identify the checkpoints of
> stateless operators during relaunch instead of computing them from the
> latest timestamp.
>
> - Bring the entire DAG to committedWindowId. This could be achieved by
> writing committedWindowId to a journal. We need to make sure that we are
> not purging the checkpointed state until the committedWindowId is saved in
> the journal.
>
> Let me know your thoughts on this and your preferred solution.
>
> Regards,
> -Tushar.
>