Posted to dev@apex.apache.org by David Yan <da...@datatorrent.com> on 2015/09/16 22:44:26 UTC

Supporting iterations in Apex

Hi all,

One current disadvantage of Apex is the inability to do iterations and
machine learning algorithms because we don't allow loops in the application
DAG (hence the name DAG).  I am proposing that we allow loops in the DAG if
the loop advances the window ID by a configured amount.  A JIRA ticket has
been created:

https://malhar.atlassian.net/browse/APEX-60

I have started this work in my fork at
https://github.com/davidyan74/incubator-apex-core/tree/APEX-60.

The current progress is that a simple test case works.  Major work still
needs to be done with respect to recovery and partitioning.

ITERATION_WINDOW_COUNT is an attribute on an input port of an
operator.  If the value of the attribute is greater than or equal to 1, any
tuples sent to the input port are treated as if they belonged to a window
ITERATION_WINDOW_COUNT windows ahead of their actual window.

For recovery, we will need to checkpoint all the tuples sent to ports with
this attribute so that the looped tuples can be replayed.  During recovery,
if an operator that has an input port with ITERATION_WINDOW_COUNT=2 is
recovering from checkpoint window 14, the tuples for that input port from
window 13 and window 14 need to be replayed and treated as window 15 and
window 16 respectively (13+2 and 14+2).
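
The window arithmetic in the recovery example can be sketched in a few lines
of Python.  This is only an illustrative model, not Apex code: the function
names are hypothetical, and window IDs are treated as plain consecutive
integers, which real Apex window IDs may not be.

```python
from typing import List, Tuple


def effective_window_id(actual_window_id: int, iteration_window_count: int) -> int:
    """Window ID under which a looped tuple is treated at the input port."""
    if iteration_window_count < 1:
        raise ValueError("iteration_window_count must be >= 1 on a loop port")
    return actual_window_id + iteration_window_count


def windows_to_replay(checkpoint_window_id: int,
                      iteration_window_count: int) -> List[Tuple[int, int]]:
    """Pairs of (stored window, window it is replayed as) after recovery.

    Assumes the last `iteration_window_count` windows up to and including
    the checkpoint window are the ones that must be replayed.
    """
    first = checkpoint_window_id - iteration_window_count + 1
    return [(w, effective_window_id(w, iteration_window_count))
            for w in range(first, checkpoint_window_id + 1)]
```

With a checkpoint at window 14 and a count of 2, `windows_to_replay(14, 2)`
yields the pairs (13, 15) and (14, 16), matching the example.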

In other words, we need to keep all the tuples from the window with ID
committedWindowId minus ITERATION_WINDOW_COUNT onward for recovery, and
purge the tuples earlier than that window.
We can optimize this by only storing the tuples for the
ITERATION_WINDOW_COUNT windows prior to any checkpoint.
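
A minimal in-memory sketch of that retention rule, in Python for brevity.
The class and method names are hypothetical and are not the
IdempotentStorageManager API; the cutoff follows the wording above literally
(keep everything from committedWindowId minus ITERATION_WINDOW_COUNT onward).

```python
from typing import Any, Dict, List


class LoopTupleStore:
    """Hypothetical stand-in for the storage of looped tuples."""

    def __init__(self, iteration_window_count: int) -> None:
        if iteration_window_count < 1:
            raise ValueError("iteration_window_count must be >= 1")
        self.iteration_window_count = iteration_window_count
        self._tuples_by_window: Dict[int, List[Any]] = {}

    def store(self, window_id: int, tup: Any) -> None:
        """Remember a tuple under the window in which it was emitted."""
        self._tuples_by_window.setdefault(window_id, []).append(tup)

    def committed(self, committed_window_id: int) -> None:
        """Purge tuples from windows earlier than the cutoff window."""
        cutoff = committed_window_id - self.iteration_window_count
        for window_id in [w for w in self._tuples_by_window if w < cutoff]:
            del self._tuples_by_window[window_id]

    def stored_windows(self) -> List[int]:
        """Window IDs that still have tuples retained, in ascending order."""
        return sorted(self._tuples_by_window)
```

For example, after storing tuples for windows 10 through 16 with a count of
2, calling `committed(14)` leaves windows 12 through 16 in the store.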

For that, we need a storage mechanism for the tuples.  Chandni already has
something that fits this use case in Apex Malhar.  The class is
IdempotentStorageManager.  In order for this to be used in Apex core, we
need to deprecate the class in Apex Malhar and move it to Apex Core.

A JIRA ticket has been created for this particular work:

https://malhar.atlassian.net/browse/APEX-128

Some of the above has been discussed among Thomas, Chetan, Chandni, and
myself.

For partitioning, we have not started any discussion or brainstorming.  We
appreciate any feedback on this and any other aspect related to supporting
iterations in general.

Thanks!

David

Re: Supporting iterations in Apex

Posted by Chetan Narsude <ch...@datatorrent.com>.
Thanks Ram, couldn't say it better.


Either way is good but let's make sure that we do not mix and match.

--
Chetan




Re: Supporting iterations in Apex

Posted by Munagala Ramanath <ra...@datatorrent.com>.
Both are right: It's a difference between an operator-centric view and a
window-centric view:

A is 5 windows behind B but _window_ 10 is 5 windows ahead of window 15!

Ram


Re: Supporting iterations in Apex

Posted by David Yan <da...@datatorrent.com>.
Chetan,

Not important but with respect to "ahead window" terminology, when operator
A is processing window 10 and operator B is processing 15, wouldn't you say
operator A is 5 windows *behind* B?

David

On Wed, Sep 16, 2015 at 2:21 PM, Chetan Narsude <ch...@datatorrent.com>
wrote:

> David,
>
> I have 3 comments:
>
> 1. The "ahead window" phrase you discussed above is really "behind window".
> With Apex, the windows which are ahead are the windows with smaller window
> IDs. Smaller window IDs are followed by bigger window IDs.
>
> 2. ITERATION_WINDOW_COUNT sounds like a misnomer. IMO, it should be
> something akin to DELAY_BY_WINDOW_COUNT, as you are delaying the events by
> that many windows. You are not iterating over them that many times. It also
> resonates with PortContext.SLIDE_BY_WINDOW_COUNT.
>
> 3. Deduper has a similar requirement, where a large amount of data
> (potentially even larger) needs to be partitioned. You can borrow the
> idea/code from there, and perhaps abstract the code to be reusable.
>
> HTH.
>
> --
> Chetan
>

Re: Supporting iterations in Apex

Posted by David Yan <da...@datatorrent.com>.
Using the buffer server of a regular input port will not work since the
upstream may also go down, in which case the tuples of the previous
window(s) cannot be replayed.  The good thing is, we only need to store the
tuples of the ITERATION_WINDOW_OFFSET windows prior to a checkpoint, not all
the windows, and in most cases the ITERATION_WINDOW_OFFSET is 1.
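
To make the storage saving concrete, here is a small Python sketch of which
windows would need to be persisted for a given set of checkpoints.  The
function name is hypothetical, window IDs are modelled as plain integers, and
"prior to a checkpoint" is assumed to include the checkpoint window itself,
as in the example from the original post (windows 13 and 14 for checkpoint
14 with an offset of 2).

```python
from typing import Iterable, Set


def windows_to_persist(checkpoint_window_ids: Iterable[int],
                       window_offset: int) -> Set[int]:
    """Windows whose looped tuples must be stored for later replay."""
    if window_offset < 1:
        raise ValueError("window_offset must be >= 1")
    needed: Set[int] = set()
    for checkpoint_id in checkpoint_window_ids:
        # The `window_offset` windows ending at the checkpoint window.
        needed.update(range(checkpoint_id - window_offset + 1, checkpoint_id + 1))
    return needed
```

With checkpoints at windows 10 and 20 and an offset of 1, only windows 10
and 20 are persisted, rather than every window in between.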

David


Re: Supporting iterations in Apex

Posted by Amol Kekre <am...@datatorrent.com>.
Yes. The iteration operator has to be looked at as un-looping the DAG, i.e. we
get back to an "acyclic" graph.

Amol


Re: Supporting iterations in Apex

Posted by Thomas Weise <th...@datatorrent.com>.
An additional idea for the data backup. For any other port, we have
upstream backup through buffer server (logically the abstraction is a WAL
to enable replay). In case of the iteration port there is no upstream buffer
server and hence the idea is to implement the WAL backed by HDFS.

But there are one or many upstream buffer servers as any operator with an
iteration port will also have at least one regular port. So could we use
that buffer server to keep the data for the iteration port as well?

BTW with regard to loop in the DAG, this depends on how you look at it.
Considering the data (windows), it is not a loop. When you look at the
stream connections, then it looks like a loop. The iteration port is like a
pump: water is moved back upstream but it only flows downstream.
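
As a rough illustration of the WAL idea for the iteration port, here is a
minimal Python sketch that appends looped tuples to a log and replays them
from a given window.  It is an assumption-laden stand-in: it writes to a
local file rather than HDFS, the class name is hypothetical, and payloads are
assumed to be JSON-serializable.

```python
import json
import os
from typing import Any, Iterator, Tuple


class FileBackedWal:
    """Hypothetical write-ahead log for tuples arriving on a loop port."""

    def __init__(self, path: str) -> None:
        self.path = path

    def append(self, window_id: int, payload: Any) -> None:
        """Durably record one tuple together with its window ID."""
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps({"window": window_id, "payload": payload}) + "\n")
            f.flush()
            os.fsync(f.fileno())

    def replay(self, from_window_id: int) -> Iterator[Tuple[int, Any]]:
        """Yield (window ID, payload) for records at or after from_window_id."""
        if not os.path.exists(self.path):
            return
        with open(self.path, "r", encoding="utf-8") as f:
            for line in f:
                record = json.loads(line)
                if record["window"] >= from_window_id:
                    yield record["window"], record["payload"]
```

A real implementation would also need purging of old records and some way to
handle partially written entries, which this sketch leaves out.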

On Wed, Sep 16, 2015 at 5:07 PM, Chandni Singh <ch...@datatorrent.com>
wrote:

> My comments:
>
> 1)
> As I understand it, IdempotentStorageManager satisfies this use case. It has
> been used with operators which are dynamically partitionable and has been
> integrated with various operators in Malhar to make them idempotent.
>
> So IMO we should not start building another version of a recovery mechanism
> in Apex.
>
> Once we have hammered out the details of the WAL abstraction (
> https://malhar.atlassian.net/browse/APEX-99), we can deprecate
> IdempotentStorageManager and use that.
>
> 2)
> If we use IdempotentStorageManager here, it will give us a better
> understanding of its limitations and therefore help us with the WAL
> abstraction.
>
> Thanks,
> Chandni
>
>
> On Wed, Sep 16, 2015 at 4:42 PM, David Yan <da...@datatorrent.com> wrote:
>
> > Thanks Chetan.
> >
> > Can you point me to the location of Deduper code that may be helpful with
> > the recovery implementation?
> >
> > Does anyone have any opinion on the renaming of ITERATION_WINDOW_COUNT?
> > DELAY_BY_WINDOW_COUNT? DELAY_WINDOW_COUNT?
> >
> > David
> >

Re: Supporting iterations in Apex

Posted by Chandni Singh <ch...@datatorrent.com>.
My comments:

1)
As I understand it, IdempotentStorageManager satisfies this use case. It has
been used with operators that are dynamically partitionable and has been
integrated with various operators in Malhar to make them idempotent.

So IMO we should not start building another version of a recovery mechanism
in Apex.

Once we have hammered out the details of the WAL abstraction (
https://malhar.atlassian.net/browse/APEX-99), we can deprecate
IdempotentStorageManager and use that.

2)
If we use IdempotentStorageManager here, it will give us a better
understanding of its limitations and therefore help us with the WAL
abstraction.

Thanks,
Chandni
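
The recovery rule discussed in this thread (replay the last
ITERATION_WINDOW_COUNT windows before a checkpoint, shifted forward by that
count, and purge anything older than committedWindowId minus the count) can
be sketched roughly as below. This is only an illustration under stated
assumptions: LoopTupleStore and its methods are hypothetical names, it is not
the IdempotentStorageManager API, it keeps everything in memory, and it
treats window IDs as plain consecutive longs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory store of looped tuples, keyed by the window that emitted them. */
final class LoopTupleStore<T> {
    // Stands in for the ITERATION_WINDOW_COUNT port attribute (assumed >= 1).
    private final int windowOffset;
    private final TreeMap<Long, List<T>> tuplesByWindow = new TreeMap<>();

    LoopTupleStore(int windowOffset) {
        if (windowOffset < 1) {
            throw new IllegalArgumentException("window offset must be >= 1");
        }
        this.windowOffset = windowOffset;
    }

    /** Records a tuple that was sent to the looped input port during windowId. */
    void save(long windowId, T tuple) {
        tuplesByWindow.computeIfAbsent(windowId, k -> new ArrayList<>()).add(tuple);
    }

    /**
     * Builds the replay plan for recovery from checkpointWindowId:
     * the last windowOffset windows up to the checkpoint, each mapped to
     * (source window + windowOffset), the window in which it is re-delivered.
     */
    TreeMap<Long, List<T>> replayFrom(long checkpointWindowId) {
        TreeMap<Long, List<T>> replay = new TreeMap<>();
        long firstWindow = checkpointWindowId - windowOffset + 1;
        for (Map.Entry<Long, List<T>> e
                : tuplesByWindow.subMap(firstWindow, true, checkpointWindowId, true).entrySet()) {
            replay.put(e.getKey() + windowOffset, new ArrayList<>(e.getValue()));
        }
        return replay;
    }

    /** Drops tuples from windows earlier than committedWindowId - windowOffset. */
    void purge(long committedWindowId) {
        tuplesByWindow.headMap(committedWindowId - windowOffset, false).clear();
    }

    int storedWindowCount() {
        return tuplesByWindow.size();
    }
}
```

With an offset of 2 and a checkpoint at window 14, replayFrom(14) maps the
tuples of windows 13 and 14 to windows 15 and 16, matching the 13+2 and 14+2
example from the original proposal.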


On Wed, Sep 16, 2015 at 4:42 PM, David Yan <da...@datatorrent.com> wrote:

> Thanks Chetan.
>
> Can you point me to the location of Deduper code that may be helpful with
> the recovery implementation?
>
> Does anyone have any opinion on the renaming of ITERATION_WINDOW_COUNT?
> DELAY_BY_WINDOW_COUNT? DELAY_WINDOW_COUNT?
>
> David
>
> On Wed, Sep 16, 2015 at 2:21 PM, Chetan Narsude <ch...@datatorrent.com>
> wrote:
>
> > David,
> >
> >  I have 3 comments:
> >
> > 1. The windows you describe above as "ahead" are really behind. With
> > Apex, the windows which are ahead are the windows with smaller window
> > IDs; smaller window IDs are followed by bigger window IDs.
> >
> > 2. ITERATION_WINDOW_COUNT sounds like a misnomer. IMO, it should be
> > something akin to DELAY_BY_WINDOW_COUNT, as you are delaying the events by
> > that many windows. You are not iterating over them that many times. It
> > also resonates with PortContext.SLIDE_BY_WINDOW_COUNT.
> >
> > 3. Deduper has a similar requirement, where a large amount of data
> > (potentially even larger) needs to be partitioned. You can borrow the
> > idea/code from there, and perhaps abstract the code to be reusable.
> >
> > HTH.
> >
> > --
> > Chetan

Re: Supporting iterations in Apex

Posted by Chandni Singh <ch...@datatorrent.com>.
+1 for ITERATION_WINDOW_OFFSET.  We are trying to enable APEX for Iterative
Machine Learning and this sounds clearer/less confusing to me.

On Thu, Sep 17, 2015 at 3:39 PM, Chetan Narsude <ch...@datatorrent.com>
wrote:

> Iteration implies that something is looping, whereas that is just one use
> case of this functionality. One can also take the output of an upstream
> operator and give it to the input of a downstream operator.
>
> AFAIK, DELAY is a very well understood concept in event processing and is
> analogous to how we intend to use it.

Re: Supporting iterations in Apex

Posted by Sasha Parfenov <sa...@datatorrent.com>.
Given some Apache projects have already been referring to a similar concept
as iterations, it may make sense to stick with that terminology.  See
https://ci.apache.org/projects/flink/flink-docs-release-0.7/iterations.html

On Thu, Sep 17, 2015 at 4:31 PM, Pramod Immaneni <pr...@datatorrent.com>
wrote:

> I agree. DELAY or MEMORY is a well-defined concept in other areas such as
> digital electronics, VLSI, and digital signal processing.

Re: Supporting iterations in Apex

Posted by Pramod Immaneni <pr...@datatorrent.com>.
I agree. DELAY or MEMORY is a well-defined concept in other areas such as
digital electronics, VLSI, and digital signal processing.
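
As a rough illustration of the delay idea borrowed from signal processing, a
feedback loop becomes well defined once the fed-back value is delayed by at
least one step. The sketch below computes y[w] = x[w] + y[w - delay], with y
taken as 0 before the first window. It is plain Java with hypothetical names
(DelayedFeedbackDemo, run) and does not use any Apex API; each array index
stands in for one streaming window.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo: a loop whose feedback edge is delayed by a fixed number of windows. */
final class DelayedFeedbackDemo {

    /** Returns y where y[w] = input[w] + y[w - delay], and y is 0 for negative indices. */
    static List<Long> run(long[] input, int delay) {
        if (delay < 1) {
            // Without a delay the output of a window would depend on itself.
            throw new IllegalArgumentException("feedback delay must be >= 1");
        }
        List<Long> output = new ArrayList<>();
        for (int w = 0; w < input.length; w++) {
            // The value fed back into window w was produced 'delay' windows earlier.
            long fedBack = (w - delay >= 0) ? output.get(w - delay) : 0L;
            output.add(input[w] + fedBack);
        }
        return output;
    }

    public static void main(String[] args) {
        // With a delay of one window this is a running sum: prints [1, 3, 6, 10]
        System.out.println(run(new long[] {1, 2, 3, 4}, 1));
    }
}
```

A delay of 1 gives the running sum; a larger delay simply feeds back an older
result, which is the same window shift the proposed port attribute would
apply to looped tuples.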

On Thu, Sep 17, 2015 at 3:39 PM, Chetan Narsude <ch...@datatorrent.com>
wrote:

> Iteration implies that something is looping, whereas that is just one use
> case of this functionality. One can also take the output of an upstream
> operator and give it to the input of a downstream operator.
>
> AFAIK, DELAY is a very well understood concept in event processing and is
> analogous to how we intend to use it.

Re: Supporting iterations in Apex

Posted by Chetan Narsude <ch...@datatorrent.com>.
Iteration implies that something is looping, whereas that is just one use
case of this functionality. One can also take the output of an upstream
operator and give it to the input of a downstream operator.

AFAIK, DELAY is a very well understood concept in event processing and is
analogous to how we intend to use it.
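
The non-looping use of a delay mentioned above can be pictured as a simple
delay line between an upstream and a downstream operator: whatever is emitted
in window w is delivered in window w + N. The class below is a minimal sketch
with hypothetical names (WindowDelayLine, onWindow); it is not how Apex
implements the attribute, and it assumes onWindow is called exactly once per
streaming window.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

/** Hypothetical delay line: delivers each window's tuples a fixed number of windows later. */
final class WindowDelayLine<T> {
    private final int delayWindows;
    // One entry per window that has been received but not yet delivered.
    private final Deque<List<T>> pending = new ArrayDeque<>();

    WindowDelayLine(int delayWindows) {
        if (delayWindows < 1) {
            throw new IllegalArgumentException("delay must be >= 1 window");
        }
        this.delayWindows = delayWindows;
    }

    /**
     * Call once per window with the tuples emitted upstream in that window.
     * Returns the tuples to deliver downstream in the same window, i.e. those
     * emitted delayWindows windows earlier (empty while the line is filling).
     */
    List<T> onWindow(List<T> emitted) {
        pending.addLast(new ArrayList<>(emitted));
        if (pending.size() > delayWindows) {
            return pending.removeFirst();
        }
        return Collections.emptyList();
    }
}
```

Connecting the output of such a delay back to an earlier operator gives the
iteration case; connecting it forward gives a plain delayed stream, which is
the argument for a name based on DELAY rather than ITERATION.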


On Wed, Sep 16, 2015 at 5:32 PM, David Yan <da...@datatorrent.com> wrote:

> I think keeping the word ITERATION is clearer to the users because that's
> what it is for.
> The user wouldn't think he/she is trying to "delay" something...
> In any case, I am fine either way :)
>
> David

Re: Supporting iterations in Apex

Posted by David Yan <da...@datatorrent.com>.
I think keeping the word ITERATION is clearer to the users because that's
what it is for.
The user wouldn't think he/she is trying to "delay" something...
In any case, I am fine either way :)

David

On Wed, Sep 16, 2015 at 5:12 PM, Munagala Ramanath <ra...@datatorrent.com>
wrote:

> I like ITERATION_WINDOW_OFFSET.
>
> Ram

Re: Supporting iterations in Apex

Posted by Munagala Ramanath <ra...@datatorrent.com>.
I like ITERATION_WINDOW_OFFSET.

Ram

On Wed, Sep 16, 2015 at 4:42 PM, David Yan <da...@datatorrent.com> wrote:

> Thanks Chetan.
>
> Can you point me to the location of Deduper code that may be helpful with
> the recovery implementation?
>
> Does anyone have any opinion on the renaming of ITERATION_WINDOW_COUNT?
> DELAY_BY_WINDOW_COUNT? DELAY_WINDOW_COUNT?
>
> David
>

Re: Supporting iterations in Apex

Posted by David Yan <da...@datatorrent.com>.
Thanks Chetan.

Can you point me to the Deduper code that may be helpful for the recovery
implementation?

Does anyone have an opinion on renaming ITERATION_WINDOW_COUNT?
Perhaps DELAY_BY_WINDOW_COUNT or DELAY_WINDOW_COUNT?

David

On Wed, Sep 16, 2015 at 2:21 PM, Chetan Narsude <ch...@datatorrent.com>
wrote:

> David,
>
>  I have 3 comments:
>
> 1. The "ahead window" phrase you discussed above is really behind window.
> With Apex, the windows which are ahead are the windows with smaller window
> Id. smaller window ids are followed by bigger window ids.
>
> 2.  ITERATION_WINDOW_COUNT sounds like a misnomer. IMO, It  should be
> something akin to DELAY_BY_WINDOW_COUNT as you are delaying the events by
> those many windows. You are not iterating over them as many times. It also
> resonates with PortContext.SLIDE_BY_WINDOW_COUNT
>
> 3. Deduper has similar requirement where large amount of data (potentially
> even larger) needs to be partitioned. You can borrow the idea/code from
> there. And perhaps abstract the code to be reusable.
>
> HTH.
>
> --
> Chetan
>

Re: Supporting iterations in Apex

Posted by Chetan Narsude <ch...@datatorrent.com>.
David,

I have 3 comments:

1. The "ahead window" phrase you used above really describes a behind
window. In Apex, the windows that are ahead are the ones with smaller
window IDs; smaller window IDs are followed by bigger window IDs.

2. ITERATION_WINDOW_COUNT sounds like a misnomer. IMO, it should be
something akin to DELAY_BY_WINDOW_COUNT, as you are delaying the events by
that many windows. You are not iterating over them that many times. It also
resonates with PortContext.SLIDE_BY_WINDOW_COUNT.

3. Deduper has a similar requirement, where a large amount of data
(potentially even larger) needs to be partitioned. You can borrow the
idea/code from there, and perhaps abstract the code to be reusable.
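
The "delay" reading in point 2 can be illustrated with a minimal sketch.
It assumes window IDs are plain consecutive longs (a simplification) and
uses hypothetical names (WindowDelaySketch, delayedWindowId) that are not
part of Apex. A tuple emitted in window w is processed at the delayed port
in window w + delay, which matches the 13+2 and 14+2 example in the
original proposal.

```java
/** Hypothetical helper for illustration only; not an Apex API. */
final class WindowDelaySketch {
    private WindowDelaySketch() {}

    /**
     * Returns the window in which a tuple is processed once it has been
     * delayed by delayByWindowCount windows. Window IDs are assumed to be
     * plain consecutive longs here.
     */
    static long delayedWindowId(long windowId, int delayByWindowCount) {
        if (delayByWindowCount < 1) {
            throw new IllegalArgumentException("delay must be >= 1");
        }
        return windowId + delayByWindowCount;
    }

    public static void main(String[] args) {
        // Mirrors the example in the proposal: a delay of 2 windows.
        for (long w = 13; w <= 14; w++) {
            System.out.println("window " + w + " -> window " + delayedWindowId(w, 2));
        }
    }
}
```

Nothing is iterated a fixed number of times here; the value only shifts
the window in which the tuples are seen.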

HTH.

--
Chetan

Re: Supporting iterations in Apex

Posted by Pramod Immaneni <pr...@datatorrent.com>.
Some more thoughts.

We would need a way to know that the data for a particular window needs to
be saved for replay in case of failure, namely the windows from
checkpoint-iteration to checkpoint that you mentioned, so that we save the
data for only those windows. The engine could provide this information in
some manner. A related ticket is https://malhar.atlassian.net/browse/APEX-78
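
As a rough sketch of that window arithmetic (not an actual engine API),
the following assumes window IDs are plain consecutive longs and follows
the example in the original proposal, where recovering from checkpoint
window 14 with a delay of 2 replays windows 13 and 14. The class and
method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only; not an Apex API. */
final class ReplayWindowsSketch {
    private ReplayWindowsSketch() {}

    /** Windows whose looped tuples must be replayed after restoring a checkpoint. */
    static List<Long> windowsToReplay(long checkpointWindowId, int delayWindowCount) {
        List<Long> windows = new ArrayList<>();
        long first = checkpointWindowId - delayWindowCount + 1;
        for (long w = first; w <= checkpointWindowId; w++) {
            windows.add(w);
        }
        return windows;
    }

    /** True if the tuples of windowId must be kept for the given checkpoint. */
    static boolean mustSave(long windowId, long checkpointWindowId, int delayWindowCount) {
        return windowId > checkpointWindowId - delayWindowCount
            && windowId <= checkpointWindowId;
    }

    public static void main(String[] args) {
        System.out.println(windowsToReplay(14, 2));
    }
}
```

The open question raised above remains: an operator can only apply a
check like mustSave if the engine tells it in advance which window will
be checkpointed.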

There may be advantages to storing the data on the source side (loop
originator), such as having a single source of truth when the same source
is connected to multiple operators (loop destinations). But it could
complicate the implementation, since the delaying of data now needs to
happen on the source side, and if the same stream goes to a loop
destination as well as a non-loop destination, the same data needs to be
sent out in two different windows. Does anyone have other thoughts on this?

For partitioning, it seems like it should work correctly as long as the
operators in the loop path, including the loop destination and loop source,
are all restored to the same checkpoint.

You may have already thought about the changes needed in the idempotent
storage manager, such as being able to save data progressively.

Thanks


On Wed, Sep 16, 2015 at 2:22 PM, Pramod Immaneni <pr...@datatorrent.com>
wrote:

> This is a good idea. One thing I can think of is you could save the tuples
> on the side of the operator that is the loop originator (downstream
> operator) itself for the [checkpoint-iteration..checkpoint] windows because
> they are going to be in persistence storage like HDFS and can be accessed
> by the loop destination (upstream operator) during recovery. The advantage
> of this is you don't have to do a special save on the destination side but
> rather piggyback on the HDFS save features of the bufferserver itself.
>

Re: Supporting iterations in Apex

Posted by Pramod Immaneni <pr...@datatorrent.com>.
This is a good idea. One thing I can think of is that you could save the
tuples on the side of the operator that is the loop originator (downstream
operator) for the [checkpoint-iteration..checkpoint] windows, because they
are going to be in persistent storage like HDFS and can be accessed by the
loop destination (upstream operator) during recovery. The advantage of this
is that you don't have to do a special save on the destination side, but
can rather piggyback on the HDFS save features of the buffer server itself.

Re: Supporting iterations in Apex

Posted by Chetan Narsude <ch...@datatorrent.com>.
That's what I thought. +1 for this approach.

On Wed, Oct 7, 2015 at 4:32 PM, David Yan <da...@datatorrent.com> wrote:

> Don't worry about it.  I was thinking of specifying the attribute as an
> operator class, but we can make the attribute take an operator object
> instead.
>

Re: Supporting iterations in Apex

Posted by David Yan <da...@datatorrent.com>.
Don't worry about it.  I was thinking of specifying the attribute as an
operator class, but we can make the attribute take an operator object
instead.
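
To illustrate why an operator object is easier to work with than a class,
here is a minimal sketch of a delay operator that can be configured before
it is attached. It is a standalone stand-in, not the Apex Operator
interface: the class name, the method names, and the idea of returning the
released tuples from beginWindow are all assumptions made for the example.
It buffers the tuples of each window and releases them delayWindowCount
windows later.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for a delay operator; not an Apex API. */
final class DelayOperatorSketch<T> {
    private final int delayWindowCount;
    // One buffered list per completed window, oldest first.
    private final Deque<List<T>> pending = new ArrayDeque<>();
    private List<T> current;

    DelayOperatorSketch(int delayWindowCount) {
        if (delayWindowCount < 1) {
            throw new IllegalArgumentException("delay must be >= 1");
        }
        this.delayWindowCount = delayWindowCount;
    }

    /** Starts a window and returns the tuples received delayWindowCount windows earlier. */
    List<T> beginWindow() {
        List<T> released = new ArrayList<>();
        if (pending.size() >= delayWindowCount) {
            released = pending.removeFirst();
        }
        current = new ArrayList<>();
        return released;
    }

    /** Buffers a tuple that arrived in the current window. */
    void process(T tuple) {
        if (current == null) {
            throw new IllegalStateException("process called outside a window");
        }
        current.add(tuple);
    }

    /** Ends the current window and queues its tuples for later release. */
    void endWindow() {
        pending.addLast(current);
        current = null;
    }
}
```

With a delay of 2, tuples received in the first window come back from the
third call to beginWindow. Because the buffered tuples are ordinary fields
of the object, platform checkpointing could presumably cover them, as
suggested earlier in the thread for the operator-based approach.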

On Wed, Oct 7, 2015 at 2:29 PM, Chetan Narsude <ch...@datatorrent.com>
wrote:

> what do you mean by "because you can't configure the delay operator
> itself"?
>

Re: Supporting iterations in Apex

Posted by Chetan Narsude <ch...@datatorrent.com>.
What do you mean by "because you can't configure the delay operator itself"?

On Wed, Oct 7, 2015 at 9:53 AM, David Yan <da...@datatorrent.com> wrote:

> The iteration operator actually resembles the usage of unifiers.  We have
> getUnifier in the interface of OutputPort.
>
> But if we add getDelayOperator in the interface of InputPort, that would
> introduce backward incompatibility especially since we can't use the
> default implementation feature of interfaces that is in Java 8.
>
> Putting the class object as an attribute of the InputPort is not good
> either because you can't configure the delay operator itself.
>
> Thoughts?
>
> David
>
> On Fri, Sep 25, 2015 at 10:10 AM, David Yan <da...@datatorrent.com> wrote:
>
> > This is a very good idea.  This way, we can have a default implementation
> > of that operator and the user can control how the tuples are stored by
> > having his/her own implementation.  How many windows the operator delays
> is
> > part of the implementation of that operator.
> >
> > I am thinking of getting rid of the ITERATION_WINDOW_OFFSET attribute and
> > introduce a DELAY_OPERATOR_CLASS attribute so that the user can specify
> the
> > delay operator class to be used.
> >
> > More thoughts?
> >
> > David
> >
> > On Thu, Sep 17, 2015 at 7:16 PM, Gaurav Gupta <ga...@datatorrent.com>
> > wrote:
> >
> >> Hey David,
> >>
> >> I was thinking can we add another operator in front of the input port
> that
> >> has ITERATION_WINDOW_COUNT set. The new additional operator will have
> >> property whose value  will be set equal to ITERATION_WINDOW_COUNT and it
> >> will be responsible for caching the data for those many windows and
> >> delaying the data. This operator can act as unifier cum iterator
> operator.
> >> For this you may not need any external storage agent as platform
> >> checkpointing should help you here.
> >>
> >> We are doing something similar for Sliding window.
> >>
> >> Thanks
> >> -Gaurav
> >>
> >> On Wed, Sep 16, 2015 at 1:44 PM, David Yan <da...@datatorrent.com>
> wrote:
> >>
> >> > Hi all,
> >> >
> >> > One current disadvantage of Apex is the inability to do iterations and
> >> > machine learning algorithms because we don't allow loops in the
> >> application
> >> > DAG (hence the name DAG).  I am proposing that we allow loops in the
> >> DAG if
> >> > the loop advances the window ID by a configured amount.  A JIRA ticket
> >> has
> >> > been created:
> >> >
> >> > https://malhar.atlassian.net/browse/APEX-60
> >> >
> >> > I have started this work in my fork at
> >> > https://github.com/davidyan74/incubator-apex-core/tree/APEX-60.
> >> >
> >> > The current progress is that a simple test case works.  Major work
> still
> >> > needs to be done with respect to recovery and partitioning.
> >> >
> >> > The value ITERATION_WINDOW_COUNT is an attribute to an input port of
> an
> >> > operator.  If the value of the attribute is greater than or equal to
> 1,
> >> any
> >> > tuples sent to the input port are treated to be ITERATION_WINDOW_COUNT
> >> > windows ahead of what they are.
> >> >
> >> > For recovery, we will need to checkpoint all the tuples between ports
> >> with
> >> > the to replay the looped tuples.  During the recovery, if the operator
> >> has
> >> > an input port, with ITERATION_WINDOW_COUNT=2, is recovering from
> >> checkpoint
> >> > window 14, the tuples for that input port from window 13 and window 14
> >> need
> >> > to be replayed to be treated as window 15 and window 16 respectively
> >> (13+2
> >> > and 14+2).
> >> >
> >> > In other words, we need to store all the tuples from window with ID
> >> > committedWindowId minus ITERATION_WINDOW_COUNT for recovery and purge
> >> the
> >> > tuples earlier than that window.
> >> > We can optimize this by only storing the tuples for
> >> ITERATION_WINDOW_COUNT
> >> > windows prior to any checkpoint.
> >> >
> >> > For that, we need a storage mechanism for the tuples.  Chandni already
> >> has
> >> > something that fits this usage case in Apex Malhar.  The class is
> >> > IdempotentStorageManager.  In order for this to be used in Apex core,
> we
> >> > need to deprecate the class in Apex Malhar and move it to Apex Core.
> >> >
> >> > A JIRA ticket has been created for this particular work:
> >> >
> >> > https://malhar.atlassian.net/browse/APEX-128
> >> >
> >> > Some of the above has been discussed among Thomas, Chetan, Chandni,
> and
> >> > myself.
> >> >
> >> > For partitioning, we have not started any discussion or brainstorming.
> >> We
> >> > appreciate any feedback on this and any other aspect related to
> >> supporting
> >> > iterations in general.
> >> >
> >> > Thanks!
> >> >
> >> > David
> >> >
> >>
> >
> >
>
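
The recovery arithmetic in the quoted proposal (ITERATION_WINDOW_COUNT=2, recovering from checkpoint window 14) can be written out as a small sketch. This is not Apex code: the class and method names are hypothetical, and window IDs are treated as consecutive integers, as the example in the email does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Apex: models only the window-ID
// arithmetic described in the proposal.
final class IterationWindowMath {
  // A tuple sent into a looped input port during window w is treated as
  // belonging to window w + iterationWindowCount.
  static long effectiveWindowId(long emittedWindowId, int iterationWindowCount) {
    return emittedWindowId + iterationWindowCount;
  }

  // Windows whose looped tuples have to be replayed after recovering from a
  // checkpoint: the iterationWindowCount windows ending at the checkpoint.
  static List<Long> windowsToReplay(long checkpointWindowId, int iterationWindowCount) {
    List<Long> windows = new ArrayList<>();
    for (long w = checkpointWindowId - iterationWindowCount + 1; w <= checkpointWindowId; w++) {
      windows.add(w);
    }
    return windows;
  }

  public static void main(String[] args) {
    for (long w : windowsToReplay(14, 2)) {
      System.out.println("replay window " + w + " as window " + effectiveWindowId(w, 2));
    }
  }
}
```

With a checkpoint at window 14 and a count of 2, the sketch selects windows 13 and 14 and maps them to 15 and 16, matching the 13+2 and 14+2 example in the proposal.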

Re: Supporting iterations in Apex

Posted by Pramod Immaneni <pr...@datatorrent.com>.
+1 for approach 1, as delay is a specialized operator that requires special
engine interactions. Also, it might just be me, but the diagram seems to show
the delay loop going from A to upstream instead of from B to A.

Thanks

On Thu, Oct 29, 2015 at 11:11 AM, David Yan <da...@datatorrent.com> wrote:

> This delay operator will act as an input operator for the first window and
> act as a regular operator after that.
> The engine will increment the window id of the windows from all the output
> ports of the delay operator.
>
> We will need a new interface for the delay operator, extending the existing
> Operator interface.  The interface will probably include:
>
> - Emitting the tuples for the first window
> - Emitting the tuples after recovery
>
> We will provide a default implementation of the delay operator with a
> write-ahead log that stores the tuples for the window before each
> checkpoint for recovery.  We will also probably support the number of
> windows to delay using an operator property.
>
> Let's look at this DAG with an iteration loop:
>
> upstream --> A --> B --> downstream
>              ^     |
>              |-----|
>
> With the delay operator, the physical view of the DAG looks like this with
> D being the delay operator:
>
> upstream --> A --> B --> downstream
>              ^     |
>              |-D<--|
>
> There are two approaches for specifying the delay operator.
>
> 1) As discussed earlier on this thread, the delay operator can be specified
> as an *input port attribute* of A. The delay operator D will not appear in
> the logical DAG.  The engine will do the +1 on the window ID based on the
> presence of the input port attribute.  In this case, the delay operator
> does not need to specify any input port, just like the unifier, with the
> process(tuple) method implicitly taking in the tuples from the output port
> of B, which logically connects to the input port of A.
>
> 2) The delay operator is specified and connected *as any other operator* in
> the logical DAG.  The engine will do the +1 on the window ID if the
> operator implements the delay operator interface.  In this case, the delay
> operator D will need to specify at least one input port (just like a
> regular operator), and can actually have multiple input ports.
>
> I'm leaning toward the 2nd approach.
>
> Please share your thoughts.  Which one do you think is better?  Or maybe
> suggest a different approach altogether?
>
> Thanks!
>
> David
>
> On Wed, Oct 7, 2015 at 10:51 AM, Thomas Weise <th...@datatorrent.com>
> wrote:
>
> > Why not set the delay operator as an attribute? We already support
> > partitioners and stream codecs as attributes.
> >
> >
> > On Wed, Oct 7, 2015 at 10:09 AM, Pramod Immaneni <pramod@datatorrent.com
> >
> > wrote:
> >
> > > How about making just the window delay an attribute on the input port.
> > The
> > > operator connection is just like a normal DAG stream creation. We could
> > > also support connecting same operator to multiple input ports with
> > > different delay and handle fault recovery accordingly.
> > >
> > > On Wed, Oct 7, 2015 at 9:53 AM, David Yan <da...@datatorrent.com>
> wrote:
> > >
> > > > The iteration operator actually resembles the usage of unifiers.  We
> > have
> > > > getUnifier in the interface of OutputPort.
> > > >
> > > > But if we add getDelayOperator in the interface of InputPort, that
> > would
> > > > introduce backward incompatibility especially since we can't use the
> > > > default implementation feature of interfaces that is in Java 8.
> > > >
> > > > Putting the class object as an attribute of the InputPort is not good
> > > > either because you can't configure the delay operator itself.
> > > >
> > > > Thoughts?
> > > >
> > > > David
> > > >
> > > > On Fri, Sep 25, 2015 at 10:10 AM, David Yan <da...@datatorrent.com>
> > > wrote:
> > > >
> > > > > This is a very good idea.  This way, we can have a default
> > > implementation
> > > > > of that operator and the user can control how the tuples are stored
> > by
> > > > > having his/her own implementation.  How many windows the operator
> > > delays
> > > > is
> > > > > part of the implementation of that operator.
> > > > >
> > > > > I am thinking of getting rid of the ITERATION_WINDOW_OFFSET
> attribute
> > > and
> > > > > introduce a DELAY_OPERATOR_CLASS attribute so that the user can
> > specify
> > > > the
> > > > > delay operator class to be used.
> > > > >
> > > > > More thoughts?
> > > > >
> > > > > David
> > > > >
> > > > > On Thu, Sep 17, 2015 at 7:16 PM, Gaurav Gupta <
> > gaurav@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > >> Hey David,
> > > > >>
> > > > >> I was thinking can we add another operator in front of the input
> > port
> > > > that
> > > > >> has ITERATION_WINDOW_COUNT set. The new additional operator will
> > have
> > > > >> property whose value  will be set equal to ITERATION_WINDOW_COUNT
> > and
> > > it
> > > > >> will be responsible for caching the data for those many windows
> and
> > > > >> delaying the data. This operator can act as unifier cum iterator
> > > > operator.
> > > > >> For this you may not need any external storage agent as platform
> > > > >> checkpointing should help you here.
> > > > >>
> > > > >> We are doing something similar for Sliding window.
> > > > >>
> > > > >> Thanks
> > > > >> -Gaurav
> > > > >>
> > > > >> On Wed, Sep 16, 2015 at 1:44 PM, David Yan <david@datatorrent.com
> >
> > > > wrote:
> > > > >>
> > > > >> > Hi all,
> > > > >> >
> > > > >> > One current disadvantage of Apex is the inability to do
> iterations
> > > and
> > > > >> > machine learning algorithms because we don't allow loops in the
> > > > >> application
> > > > >> > DAG (hence the name DAG).  I am proposing that we allow loops in
> > the
> > > > >> DAG if
> > > > >> > the loop advances the window ID by a configured amount.  A JIRA
> > > ticket
> > > > >> has
> > > > >> > been created:
> > > > >> >
> > > > >> > https://malhar.atlassian.net/browse/APEX-60
> > > > >> >
> > > > >> > I have started this work in my fork at
> > > > >> > https://github.com/davidyan74/incubator-apex-core/tree/APEX-60.
> > > > >> >
> > > > >> > The current progress is that a simple test case works.  Major
> work
> > > > still
> > > > >> > needs to be done with respect to recovery and partitioning.
> > > > >> >
> > > > >> > The value ITERATION_WINDOW_COUNT is an attribute to an input
> port
> > of
> > > > an
> > > > >> > operator.  If the value of the attribute is greater than or
> equal
> > to
> > > > 1,
> > > > >> any
> > > > >> > tuples sent to the input port are treated to be
> > > ITERATION_WINDOW_COUNT
> > > > >> > windows ahead of what they are.
> > > > >> >
> > > > >> > For recovery, we will need to checkpoint all the tuples between
> > > ports
> > > > >> with
> > > > >> > the to replay the looped tuples.  During the recovery, if the
> > > operator
> > > > >> has
> > > > >> > an input port, with ITERATION_WINDOW_COUNT=2, is recovering from
> > > > >> checkpoint
> > > > >> > window 14, the tuples for that input port from window 13 and
> > window
> > > 14
> > > > >> need
> > > > >> > to be replayed to be treated as window 15 and window 16
> > respectively
> > > > >> (13+2
> > > > >> > and 14+2).
> > > > >> >
> > > > >> > In other words, we need to store all the tuples from window with
> > ID
> > > > >> > committedWindowId minus ITERATION_WINDOW_COUNT for recovery and
> > > purge
> > > > >> the
> > > > >> > tuples earlier than that window.
> > > > >> > We can optimize this by only storing the tuples for
> > > > >> ITERATION_WINDOW_COUNT
> > > > >> > windows prior to any checkpoint.
> > > > >> >
> > > > >> > For that, we need a storage mechanism for the tuples.  Chandni
> > > already
> > > > >> has
> > > > >> > something that fits this usage case in Apex Malhar.  The class
> is
> > > > >> > IdempotentStorageManager.  In order for this to be used in Apex
> > > core,
> > > > we
> > > > >> > need to deprecate the class in Apex Malhar and move it to Apex
> > Core.
> > > > >> >
> > > > >> > A JIRA ticket has been created for this particular work:
> > > > >> >
> > > > >> > https://malhar.atlassian.net/browse/APEX-128
> > > > >> >
> > > > >> > Some of the above has been discussed among Thomas, Chetan,
> > Chandni,
> > > > and
> > > > >> > myself.
> > > > >> >
> > > > >> > For partitioning, we have not started any discussion or
> > > brainstorming.
> > > > >> We
> > > > >> > appreciate any feedback on this and any other aspect related to
> > > > >> supporting
> > > > >> > iterations in general.
> > > > >> >
> > > > >> > Thanks!
> > > > >> >
> > > > >> > David
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
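
The behaviour described for the delay operator D in the quoted message (hold the tuples coming from B and hand them to A with the window ID advanced) can be sketched in plain Java. DelayBuffer is a hypothetical class that does not use the Apex API; the write-ahead log, first-window seeding, and recovery discussed in the thread are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch, not an Apex operator: buffers tuples per window and
// releases them `delay` windows later.
final class DelayBuffer<T> {
  private final int delay;
  // Key: the window in which the buffered tuples become visible downstream.
  private final TreeMap<Long, List<T>> pending = new TreeMap<>();

  DelayBuffer(int delay) {
    if (delay < 1) {
      throw new IllegalArgumentException("delay must be >= 1");
    }
    this.delay = delay;
  }

  // Record a tuple received during windowId; it is scheduled for windowId + delay.
  void process(long windowId, T tuple) {
    pending.computeIfAbsent(windowId + delay, k -> new ArrayList<>()).add(tuple);
  }

  // Tuples to emit at the start of windowId; empty if nothing was scheduled.
  List<T> beginWindow(long windowId) {
    List<T> due = pending.remove(windowId);
    return due == null ? Collections.<T>emptyList() : due;
  }

  public static void main(String[] args) {
    DelayBuffer<String> d = new DelayBuffer<>(1);
    d.process(10, "b-output");
    System.out.println(d.beginWindow(10)); // nothing is scheduled for window 10
    System.out.println(d.beginWindow(11)); // the tuple from window 10 shows up here
  }
}
```

Keeping the delay as a constructor argument mirrors the suggestion in the thread that the number of windows to delay be an operator property rather than a port attribute.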

Re: Supporting iterations in Apex

Posted by Chandni Singh <ch...@datatorrent.com>.
+1 for the 2nd approach.
I have two reasons: 1. As a user, this looks simpler. 2. I think it will
benefit us to be able to specify multiple input ports.

On Thu, Oct 29, 2015 at 12:09 PM, Gaurav Gupta <ga...@datatorrent.com>
wrote:

> +1 for 1st approach as this Operator behaves just like an Unifier and
> actual connection is between B->A
>
> Thanks
> -Gaurav
>
> On Thu, Oct 29, 2015 at 11:11 AM, David Yan <da...@datatorrent.com> wrote:
>
> > This delay operator will act as an input operator for the first window
> and
> > act as a regular operator after that.
> > The engine will increment the window id of the windows from all the
> output
> > ports of the delay operator.
> >
> > We will need a new interface for the delay operator, extending the
> existing
> > Operator interface.  The interface will probably include:
> >
> > - Emitting the tuples for the first window
> > - Emitting the tuples after recovery
> >
> > We will provide a default implementation of the delay operator with a
> > write-ahead log that stores the tuples for the window before each
> > checkpoint for recovery.  We will also probably support the number of
> > windows to delay using an operator property.
> >
> > Let's look at this DAG with an iteration loop:
> >
> > upstream --> A --> B --> downstream
> >              ^     |
> >              |-----|
> >
> > With the delay operator, the physical view of the DAG looks like this
> with
> > D being the delay operator:
> >
> > upstream --> A --> B --> downstream
> >              ^     |
> >              |-D<--|
> >
> > There are two approaches for specifying the delay operator.
> >
> > 1) As discussed earlier on this thread, the delay operator can be
> specified
> > as an *input port attribute* of A. The delay operator D will not appear
> in
> > the logical DAG.  The engine will do the +1 on the window ID based on the
> > presence of the input port attribute.  In this case, the delay operator
> > does not need to specify any input port, just like the unifier, with the
> > process(tuple) method implicitly taking in the tuples from the output
> port
> > of B, which logically connects to the input port of A.
> >
> > 2) The delay operator is specified and connected *as any other operator*
> in
> > the logical DAG.  The engine will do the +1 on the window ID if the
> > operator implements the delay operator interface.  In this case, the
> delay
> > operator D will need to specify at least one input port (just like a
> > regular operator), and can actually have multiple input ports.
> >
> > I'm leaning toward the 2nd approach.
> >
> > Please share your thoughts.  Which one do you think is better?  Or maybe
> > suggest a different approach altogether?
> >
> > Thanks!
> >
> > David
>
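
Approach 2 in the quoted message has the engine apply the +1 on the window ID when an operator implements a delay operator interface. A minimal sketch of that check follows. The interfaces are local stand-ins written for this example, not the Apex API, and DelayOperator with its firstWindow method is a hypothetical name for the interface the thread proposes.

```java
// Illustration only: every type in this block is a local stand-in.
final class EngineSketch {
  // Window ID the engine would stamp on an operator's output under approach 2.
  static long outputWindowId(Operator op, long inputWindowId) {
    return (op instanceof DelayOperator) ? inputWindowId + 1 : inputWindowId;
  }

  public static void main(String[] args) {
    System.out.println(outputWindowId(new PassThrough(), 14));
    System.out.println(outputWindowId(new SimpleDelay(), 14));
  }
}

// Stand-in for a base operator interface.
interface Operator {
  void beginWindow(long windowId);
  void endWindow();
}

// Hypothetical sub-interface. Nothing is added to Operator itself, so
// existing Operator implementations keep compiling without changes.
interface DelayOperator extends Operator {
  void firstWindow();
}

// An ordinary operator written before DelayOperator existed.
final class PassThrough implements Operator {
  public void beginWindow(long windowId) { }
  public void endWindow() { }
}

// An operator that opts in to the delay behaviour.
final class SimpleDelay implements DelayOperator {
  public void beginWindow(long windowId) { }
  public void endWindow() { }
  public void firstWindow() { }
}
```

Putting the new method on a sub-interface sidesteps the compatibility concern raised earlier in the thread about adding a method to an existing port interface when Java 8 default methods are not available.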

Re: Supporting iterations in Apex

Posted by Gaurav Gupta <ga...@datatorrent.com>.
+1 for the 1st approach, as this operator behaves just like a unifier and the
actual connection is B->A.

Thanks
-Gaurav

On Thu, Oct 29, 2015 at 11:11 AM, David Yan <da...@datatorrent.com> wrote:

> This delay operator will act as an input operator for the first window and
> act as a regular operator after that.
> The engine will increment the window id of the windows from all the output
> ports of the delay operator.
>
> We will need a new interface for the delay operator, extending the existing
> Operator interface.  The interface will probably include:
>
> - Emitting the tuples for the first window
> - Emitting the tuples after recovery
>
> We will provide a default implementation of the delay operator with a
> write-ahead log that stores the tuples for the window before each
> checkpoint for recovery.  We will also probably support the number of
> windows to delay using an operator property.
>
> Let's look at this DAG with an iteration loop:
>
> upstream --> A --> B --> downstream
>              ^     |
>              |-----|
>
> With the delay operator, the physical view of the DAG looks like this with
> D being the delay operator:
>
> upstream --> A --> B --> downstream
>              ^     |
>              |-D<--|
>
> There are two approaches for specifying the delay operator.
>
> 1) As discussed earlier on this thread, the delay operator can be specified
> as an *input port attribute* of A. The delay operator D will not appear in
> the logical DAG.  The engine will do the +1 on the window ID based on the
> presence of the input port attribute.  In this case, the delay operator
> does not need to specify any input port, just like the unifier, with the
> process(tuple) method implicitly taking in the tuples from the output port
> of B, which logically connects to the input port of A.
>
> 2) The delay operator is specified and connected *as any other operator* in
> the logical DAG.  The engine will do the +1 on the window ID if the
> operator implements the delay operator interface.  In this case, the delay
> operator D will need to specify at least one input port (just like a
> regular operator), and can actually have multiple input ports.
>
> I'm leaning toward the 2nd approach.
>
> Please share your thoughts.  Which one do you think is better?  Or maybe
> suggest a different approach altogether?
>
> Thanks!
>
> David

Re: Supporting iterations in Apex

Posted by Vlad Rozov <v....@datatorrent.com>.
+1 to option #2: the delay operator is an explicit operator in the DAG
(with D and A dropped from the name after the feature is implemented :-) ).
After discussion with David, I think it is the more flexible option, and it
can be simplified if necessary by using a higher-level API.

Thank you,

Vlad

On 10/29/15 11:11, David Yan wrote:
> This delay operator will act as an input operator for the first window and
> act as a regular operator after that.
> The engine will increment the window id of the windows from all the output
> ports of the delay operator.
>
> We will need a new interface for the delay operator, extending the existing
> Operator interface.  The interface will probably include:
>
> - Emitting the tuples for the first window
> - Emitting the tuples after recovery
>
> We will provide a default implementation of the delay operator with a
> write-ahead log that stores the tuples for the window before each
> checkpoint for recovery.  We will also probably support the number of
> windows to delay using an operator property.
>
> Let's look at this DAG with an iteration loop:
>
> upstream --> A --> B --> downstream
>               ^     |
>               |-----|
>
> With the delay operator, the physical view of the DAG looks like this with
> D being the delay operator:
>
> upstream --> A --> B --> downstream
>               ^     |
>               |-D<--|
>
> There are two approaches for specifying the delay operator.
>
> 1) As discussed earlier on this thread, the delay operator can be specified
> as an *input port attribute* of A. The delay operator D will not appear in
> the logical DAG.  The engine will do the +1 on the window ID based on the
> presence of the input port attribute.  In this case, the delay operator
> does not need to specify any input port, just like the unifier, with the
> process(tuple) method implicitly taking in the tuples from the output port
> of B, which logically connects to the input port of A.
>
> 2) The delay operator is specified and connected *as any other operator* in
> the logical DAG.  The engine will do the +1 on the window ID if the
> operator implements the delay operator interface.  In this case, the delay
> operator D will need to specify at least one input port (just like a
> regular operator), and can actually have multiple input ports.
>
> I'm leaning toward the 2nd approach.
>
> Please share your thoughts.  Which one do you think is better?  Or maybe
> suggest a different approach altogether?
>
> Thanks!
>
> David
>
> On Wed, Oct 7, 2015 at 10:51 AM, Thomas Weise <th...@datatorrent.com>
> wrote:
>
>> Why not set the delay operator as an attribute? We already support
>> partitioners and stream codecs as attribute.
>>
>>
>> On Wed, Oct 7, 2015 at 10:09 AM, Pramod Immaneni <pr...@datatorrent.com>
>> wrote:
>>
>>> How about making just the window delay an attribute on the input port.
>> The
>>> operator connection is just like a normal DAG stream creation. We could
>>> also support connecting same operator to multiple input ports with
>>> different delay and handle fault recovery accordingly.
>>>
>>> On Wed, Oct 7, 2015 at 9:53 AM, David Yan <da...@datatorrent.com> wrote:
>>>
>>>> The iteration operator actually resembles the usage of unifiers.  We
>> have
>>>> getUnifier in the interface of OutputPort.
>>>>
>>>> But if we add getDelayOperator in the interface of InputPort, that
>> would
>>>> introduce backward incompatibility especially since we can't use the
>>>> default implementation feature of interfaces that is in Java 8.
>>>>
>>>> Putting the class object as an attribute of the InputPort is not good
>>>> either because you can't configure the delay operator itself.
>>>>
>>>> Thoughts?
>>>>
>>>> David
>>>>
>>>> On Fri, Sep 25, 2015 at 10:10 AM, David Yan <da...@datatorrent.com>
>>> wrote:
>>>>> This is a very good idea.  This way, we can have a default
>>> implementation
>>>>> of that operator and the user can control how the tuples are stored
>> by
>>>>> having his/her own implementation.  How many windows the operator
>>> delays
>>>> is
>>>>> part of the implementation of that operator.
>>>>>
>>>>> I am thinking of getting rid of the ITERATION_WINDOW_OFFSET attribute
>>> and
>>>>> introduce a DELAY_OPERATOR_CLASS attribute so that the user can
>> specify
>>>> the
>>>>> delay operator class to be used.
>>>>>
>>>>> More thoughts?
>>>>>
>>>>> David
>>>>>
>>>>> On Thu, Sep 17, 2015 at 7:16 PM, Gaurav Gupta <
>> gaurav@datatorrent.com>
>>>>> wrote:
>>>>>
>>>>>> Hey David,
>>>>>>
>>>>>> I was thinking can we add another operator in front of the input
>> port
>>>> that
>>>>>> has ITERATION_WINDOW_COUNT set. The new additional operator will
>> have
>>>>>> property whose value  will be set equal to ITERATION_WINDOW_COUNT
>> and
>>> it
>>>>>> will be responsible for caching the data for those many windows and
>>>>>> delaying the data. This operator can act as unifier cum iterator
>>>> operator.
>>>>>> For this you may not need any external storage agent as platform
>>>>>> checkpointing should help you here.
>>>>>>
>>>>>> We are doing something similar for Sliding window.
>>>>>>
>>>>>> Thanks
>>>>>> -Gaurav
>>>>>>
>>>>>> On Wed, Sep 16, 2015 at 1:44 PM, David Yan <da...@datatorrent.com>
>>>> wrote:
>>>>>>> Hi all,
>>>>>>>
>>>>>>> One current disadvantage of Apex is the inability to do iterations
>>> and
>>>>>>> machine learning algorithms because we don't allow loops in the
>>>>>> application
>>>>>>> DAG (hence the name DAG).  I am proposing that we allow loops in
>> the
>>>>>> DAG if
>>>>>>> the loop advances the window ID by a configured amount.  A JIRA
>>> ticket
>>>>>> has
>>>>>>> been created:
>>>>>>>
>>>>>>> https://malhar.atlassian.net/browse/APEX-60
>>>>>>>
>>>>>>> I have started this work in my fork at
>>>>>>> https://github.com/davidyan74/incubator-apex-core/tree/APEX-60.
>>>>>>>
>>>>>>> The current progress is that a simple test case works.  Major work
>>>> still
>>>>>>> needs to be done with respect to recovery and partitioning.
>>>>>>>
>>>>>>> The value ITERATION_WINDOW_COUNT is an attribute to an input port
>> of
>>>> an
>>>>>>> operator.  If the value of the attribute is greater than or equal
>> to
>>>> 1,
>>>>>> any
>>>>>>> tuples sent to the input port are treated to be
>>> ITERATION_WINDOW_COUNT
>>>>>>> windows ahead of what they are.
>>>>>>>
>>>>>>> For recovery, we will need to checkpoint all the tuples between
>>> ports
>>>>>> with
>>>>>>> the to replay the looped tuples.  During the recovery, if the
>>> operator
>>>>>> has
>>>>>>> an input port, with ITERATION_WINDOW_COUNT=2, is recovering from
>>>>>> checkpoint
>>>>>>> window 14, the tuples for that input port from window 13 and
>> window
>>> 14
>>>>>> need
>>>>>>> to be replayed to be treated as window 15 and window 16
>> respectively
>>>>>> (13+2
>>>>>>> and 14+2).
>>>>>>>
>>>>>>> In other words, we need to store all the tuples from window with
>> ID
>>>>>>> committedWindowId minus ITERATION_WINDOW_COUNT for recovery and
>>> purge
>>>>>> the
>>>>>>> tuples earlier than that window.
>>>>>>> We can optimize this by only storing the tuples for
>>>>>> ITERATION_WINDOW_COUNT
>>>>>>> windows prior to any checkpoint.
>>>>>>>
>>>>>>> For that, we need a storage mechanism for the tuples.  Chandni
>>> already
>>>>>> has
>>>>>>> something that fits this usage case in Apex Malhar.  The class is
>>>>>>> IdempotentStorageManager.  In order for this to be used in Apex
>>> core,
>>>> we
>>>>>>> need to deprecate the class in Apex Malhar and move it to Apex
>> Core.
>>>>>>> A JIRA ticket has been created for this particular work:
>>>>>>>
>>>>>>> https://malhar.atlassian.net/browse/APEX-128
>>>>>>>
>>>>>>> Some of the above has been discussed among Thomas, Chetan,
>> Chandni,
>>>> and
>>>>>>> myself.
>>>>>>>
>>>>>>> For partitioning, we have not started any discussion or
>>> brainstorming.
>>>>>> We
>>>>>>> appreciate any feedback on this and any other aspect related to
>>>>>> supporting
>>>>>>> iterations in general.
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> David
>>>>>>>
>>>>>


Re: Supporting iterations in Apex

Posted by David Yan <da...@datatorrent.com>.
Interesting idea about chaining multiple delay operators.
I think nothing prevents us from supporting that scenario with either
option, except that in the case of option 1 there needs to be an extra
dedicated delay operator for each feedback loop.
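
To make the chaining idea concrete, here is a toy simulation in Python.
It is not Apex code, and chained_delays is a made-up name. It assumes
each delay operator in the chain delays by exactly one window and that
the output of every stage can be tapped, so that in window n the taps
carry the values from windows n - 1 through n - k.

```python
def chained_delays(windows, k):
    """Simulate k chained one-window delay stages.

    windows: list of per-window values; the index is the window number.
    Returns, for each window n, the list [x(n-1), ..., x(n-k)] seen at the
    taps after each delay stage (None where no earlier window exists).
    """
    stages = [None] * k  # stages[i] holds the value delayed by i + 1 windows
    taps_per_window = []
    for value in windows:
        taps_per_window.append(list(stages))
        # Shift: each stage takes over what the previous stage held.
        stages = ([value] + stages)[:k]
    return taps_per_window


# Four windows carrying 10, 20, 30, 40 through a chain of two delays.
taps = chained_delays([10, 20, 30, 40], 2)
```

Here taps[3] is [30, 20], which pairs window 3 with the values from
windows 2 and 1, i.e. the (n, n - 1) and (n, n - 2) pairs Tim describes.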


On Tue, Nov 3, 2015 at 3:34 PM, Timothy Farkas <ti...@datatorrent.com> wrote:

> +1 for option 2
>
> Also would it be possible to chain delay operators? A lot of stochastic and
> adaptive methods depend on finding the correlations between the current
> time step n and previous k time steps  (n, n - 1), (n, n - 2), (n, n - 3)
> ... (n, n - k)
>
> Here is a picture of a model that uses delay operators (in the picture
> these are represented by z^-1) and is used for time series prediction.
>
>
> https://upload.wikimedia.org/wikipedia/commons/thumb/d/d2/IIRFilter2.svg/250px-IIRFilter2.svg.png
>
> On Tue, Nov 3, 2015 at 3:14 PM, Sasha Parfenov <sa...@datatorrent.com>
> wrote:
>
> > +1 for option 2.  Although option 2 doesn't mirror the current unifiers
> > like option 1, and may look more complicated when viewing logical plan, I
> > think the benefits of flexibility of specifying locality and ability to
> > bring multiple downstream operators into a single delay operators may be
> > important for some projects.  For me the added flexibility wins,
> > particularly in light of efforts towards a simpler high level API.
> >
> >
> >
> > On Mon, Nov 2, 2015 at 11:25 AM, David Yan <da...@datatorrent.com>
> wrote:
> >
> > > Please share your thoughts using the dev mailing list on this topic if
> > you
> > > can.  Thanks.
> > >
> > > ---------- Forwarded message ----------
> > > From: David Yan <da...@datatorrent.com>
> > > Date: Thu, Oct 29, 2015 at 11:11 AM
> > > Subject: Re: Supporting iterations in Apex
> > > To: dev@apex.incubator.apache.org
> > >
> > >
> > > This delay operator will act as an input operator for the first window
> > and
> > > act as a regular operator after that.
> > > The engine will increment the window id of the windows from all the
> > output
> > > ports of the delay operator.
> > >
> > > We will need a new interface for the delay operator, extending the
> > > existing Operator interface.  The interface will probably include:
> > >
> > > - Emitting the tuples for the first window
> > > - Emitting the tuples after recovery
> > >
> > > We will provide a default implementation of the delay operator with a
> > > write-ahead log that stores the tuples for the window before each
> > > checkpoint for recovery.  We will also probably support the number of
> > > windows to delay using an operator property.
> > >
> > > Let's look at this DAG with an iteration loop:
> > >
> > > upstream --> A --> B --> downstream
> > >              ^     |
> > >              |-----|
> > >
> > > With the delay operator, the physical view of the DAG looks like this
> > with
> > > D being the delay operator:
> > >
> > > upstream --> A --> B --> downstream
> > >              ^     |
> > >              |-D<--|
> > >
> > > There are two approaches for specifying the delay operator.
> > >
> > > 1) As discussed earlier on this thread, the delay operator can be
> > > specified as an *input port attribute* of A. The delay operator D will
> > > not appear in the logical DAG.  The engine will do the +1 on the window
> > ID
> > > based on the presence of the input port attribute.  In this case, the
> > delay
> > > operator does not need to specify any input port, just like the
> unifier,
> > > with the process(tuple) method implicitly taking in the tuples from the
> > > output port of B, which logically connects to the input port of A.
> > >
> > > 2) The delay operator is specified and connected *as any other
> operator*
> > > in the logical DAG.  The engine will do the +1 on the window ID if the
> > > operator implements the delay operator interface.  In this case, the
> > delay
> > > operator D will need to specify at least one input port (just like a
> > > regular operator), and can actually have multiple input ports.
> > >
> > > I'm leaning toward the 2nd approach.
> > >
> > > Please share your thoughts.  Which one do you think is better?  Or maybe
> > > suggest a different approach altogether?
> > >
> > > Thanks!
> > >
> > > David
> > >
> > > On Wed, Oct 7, 2015 at 10:51 AM, Thomas Weise <th...@datatorrent.com>
> > > wrote:
> > >
> > >> Why not set the delay operator as an attribute? We already support
> > >> partitioners and stream codecs as attribute.
> > >>
> > >>
> > >> On Wed, Oct 7, 2015 at 10:09 AM, Pramod Immaneni <
> > pramod@datatorrent.com>
> > >> wrote:
> > >>
> > >> > How about making just the window delay an attribute on the input
> port.
> > >> The
> > >> > operator connection is just like a normal DAG stream creation. We
> > could
> > >> > also support connecting same operator to multiple input ports with
> > >> > different delay and handle fault recovery accordingly.
> > >> >
> > >> > On Wed, Oct 7, 2015 at 9:53 AM, David Yan <da...@datatorrent.com>
> > >> wrote:
> > >> >
> > >> > > The iteration operator actually resembles the usage of unifiers.
> We
> > >> have
> > >> > > getUnifier in the interface of OutputPort.
> > >> > >
> > >> > > But if we add getDelayOperator in the interface of InputPort, that
> > >> would
> > >> > > introduce backward incompatibility especially since we can't use
> the
> > >> > > default implementation feature of interfaces that is in Java 8.
> > >> > >
> > >> > > Putting the class object as an attribute of the InputPort is not
> > good
> > >> > > either because you can't configure the delay operator itself.
> > >> > >
> > >> > > Thoughts?
> > >> > >
> > >> > > David
> > >> > >
> > >> > > On Fri, Sep 25, 2015 at 10:10 AM, David Yan <
> david@datatorrent.com>
> > >> > wrote:
> > >> > >
> > >> > > > This is a very good idea.  This way, we can have a default
> > >> > implementation
> > >> > > > of that operator and the user can control how the tuples are
> > stored
> > >> by
> > >> > > > having his/her own implementation.  How many windows the
> operator
> > >> > delays
> > >> > > is
> > >> > > > part of the implementation of that operator.
> > >> > > >
> > >> > > > I am thinking of getting rid of the ITERATION_WINDOW_OFFSET
> > >> attribute
> > >> > and
> > >> > > > introduce a DELAY_OPERATOR_CLASS attribute so that the user can
> > >> specify
> > >> > > the
> > >> > > > delay operator class to be used.
> > >> > > >
> > >> > > > More thoughts?
> > >> > > >
> > >> > > > David
> > >> > > >
> > >> > > > On Thu, Sep 17, 2015 at 7:16 PM, Gaurav Gupta <
> > >> gaurav@datatorrent.com>
> > >> > > > wrote:
> > >> > > >
> > >> > > >> Hey David,
> > >> > > >>
> > >> > > >> I was thinking can we add another operator in front of the
> input
> > >> port
> > >> > > that
> > >> > > >> has ITERATION_WINDOW_COUNT set. The new additional operator
> will
> > >> have
> > >> > > >> property whose value  will be set equal to
> ITERATION_WINDOW_COUNT
> > >> and
> > >> > it
> > >> > > >> will be responsible for caching the data for those many windows
> > and
> > >> > > >> delaying the data. This operator can act as unifier cum
> iterator
> > >> > > operator.
> > >> > > >> For this you may not need any external storage agent as
> platform
> > >> > > >> checkpointing should help you here.
> > >> > > >>
> > >> > > >> We are doing something similar for Sliding window.
> > >> > > >>
> > >> > > >> Thanks
> > >> > > >> -Gaurav
> > >> > > >>
> > >> > > >> On Wed, Sep 16, 2015 at 1:44 PM, David Yan <
> > david@datatorrent.com>
> > >> > > wrote:
> > >> > > >>
> > >> > > >> > Hi all,
> > >> > > >> >
> > >> > > >> > One current disadvantage of Apex is the inability to do
> > >> iterations
> > >> > and
> > >> > > >> > machine learning algorithms because we don't allow loops in
> the
> > >> > > >> application
> > >> > > >> > DAG (hence the name DAG).  I am proposing that we allow loops
> > in
> > >> the
> > >> > > >> DAG if
> > >> > > >> > the loop advances the window ID by a configured amount.  A
> JIRA
> > >> > ticket
> > >> > > >> has
> > >> > > >> > been created:
> > >> > > >> >
> > >> > > >> > https://malhar.atlassian.net/browse/APEX-60
> > >> > > >> >
> > >> > > >> > I have started this work in my fork at
> > >> > > >> > https://github.com/davidyan74/incubator-apex-core/tree/APEX-60.
> > >> > > >> >
> > >> > > >> > The current progress is that a simple test case works.  Major
> > >> work
> > >> > > still
> > >> > > >> > needs to be done with respect to recovery and partitioning.
> > >> > > >> >
> > >> > > >> > The value ITERATION_WINDOW_COUNT is an attribute to an input
> > >> port of
> > >> > > an
> > >> > > >> > operator.  If the value of the attribute is greater than or
> > >> equal to
> > >> > > 1,
> > >> > > >> any
> > >> > > >> > tuples sent to the input port are treated to be
> > >> > ITERATION_WINDOW_COUNT
> > >> > > >> > windows ahead of what they are.
> > >> > > >> >
> > >> > > >> > For recovery, we will need to checkpoint all the tuples
> between
> > >> > ports
> > >> > > >> with
> > >> > > >> > the to replay the looped tuples.  During the recovery, if the
> > >> > operator
> > >> > > >> has
> > >> > > >> > an input port, with ITERATION_WINDOW_COUNT=2, is recovering
> > from
> > >> > > >> checkpoint
> > >> > > >> > window 14, the tuples for that input port from window 13 and
> > >> window
> > >> > 14
> > >> > > >> need
> > >> > > >> > to be replayed to be treated as window 15 and window 16
> > >> respectively
> > >> > > >> (13+2
> > >> > > >> > and 14+2).
> > >> > > >> >
> > >> > > >> > In other words, we need to store all the tuples from window
> > with
> > >> ID
> > >> > > >> > committedWindowId minus ITERATION_WINDOW_COUNT for recovery
> and
> > >> > purge
> > >> > > >> the
> > >> > > >> > tuples earlier than that window.
> > >> > > >> > We can optimize this by only storing the tuples for
> > >> > > >> ITERATION_WINDOW_COUNT
> > >> > > >> > windows prior to any checkpoint.
> > >> > > >> >
> > >> > > >> > For that, we need a storage mechanism for the tuples.
> Chandni
> > >> > already
> > >> > > >> has
> > >> > > >> > something that fits this usage case in Apex Malhar.  The
> class
> > is
> > >> > > >> > IdempotentStorageManager.  In order for this to be used in
> Apex
> > >> > core,
> > >> > > we
> > >> > > >> > need to deprecate the class in Apex Malhar and move it to
> Apex
> > >> Core.
> > >> > > >> >
> > >> > > >> > A JIRA ticket has been created for this particular work:
> > >> > > >> >
> > >> > > >> > https://malhar.atlassian.net/browse/APEX-128
> > >> > > >> >
> > >> > > >> > Some of the above has been discussed among Thomas, Chetan,
> > >> Chandni,
> > >> > > and
> > >> > > >> > myself.
> > >> > > >> >
> > >> > > >> > For partitioning, we have not started any discussion or
> > >> > brainstorming.
> > >> > > >> We
> > >> > > >> > appreciate any feedback on this and any other aspect related
> to
> > >> > > >> supporting
> > >> > > >> > iterations in general.
> > >> > > >> >
> > >> > > >> > Thanks!
> > >> > > >> >
> > >> > > >> > David
> > >> > > >> >
> > >> > > >>
> > >> > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> > >
> >
>

Re: Supporting iterations in Apex

Posted by David Yan <da...@datatorrent.com>.
FYI, a PR has been issued to Apex Core for iteration support in the engine.


https://github.com/apache/incubator-apex-core/pull/185

Also, another review-only PR has been issued to Apex Malhar to demonstrate
a simple case for iteration.

https://github.com/apache/incubator-apex-malhar/pull/122

I would appreciate any feedback.  Thanks!

David

On Wed, Nov 11, 2015 at 2:27 PM, David Yan <da...@datatorrent.com> wrote:

> Per Pramod's suggestion, I added another test that simulates Fibonacci
> sequence.
>
> You can try it out from APEX-60 branch in my fork.
>
> (cd engine; mvn test -Dtest=LogicalPlanTest#testFibonacci)
>
> David
>
> On Wed, Nov 4, 2015 at 6:27 PM, David Yan <da...@datatorrent.com> wrote:
>
>> The JIRAs have been created.
>> You can start from running the small unit test for iteration from the
>> APEX-60 branch like this:
>>
>> mvn test -Dtest=LogicalPlanTest#testIteration
>>
>> Run it under engine directory.  Let me know if you run into issues.
>>
>> David
>>
>> On Wed, Nov 4, 2015 at 1:01 PM, David Yan <da...@datatorrent.com> wrote:
>>
>>> fork: https://github.com/davidyan74/incubator-apex-core
>>> branch: APEX-60
>>>
>>> I'll create the JIRA subtasks soon.
>>>
>>> David
>>>
>>> On Wed, Nov 4, 2015 at 11:56 AM, Pramod Immaneni <pramod@datatorrent.com
>>> > wrote:
>>>
>>>> Can you also add your development branch and fork information to the
>>>> JIRA?
>>>>
>>>> Thanks
>>>>
>>>> On Wed, Nov 4, 2015 at 11:09 AM, Pramod Immaneni <
>>>> pramod@datatorrent.com>
>>>> wrote:
>>>>
>>>> > I will look at it along with 5 if you are not doing that already.
>>>> >
>>>> > On Wed, Nov 4, 2015 at 11:08 AM, David Yan <da...@datatorrent.com>
>>>> wrote:
>>>> >
>>>> >> Yes, let's add that.  During that time, firstWindow() will need to be
>>>> >> called.  That should be done before 5.
>>>> >>
>>>> >> On Wed, Nov 4, 2015 at 11:05 AM, Pramod Immaneni <
>>>> pramod@datatorrent.com>
>>>> >> wrote:
>>>> >>
>>>> >> > I can help with 5. What about the engine introducing the synthetic
>>>> >> window
>>>> >> > when recovering from failure or at the start of the application.
>>>> >> >
>>>> >> > On Wed, Nov 4, 2015 at 10:41 AM, David Yan <da...@datatorrent.com>
>>>> >> wrote:
>>>> >> >
>>>> >> > > Thanks Pramod and Sandesh for offering to help.
>>>> >> > >
>>>> >> > > Here's my rough plan:
>>>> >> > >
>>>> >> > > 1. Add DelayOperator interface.  The DelayOperator will have a
>>>> method
>>>> >> > > firstWindow(long windowId).  Implementations of this interface are
>>>> >> > supposed
>>>> >> > > to emit tuples for the first window of the execution of the
>>>> operator
>>>> >> > > (either the first window of the execution of the application or
>>>> the
>>>> >> first
>>>> >> > > window after recovery).
>>>> >> > >
>>>> >> > > 2. Add SimpleDelayOperator that implements DelayOperator.  It
>>>> has one
>>>> >> > input
>>>> >> > > port and one output port.  It simply passes the tuples from the
>>>> input
>>>> >> > port
>>>> >> > > to the output port and does not do anything for firstWindow()
>>>> call.
>>>> >> > >
>>>> >> > > 3. Engine (e.g. DAG validation) to support loops in case of
>>>> >> > DelayOperator.
>>>> >> > >
>>>> >> > > 4. Implement the +1 delay in the engine for input ports that are
>>>> >> > connected
>>>> >> > > to output ports of DelayOperator.
>>>> >> > >
>>>> >> > > 5. Add capability in the engine to let the operator know when is
>>>> the
>>>> >> next
>>>> >> > > checkpoint window.
>>>> >> > >
>>>> >> > > 6. Add DefaultDelayOperator that extends SimpleDelayOperator.  It
>>>> >> writes
>>>> >> > > the tuples in the window before each checkpoint to a DFS-backed
>>>> WAL
>>>> >> > (using
>>>> >> > > item 5 above), and it overrides firstWindow() to read from the
>>>> WAL and
>>>> >> > > emits the tuples at recovery.
>>>> >> > >
>>>> >> > > 7. Add +N delay capability (in addition to +1) in the
>>>> >> > DefaultDelayOperator.
>>>> >> > >
>>>> >> > > Let me know whether this plan sounds good to you.
>>>> >> > >
>>>> >> > > I'm done with 1, 2, and 3.  4 is in progress.
>>>> >> > > I think the bulk of the work is 5 and 6 and we can discuss how
>>>> we can
>>>> >> > > divide the work.
>>>> >> > > 7 is a nice-to-have and does not have to be done unless there is
>>>> a
>>>> >> > demand.
>>>> >> > >
>>>> >> > > David
>>>> >> > >
>>>> >> > >
>>>> >> > > On Wed, Nov 4, 2015 at 10:11 AM, Pramod Immaneni <
>>>> >> pramod@datatorrent.com
>>>> >> > >
>>>> >> > > wrote:
>>>> >> > >
>>>> >> > > > I would like to help. I might be able to pick up some of the
>>>> smaller
>>>> >> > > tasks.
>>>> >> > > >
>>>> >> > > > On Wed, Nov 4, 2015 at 10:05 AM, David Yan <
>>>> david@datatorrent.com>
>>>> >> > > wrote:
>>>> >> > > >
>>>> >> > > > > Thank you for all your feedback.  Looks like option #2 wins.
>>>> >> > > > >
>>>> >> > > > > I will be working on this in November and please let me know
>>>> if
>>>> >> you'd
>>>> >> > > > like
>>>> >> > > > > to join the effort!
>>>> >> > > > >
>>>> >> > > > > On Wed, Nov 4, 2015 at 9:58 AM, Thomas Weise <
>>>> >> thomas@datatorrent.com
>>>> >> > >
>>>> >> > > > > wrote:
>>>> >> > > > >
>>>> >> > > > > > Agreed, there is no ambiguity.
>>>> >> > > > > >
>>>> >> > > > > > #2 will also allow the user to tune locality as there are
>>>> no
>>>> >> > implicit
>>>> >> > > > > > streams as opposed to the unifier like approach.
>>>> >> > > > > >
>>>> >> > > > > > On Wed, Nov 4, 2015 at 9:54 AM, David Yan <
>>>> >> david@datatorrent.com>
>>>> >> > > > wrote:
>>>> >> > > > > >
>>>> >> > > > > > > On Tue, Nov 3, 2015 at 9:57 PM, Thomas Weise <
>>>> >> > > thomas@datatorrent.com
>>>> >> > > > >
>>>> >> > > > > > > wrote:
>>>> >> > > > > > >
>>>> >> > > > > > > >
>>>> >> > > > > > > > #2 will address that. But if an operator with the delay
>>>> >> > interface
>>>> >> > > > has
>>>> >> > > > > > > > multiple input ports, on which port will the engine
>>>> perform
>>>> >> the
>>>> >> > > > > delay?
>>>> >> > > > > > > > Maybe we will need to validate that a delay operator
>>>> can
>>>> >> only
>>>> >> > > have
>>>> >> > > > a
>>>> >> > > > > > > single
>>>> >> > > > > > > > input port?
>>>> >> > > > > > > >
>>>> >> > > > > > >
>>>> >> > > > > > > My understanding is that the engine performs the +1
>>>> delay on
>>>> >> the
>>>> >> > > > input
>>>> >> > > > > > > ports of operators that are connected to output ports of
>>>> delay
>>>> >> > > > > operators.
>>>> >> > > > > > > So whether or not the delay operator has multiple input
>>>> ports
>>>> >> > > should
>>>> >> > > > > not
>>>> >> > > > > > > matter.
>>>> >> > > > > > >
>>>> >> > > > > >
>>>> >> > > > >
>>>> >> > > >
>>>> >> > >
>>>> >> >
>>>> >>
>>>> >
>>>> >
>>>>
>>>
>>>
>>
>

Re: Supporting iterations in Apex

Posted by David Yan <da...@datatorrent.com>.
Per Pramod's suggestion, I added another test that simulates the Fibonacci
sequence.

You can try it out from the APEX-60 branch in my fork.

(cd engine; mvn test -Dtest=LogicalPlanTest#testFibonacci)
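
For those who want the idea without checking out the branch, below is a
rough Python simulation of the concept. It is not the test code itself,
and the names are invented for illustration. It assumes a single operator
whose output is fed back to its own input through a one-window delay, and
that the delay supplies the value 1 for the very first window.

```python
def fibonacci_by_feedback(num_windows):
    """Emit one Fibonacci number per window using a one-window feedback loop."""
    emitted = []
    previous = 0   # operator state: the value received in the prior window
    fed_back = 1   # assumed seed the delay supplies for the first window
    for _window in range(num_windows):
        current = fed_back  # tuple arriving on the looped input port
        emitted.append(current)
        # The sum goes into the loop; the delay makes it arrive next window.
        fed_back = previous + current
        previous = current
    return emitted


first_eight = fibonacci_by_feedback(8)
```

The point of the sketch is that the value computed in window n only
becomes visible to the operator in window n + 1, which is what lets the
loop exist without tuples circulating within a single window.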

David

On Wed, Nov 4, 2015 at 6:27 PM, David Yan <da...@datatorrent.com> wrote:

> The JIRAs have been created.
> You can start from running the small unit test for iteration from the
> APEX-60 branch like this:
>
> mvn test -Dtest=LogicalPlanTest#testIteration
>
> Run it under engine directory.  Let me know if you run into issues.
>
> David
>
> On Wed, Nov 4, 2015 at 1:01 PM, David Yan <da...@datatorrent.com> wrote:
>
>> fork: https://github.com/davidyan74/incubator-apex-core
>> branch: APEX-60
>>
>> I'll create the JIRA subtasks soon.
>>
>> David
>>
>> On Wed, Nov 4, 2015 at 11:56 AM, Pramod Immaneni <pr...@datatorrent.com>
>> wrote:
>>
>>> Can you also add your development branch and fork information to the
>>> JIRA?
>>>
>>> Thanks
>>>
>>> On Wed, Nov 4, 2015 at 11:09 AM, Pramod Immaneni <pramod@datatorrent.com
>>> >
>>> wrote:
>>>
>>> > I will look at it along with 5 if you are not doing that already.
>>> >
>>> > On Wed, Nov 4, 2015 at 11:08 AM, David Yan <da...@datatorrent.com>
>>> wrote:
>>> >
>>> >> Yes, let's add that.  During that time, firstWindow() will need to be
>>> >> called.  That should be done before 5.
>>> >>
>>> >> On Wed, Nov 4, 2015 at 11:05 AM, Pramod Immaneni <
>>> pramod@datatorrent.com>
>>> >> wrote:
>>> >>
>>> >> > I can help with 5. What about the engine introducing the synthetic
>>> >> window
>>> >> > when recovering from failure or at the start of the application.
>>> >> >
>>> >> > On Wed, Nov 4, 2015 at 10:41 AM, David Yan <da...@datatorrent.com>
>>> >> wrote:
>>> >> >
>>> >> > > Thanks Pramod and Sandesh for offering to help.
>>> >> > >
>>> >> > > Here's my rough plan:
>>> >> > >
>>> >> > > 1. Add DelayOperator interface.  The DelayOperator will have a
>>> method
>>> >> > > firstWindow(long windowId).  Implementations of this interface are
>>> >> > supposed
>>> >> > > to emit tuples for the first window of the execution of the
>>> operator
>>> >> > > (either the first window of the execution of the application or
>>> the
>>> >> first
>>> >> > > window after recovery).
>>> >> > >
>>> >> > > 2. Add SimpleDelayOperator that implements DelayOperator.  It has
>>> one
>>> >> > input
>>> >> > > port and one output port.  It simply passes the tuples from the
>>> input
>>> >> > port
>>> >> > > to the output port and does not do anything for firstWindow()
>>> call.
>>> >> > >
>>> >> > > 3. Engine (e.g. DAG validation) to support loops in case of
>>> >> > DelayOperator.
>>> >> > >
>>> >> > > 4. Implement the +1 delay in the engine for input ports that are
>>> >> > connected
>>> >> > > to output ports of DelayOperator.
>>> >> > >
>>> >> > > 5. Add capability in the engine to let the operator know when is
>>> the
>>> >> next
>>> >> > > checkpoint window.
>>> >> > >
>>> >> > > 6. Add DefaultDelayOperator that extends SimpleDelayOperator.  It
>>> >> writes
>>> >> > > the tuples in the window before each checkpoint to a DFS-backed
>>> WAL
>>> >> > (using
>>> >> > > item 5 above), and it overrides firstWindow() to read from the
>>> WAL and
>>> >> > > emits the tuples at recovery.
>>> >> > >
>>> >> > > 7. Add +N delay capability (in addition to +1) in the
>>> >> > DefaultDelayOperator.
>>> >> > >
>>> >> > > Let me know whether this plan sounds good to you.
>>> >> > >
>>> >> > > I'm done with 1, 2, and 3.  4 is in progress.
>>> >> > > I think the bulk of the work is 5 and 6 and we can discuss how we
>>> can
>>> >> > > divide the work.
>>> >> > > 7 is a nice-to-have and does not have to be done unless there is a
>>> >> > demand.
>>> >> > >
>>> >> > > David
>>> >> > >
>>> >> > >
>>> >> > > On Wed, Nov 4, 2015 at 10:11 AM, Pramod Immaneni <
>>> >> pramod@datatorrent.com
>>> >> > >
>>> >> > > wrote:
>>> >> > >
>>> >> > > > I would like to help. I might be able to pick up some of the
>>> smaller
>>> >> > > tasks.
>>> >> > > >
>>> >> > > > On Wed, Nov 4, 2015 at 10:05 AM, David Yan <
>>> david@datatorrent.com>
>>> >> > > wrote:
>>> >> > > >
>>> >> > > > > Thank you for all your feedback.  Looks like option #2 wins.
>>> >> > > > >
>>> >> > > > > I will be working on this in November and please let me know
>>> if
>>> >> you'd
>>> >> > > > like
>>> >> > > > > to join the effort!
>>> >> > > > >
>>> >> > > > > On Wed, Nov 4, 2015 at 9:58 AM, Thomas Weise <
>>> >> thomas@datatorrent.com
>>> >> > >
>>> >> > > > > wrote:
>>> >> > > > >
>>> >> > > > > > Agreed, there is no ambiguity.
>>> >> > > > > >
>>> >> > > > > > #2 will also allow the user to tune locality as there are no
>>> >> > implicit
>>> >> > > > > > streams as opposed to the unifier like approach.
>>> >> > > > > >
>>> >> > > > > > On Wed, Nov 4, 2015 at 9:54 AM, David Yan <
>>> >> david@datatorrent.com>
>>> >> > > > wrote:
>>> >> > > > > >
>>> >> > > > > > > On Tue, Nov 3, 2015 at 9:57 PM, Thomas Weise <
>>> >> > > thomas@datatorrent.com
>>> >> > > > >
>>> >> > > > > > > wrote:
>>> >> > > > > > >
>>> >> > > > > > > >
>>> >> > > > > > > > #2 will address that. But if an operator with the delay
>>> >> > interface
>>> >> > > > has
>>> >> > > > > > > > multiple input ports, on which port will the engine
>>> perform
>>> >> the
>>> >> > > > > delay?
>>> >> > > > > > > > Maybe we will need to validate that a delay operator can
>>> >> only
>>> >> > > have
>>> >> > > > a
>>> >> > > > > > > single
>>> >> > > > > > > > input port?
>>> >> > > > > > > >
>>> >> > > > > > >
>>> >> > > > > > > My understanding is that the engine performs the +1 delay
>>> on
>>> >> the
>>> >> > > > input
>>> >> > > > > > > ports of operators that are connected to output ports of
>>> delay
>>> >> > > > > operators.
>>> >> > > > > > > So whether or not the delay operator has multiple input
>>> ports
>>> >> > > should
>>> >> > > > > not
>>> >> > > > > > > matter.
>>> >> > > > > > >
>>> >> > > > > >
>>> >> > > > >
>>> >> > > >
>>> >> > >
>>> >> >
>>> >>
>>> >
>>> >
>>>
>>
>>
>

Re: Supporting iterations in Apex

Posted by David Yan <da...@datatorrent.com>.
The JIRAs have been created.
You can start by running the small unit test for iteration from the
APEX-60 branch like this:

mvn test -Dtest=LogicalPlanTest#testIteration

Run it under the engine directory.  Let me know if you run into issues.
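
For anyone reading the plan quoted below before running the test, here is
a rough Python sketch of the delay operator contract described in items
1, 5 and 6 of that plan. It is not the code on the branch: WalDelay,
process and is_checkpoint_window are invented names, a plain dict stands
in for the DFS-backed WAL, and a +1 delay is assumed.

```python
class WalDelay:
    """Toy delay operator that logs the window before each checkpoint."""

    def __init__(self):
        # Stand-in for the write-ahead log: window id -> tuples of that window.
        self.wal = {}

    def process(self, window_id, tuples, is_checkpoint_window):
        """Pass tuples through; the engine would deliver them as window_id + 1."""
        if is_checkpoint_window:
            # Only the window right before a checkpoint is needed for recovery.
            self.wal[window_id] = list(tuples)
        return list(tuples)

    def first_window(self, window_id):
        """Tuples to emit for the first window after start-up or recovery."""
        # Tuples logged for window_id - 1 are replayed as window_id.
        return list(self.wal.get(window_id - 1, []))


delay = WalDelay()
delay.process(13, ["a"], is_checkpoint_window=False)
delay.process(14, ["b", "c"], is_checkpoint_window=True)
replayed = delay.first_window(15)  # recovery from the checkpoint at window 14
```

In this toy run the tuples of window 14 are replayed as window 15 (14+1),
and first_window() returns nothing when the application starts fresh.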

David

On Wed, Nov 4, 2015 at 1:01 PM, David Yan <da...@datatorrent.com> wrote:

> fork: https://github.com/davidyan74/incubator-apex-core
> branch: APEX-60
>
> I'll create the JIRA subtasks soon.
>
> David

Re: Supporting iterations in Apex

Posted by David Yan <da...@datatorrent.com>.
fork: https://github.com/davidyan74/incubator-apex-core
branch: APEX-60

I'll create the JIRA subtasks soon.

David

On Wed, Nov 4, 2015 at 11:56 AM, Pramod Immaneni <pr...@datatorrent.com>
wrote:

> Can you also add your development branch and fork information to the JIRA?
>
> Thanks

Re: Supporting iterations in Apex

Posted by Pramod Immaneni <pr...@datatorrent.com>.
Can you also add your development branch and fork information to the JIRA?

Thanks

On Wed, Nov 4, 2015 at 11:09 AM, Pramod Immaneni <pr...@datatorrent.com>
wrote:

> I will look at it along with 5 if you are not doing that already.

Re: Supporting iterations in Apex

Posted by Sandesh Hegde <sa...@datatorrent.com>.
I will take up 6.

On Wed, Nov 4, 2015 at 11:10 AM Pramod Immaneni <pr...@datatorrent.com>
wrote:

> I will look at it along with 5 if you are not doing that already.

Re: Supporting iterations in Apex

Posted by Pramod Immaneni <pr...@datatorrent.com>.
I will look at it along with 5 if you are not doing that already.

On Wed, Nov 4, 2015 at 11:08 AM, David Yan <da...@datatorrent.com> wrote:

> Yes, let's add that.  During that time, firstWindow() will need to be
> called.  That should be done before 5.

Re: Supporting iterations in Apex

Posted by David Yan <da...@datatorrent.com>.
Yes, let's add that.  firstWindow() will need to be called during that
synthetic window.  That should be done before 5.

On Wed, Nov 4, 2015 at 11:05 AM, Pramod Immaneni <pr...@datatorrent.com>
wrote:

> I can help with 5. What about the engine introducing the synthetic window
> when recovering from failure or at the start of the application.

Re: Supporting iterations in Apex

Posted by Pramod Immaneni <pr...@datatorrent.com>.
I can help with 5. What about having the engine introduce the synthetic
window when recovering from a failure or at the start of the application?

On Wed, Nov 4, 2015 at 10:41 AM, David Yan <da...@datatorrent.com> wrote:

> Thanks Pramod and Sandesh for offering to help.
>
> Here's my rough plan:
>
> 1. Add DelayOperator interface.  The DelayOperator will have a method
> firstWindow(long windowId).  Implementations of this interface are supposed
> to emit tuples for the first window of the execution of the operator
> (either the first window of the execution of the application or the first
> window after recovery).
>
> 2. Add SimpleDelayOperator that implements DelayOperator.  It has one input
> port and one output port.  It simply passes the tuples from the input port
> to the output port and does nothing in the firstWindow() call.
>
> 3. Make the engine (e.g. DAG validation) support loops that go through a
> DelayOperator.
>
> 4. Implement the +1 delay in the engine for input ports that are connected
> to output ports of DelayOperator.
>
> 5. Add a capability in the engine to let the operator know when the next
> checkpoint window is.
>
> 6. Add DefaultDelayOperator that extends SimpleDelayOperator.  It writes
> the tuples in the window before each checkpoint to a DFS-backed WAL (using
> item 5 above), and it overrides firstWindow() to read from the WAL and
> emit the tuples at recovery.
>
> 7. Add +N delay capability (in addition to +1) in the DefaultDelayOperator.
>
> Let me know whether this plan sounds good to you.
>
> I'm done with 1, 2, and 3.  4 is in progress.
> I think the bulk of the work is 5 and 6 and we can discuss how we can
> divide the work.
> 7 is a nice-to-have and does not have to be done unless there is demand.
>
> David
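
As a rough illustration of items 1, 2 and 6 in the plan quoted above, the
operator hierarchy might look something like the sketch below.  This is not
the code on the APEX-60 branch and not the real Apex operator API: the
process() method, the Consumer used as an output port, the in-memory list
standing in for the DFS-backed WAL, and the beginWindowBeforeCheckpoint()
and endWindow() hooks are all assumptions made only so the example is
self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the proposed interface (item 1); not the real Apex API.
interface DelayOperator<T> {
    // Called for the first window of execution (application start or recovery).
    void firstWindow(long windowId);

    // Called for each tuple arriving on the single input port (assumed method).
    void process(T tuple);
}

// Item 2: passes tuples straight through and does nothing in firstWindow().
class SimpleDelayOperator<T> implements DelayOperator<T> {
    protected final Consumer<T> output; // stand-in for an output port

    SimpleDelayOperator(Consumer<T> output) {
        this.output = output;
    }

    @Override
    public void firstWindow(long windowId) {
        // Intentionally empty: nothing is replayed.
    }

    @Override
    public void process(T tuple) {
        output.accept(tuple);
    }
}

// Item 6: records the tuples of the window before a checkpoint and
// replays them from the log when firstWindow() is called.
class DefaultDelayOperator<T> extends SimpleDelayOperator<T> {
    private final List<T> log; // in-memory stand-in for the DFS-backed WAL
    private boolean logging;

    DefaultDelayOperator(Consumer<T> output, List<T> log) {
        super(output);
        this.log = log;
    }

    // Assumed hook: the engine (item 5) signals that the window about to
    // start is the last one before a checkpoint.
    void beginWindowBeforeCheckpoint() {
        log.clear();
        logging = true;
    }

    // Assumed hook: the current window has ended.
    void endWindow() {
        logging = false;
    }

    @Override
    public void process(T tuple) {
        if (logging) {
            log.add(tuple);
        }
        super.process(tuple);
    }

    @Override
    public void firstWindow(long windowId) {
        // Replay whatever was logged before the last checkpoint.
        for (T tuple : log) {
            output.accept(tuple);
        }
    }
}
```

In this sketch, recovery is simulated by constructing a new
DefaultDelayOperator over the same log and calling firstWindow() on it; a
real implementation would read the tuples back from durable storage instead.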

Re: Supporting iterations in Apex

Posted by David Yan <da...@datatorrent.com>.
APEX-78 describes a callback before a checkpoint, but I think what the
DelayOperator needs is not a callback right before the checkpoint but the
number of windows until the next checkpoint.
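
To make the distinction concrete, here is a minimal sketch of what a
"windows until the next checkpoint" query could compute.  It assumes window
IDs are plain consecutive integers and that a checkpoint is taken at the end
of every window whose ID is a multiple of checkpointWindowCount; neither
assumption comes from the engine, and the class and method names are
hypothetical.

```java
// Hypothetical helper; not part of the Apex engine.
final class CheckpointCountdown {
    private CheckpointCountdown() {
    }

    // Number of windows after windowId until the window that ends with a
    // checkpoint; 0 means windowId itself is that window.
    static long windowsUntilNextCheckpoint(long windowId, long checkpointWindowCount) {
        if (checkpointWindowCount <= 0) {
            throw new IllegalArgumentException("checkpointWindowCount must be positive");
        }
        long remainder = Math.floorMod(windowId, checkpointWindowCount);
        return remainder == 0 ? 0 : checkpointWindowCount - remainder;
    }

    // True if the tuples of windowId would have to be saved so that a delay
    // of delayWindows windows can be replayed after recovery.
    static boolean shouldLogWindow(long windowId, long checkpointWindowCount, long delayWindows) {
        return windowsUntilNextCheckpoint(windowId, checkpointWindowCount) < delayWindows;
    }
}
```

With a checkpoint every 7 windows and a delay of 2, windows 13 and 14 would
be logged and window 12 would not, which matches the 13+2 and 14+2 example
from the original proposal.  A callback fired only right before the
checkpoint would arrive too late to start logging window 13.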

On Wed, Nov 4, 2015 at 10:48 AM, Thomas Weise <th...@datatorrent.com>
wrote:

> Is #5 the same as APEX-78 (https://malhar.atlassian.net/browse/APEX-78)?
>
> It would be good as we can leverage that in other operators also.
>
> On Wed, Nov 4, 2015 at 10:41 AM, David Yan <da...@datatorrent.com> wrote:
>
> > Thanks Pramod and Sandesh for offering to help.
> >
> > Here's my rough plan:
> >
> > 1. Add DelayOperator interface.  The DelayOperator will have a method
> > firstWindow(long windowId).  Implementations of this interface is
> supposed
> > to emit tuples for the first window of the execution of the operator
> > (either the first window of the execution of the application or the first
> > window after recovery).
> >
> > 2. Add SimpleDelayOperator that implements DelayOperator.  It has one
> input
> > port and one output port.  It simply passes the tuples from the input
> port
> > to the output port and does not do anything for firstWindow() call.
> >
> > 3. Engine (e.g. DAG validation) to support loops in case of
> DelayOperator.
> >
> > 4. Implement the +1 delay in the engine for input ports that are
> connected
> > to output ports of DelayOperator.
> >
> > 5. Add capability in the engine to let the operator know when is the next
> > checkpoint window.
> >
> > 6. Add DefaultDelayOperator that extends SimpleDelayOperator.  It writes
> > the tuples in the window before each checkpoint to a DFS-backed WAL
> (using
> > item 5 above), and it overrides firstWindow() to read from the WAL and
> > emits the tuples at recovery.
> >
> > 7. Add +N delay capability (in addition to +1) in the
> DefaultDelayOperator.
> >
> > Let me know whether this plan sounds good to you.
> >
> > I'm done with 1, 2, and 3.  4 is in progress.
> > I think the bulk of the work is 5 and 6 and we can discuss how we can
> > divide the work.
> > 7 is a nice-to-have and does not have to be done unless there is a
> demand.
> >
> > David
> >
> >
> > On Wed, Nov 4, 2015 at 10:11 AM, Pramod Immaneni <pramod@datatorrent.com
> >
> > wrote:
> >
> > > I would like to help. I might be able to pick up some of the smaller
> > tasks.
> > >
> > > On Wed, Nov 4, 2015 at 10:05 AM, David Yan <da...@datatorrent.com>
> > wrote:
> > >
> > > > Thank you for all your feedback.  Looks like option #2 wins.
> > > >
> > > > I will be working on this in November and please let me know if you'd
> > > like
> > > > to join the effort!
> > > >
> > > > On Wed, Nov 4, 2015 at 9:58 AM, Thomas Weise <thomas@datatorrent.com
> >
> > > > wrote:
> > > >
> > > > > Agreed, there is no ambiguity.
> > > > >
> > > > > #2 will also allow the user to tune locality as there are no
> implicit
> > > > > streams as opposed to the unifier like approach.
> > > > >
> > > > > On Wed, Nov 4, 2015 at 9:54 AM, David Yan <da...@datatorrent.com>
> > > wrote:
> > > > >
> > > > > > On Tue, Nov 3, 2015 at 9:57 PM, Thomas Weise <
> > thomas@datatorrent.com
> > > >
> > > > > > wrote:
> > > > > >
> > > > > > >
> > > > > > > #2 will address that. But if an operator with the delay
> interface
> > > has
> > > > > > > multiple input ports, on which port will the engine perform the
> > > > delay?
> > > > > > > Maybe we will need to validate that a delay operator can only
> > have
> > > a
> > > > > > single
> > > > > > > input port?
> > > > > > >
> > > > > >
> > > > > > My understanding is that the engine performs the +1 delay on the
> > > input
> > > > > > ports of operators that are connected to output ports of delay
> > > > operators.
> > > > > > So whether or not the delay operator has multiple input ports
> > should
> > > > not
> > > > > > matter.
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Supporting iterations in Apex

Posted by Thomas Weise <th...@datatorrent.com>.
Is #5 the same as https://malhar.atlassian.net/browse/APEX-78?

It would be good as we can leverage that in other operators also.

On Wed, Nov 4, 2015 at 10:41 AM, David Yan <da...@datatorrent.com> wrote:

> Thanks Pramod and Sandesh for offering to help.
>
> Here's my rough plan:
>
> 1. Add DelayOperator interface.  The DelayOperator will have a method
> firstWindow(long windowId).  Implementations of this interface is supposed
> to emit tuples for the first window of the execution of the operator
> (either the first window of the execution of the application or the first
> window after recovery).
>
> 2. Add SimpleDelayOperator that implements DelayOperator.  It has one input
> port and one output port.  It simply passes the tuples from the input port
> to the output port and does not do anything for firstWindow() call.
>
> 3. Engine (e.g. DAG validation) to support loops in case of DelayOperator.
>
> 4. Implement the +1 delay in the engine for input ports that are connected
> to output ports of DelayOperator.
>
> 5. Add capability in the engine to let the operator know when is the next
> checkpoint window.
>
> 6. Add DefaultDelayOperator that extends SimpleDelayOperator.  It writes
> the tuples in the window before each checkpoint to a DFS-backed WAL (using
> item 5 above), and it overrides firstWindow() to read from the WAL and
> emits the tuples at recovery.
>
> 7. Add +N delay capability (in addition to +1) in the DefaultDelayOperator.
>
> Let me know whether this plan sounds good to you.
>
> I'm done with 1, 2, and 3.  4 is in progress.
> I think the bulk of the work is 5 and 6 and we can discuss how we can
> divide the work.
> 7 is a nice-to-have and does not have to be done unless there is a demand.
>
> David
>
>
> On Wed, Nov 4, 2015 at 10:11 AM, Pramod Immaneni <pr...@datatorrent.com>
> wrote:
>
> > I would like to help. I might be able to pick up some of the smaller
> tasks.
> >
> > On Wed, Nov 4, 2015 at 10:05 AM, David Yan <da...@datatorrent.com>
> wrote:
> >
> > > Thank you for all your feedback.  Looks like option #2 wins.
> > >
> > > I will be working on this in November and please let me know if you'd
> > like
> > > to join the effort!
> > >
> > > On Wed, Nov 4, 2015 at 9:58 AM, Thomas Weise <th...@datatorrent.com>
> > > wrote:
> > >
> > > > Agreed, there is no ambiguity.
> > > >
> > > > #2 will also allow the user to tune locality as there are no implicit
> > > > streams as opposed to the unifier like approach.
> > > >
> > > > On Wed, Nov 4, 2015 at 9:54 AM, David Yan <da...@datatorrent.com>
> > wrote:
> > > >
> > > > > On Tue, Nov 3, 2015 at 9:57 PM, Thomas Weise <
> thomas@datatorrent.com
> > >
> > > > > wrote:
> > > > >
> > > > > >
> > > > > > #2 will address that. But if an operator with the delay interface
> > has
> > > > > > multiple input ports, on which port will the engine perform the
> > > delay?
> > > > > > Maybe we will need to validate that a delay operator can only
> have
> > a
> > > > > single
> > > > > > input port?
> > > > > >
> > > > >
> > > > > My understanding is that the engine performs the +1 delay on the
> > input
> > > > > ports of operators that are connected to output ports of delay
> > > operators.
> > > > > So whether or not the delay operator has multiple input ports
> should
> > > not
> > > > > matter.
> > > > >
> > > >
> > >
> >
>

Re: Supporting iterations in Apex

Posted by David Yan <da...@datatorrent.com>.
Thanks Pramod and Sandesh for offering to help.

Here's my rough plan:

1. Add DelayOperator interface.  The DelayOperator will have a method
firstWindow(long windowId).  Implementations of this interface are supposed
to emit tuples for the first window of the execution of the operator
(either the first window of the execution of the application or the first
window after recovery).

2. Add SimpleDelayOperator that implements DelayOperator.  It has one input
port and one output port.  It simply passes the tuples from the input port
to the output port and does nothing in the firstWindow() call.

3. Engine (e.g. DAG validation) to support loops in case of DelayOperator.

4. Implement the +1 delay in the engine for input ports that are connected
to output ports of DelayOperator.

5. Add capability in the engine to let the operator know when the next
checkpoint window is.

6. Add DefaultDelayOperator that extends SimpleDelayOperator.  It writes
the tuples in the window before each checkpoint to a DFS-backed WAL (using
item 5 above), and it overrides firstWindow() to read from the WAL and
emit the tuples at recovery.

7. Add +N delay capability (in addition to +1) in the DefaultDelayOperator.
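
As a rough illustration of how items 1, 2, and 6 could fit together, here is a
self-contained sketch.  It is not the code in my fork.  Only the names
DelayOperator, SimpleDelayOperator, DefaultDelayOperator, and
firstWindow(long windowId) come from the plan above; the process() method, the
Consumer used as the output port, and the in-memory list standing in for the
DFS-backed WAL are assumptions.  For brevity the sketch logs every tuple
rather than only those in the window before each checkpoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DelayOperatorSketch {

    /** Item 1: an operator that may emit tuples for the first window of its execution. */
    interface DelayOperator<T> {
        void firstWindow(long windowId);

        void process(T tuple); // hypothetical stand-in for the input port
    }

    /** Item 2: passes tuples through and does nothing in firstWindow(). */
    static class SimpleDelayOperator<T> implements DelayOperator<T> {
        protected final Consumer<T> output; // hypothetical stand-in for the output port

        SimpleDelayOperator(Consumer<T> output) {
            this.output = output;
        }

        @Override
        public void firstWindow(long windowId) {
            // Nothing to emit.
        }

        @Override
        public void process(T tuple) {
            output.accept(tuple);
        }
    }

    /** Item 6: logs tuples and replays them in firstWindow() after recovery. */
    static class DefaultDelayOperator<T> extends SimpleDelayOperator<T> {
        private final List<T> wal; // in-memory stand-in for a DFS-backed WAL

        DefaultDelayOperator(Consumer<T> output, List<T> wal) {
            super(output);
            this.wal = wal;
        }

        @Override
        public void process(T tuple) {
            wal.add(tuple); // log before emitting
            super.process(tuple);
        }

        @Override
        public void firstWindow(long windowId) {
            for (T tuple : wal) {
                output.accept(tuple); // replay the logged tuples
            }
        }
    }

    public static void main(String[] args) {
        List<String> wal = new ArrayList<>();
        List<String> beforeFailure = new ArrayList<>();
        DefaultDelayOperator<String> op = new DefaultDelayOperator<>(beforeFailure::add, wal);
        op.process("a");
        op.process("b");

        // Simulated recovery: a fresh instance sees the same log.
        List<String> afterRecovery = new ArrayList<>();
        DefaultDelayOperator<String> recovered = new DefaultDelayOperator<>(afterRecovery::add, wal);
        recovered.firstWindow(15L);
        System.out.println(afterRecovery);
    }
}
```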

Let me know whether this plan sounds good to you.

I'm done with 1, 2, and 3.  4 is in progress.
I think the bulk of the work is 5 and 6 and we can discuss how we can
divide the work.
7 is a nice-to-have and does not have to be done unless there is demand.

David


On Wed, Nov 4, 2015 at 10:11 AM, Pramod Immaneni <pr...@datatorrent.com>
wrote:

> I would like to help. I might be able to pick up some of the smaller tasks.
>
> On Wed, Nov 4, 2015 at 10:05 AM, David Yan <da...@datatorrent.com> wrote:
>
> > Thank you for all your feedback.  Looks like option #2 wins.
> >
> > I will be working on this in November and please let me know if you'd
> like
> > to join the effort!
> >
> > On Wed, Nov 4, 2015 at 9:58 AM, Thomas Weise <th...@datatorrent.com>
> > wrote:
> >
> > > Agreed, there is no ambiguity.
> > >
> > > #2 will also allow the user to tune locality as there are no implicit
> > > streams as opposed to the unifier like approach.
> > >
> > > On Wed, Nov 4, 2015 at 9:54 AM, David Yan <da...@datatorrent.com>
> wrote:
> > >
> > > > On Tue, Nov 3, 2015 at 9:57 PM, Thomas Weise <thomas@datatorrent.com
> >
> > > > wrote:
> > > >
> > > > >
> > > > > #2 will address that. But if an operator with the delay interface
> has
> > > > > multiple input ports, on which port will the engine perform the
> > delay?
> > > > > Maybe we will need to validate that a delay operator can only have
> a
> > > > single
> > > > > input port?
> > > > >
> > > >
> > > > My understanding is that the engine performs the +1 delay on the
> input
> > > > ports of operators that are connected to output ports of delay
> > operators.
> > > > So whether or not the delay operator has multiple input ports should
> > not
> > > > matter.
> > > >
> > >
> >
>

Re: Supporting iterations in Apex

Posted by Sandesh Hegde <sa...@datatorrent.com>.
+1 for 2
if 3 is not a crowd, I would like to help.

On Wed, Nov 4, 2015 at 10:12 AM Pramod Immaneni <pr...@datatorrent.com>
wrote:

> I would like to help. I might be able to pick up some of the smaller tasks.
>
> On Wed, Nov 4, 2015 at 10:05 AM, David Yan <da...@datatorrent.com> wrote:
>
> > Thank you for all your feedback.  Looks like option #2 wins.
> >
> > I will be working on this in November and please let me know if you'd
> like
> > to join the effort!
> >
> > On Wed, Nov 4, 2015 at 9:58 AM, Thomas Weise <th...@datatorrent.com>
> > wrote:
> >
> > > Agreed, there is no ambiguity.
> > >
> > > #2 will also allow the user to tune locality as there are no implicit
> > > streams as opposed to the unifier like approach.
> > >
> > > On Wed, Nov 4, 2015 at 9:54 AM, David Yan <da...@datatorrent.com>
> wrote:
> > >
> > > > On Tue, Nov 3, 2015 at 9:57 PM, Thomas Weise <thomas@datatorrent.com
> >
> > > > wrote:
> > > >
> > > > >
> > > > > #2 will address that. But if an operator with the delay interface
> has
> > > > > multiple input ports, on which port will the engine perform the
> > delay?
> > > > > Maybe we will need to validate that a delay operator can only have
> a
> > > > single
> > > > > input port?
> > > > >
> > > >
> > > > My understanding is that the engine performs the +1 delay on the
> input
> > > > ports of operators that are connected to output ports of delay
> > operators.
> > > > So whether or not the delay operator has multiple input ports should
> > not
> > > > matter.
> > > >
> > >
> >
>

Re: Supporting iterations in Apex

Posted by Pramod Immaneni <pr...@datatorrent.com>.
I would like to help. I might be able to pick up some of the smaller tasks.

On Wed, Nov 4, 2015 at 10:05 AM, David Yan <da...@datatorrent.com> wrote:

> Thank you for all your feedback.  Looks like option #2 wins.
>
> I will be working on this in November and please let me know if you'd like
> to join the effort!
>
> On Wed, Nov 4, 2015 at 9:58 AM, Thomas Weise <th...@datatorrent.com>
> wrote:
>
> > Agreed, there is no ambiguity.
> >
> > #2 will also allow the user to tune locality as there are no implicit
> > streams as opposed to the unifier like approach.
> >
> > On Wed, Nov 4, 2015 at 9:54 AM, David Yan <da...@datatorrent.com> wrote:
> >
> > > On Tue, Nov 3, 2015 at 9:57 PM, Thomas Weise <th...@datatorrent.com>
> > > wrote:
> > >
> > > >
> > > > #2 will address that. But if an operator with the delay interface has
> > > > multiple input ports, on which port will the engine perform the
> delay?
> > > > Maybe we will need to validate that a delay operator can only have a
> > > single
> > > > input port?
> > > >
> > >
> > > My understanding is that the engine performs the +1 delay on the input
> > > ports of operators that are connected to output ports of delay
> operators.
> > > So whether or not the delay operator has multiple input ports should
> not
> > > matter.
> > >
> >
>

Re: Supporting iterations in Apex

Posted by David Yan <da...@datatorrent.com>.
Thank you for all your feedback.  Looks like option #2 wins.

I will be working on this in November.  Please let me know if you'd like
to join the effort!

On Wed, Nov 4, 2015 at 9:58 AM, Thomas Weise <th...@datatorrent.com> wrote:

> Agreed, there is no ambiguity.
>
> #2 will also allow the user to tune locality as there are no implicit
> streams as opposed to the unifier like approach.
>
> On Wed, Nov 4, 2015 at 9:54 AM, David Yan <da...@datatorrent.com> wrote:
>
> > On Tue, Nov 3, 2015 at 9:57 PM, Thomas Weise <th...@datatorrent.com>
> > wrote:
> >
> > >
> > > #2 will address that. But if an operator with the delay interface has
> > > multiple input ports, on which port will the engine perform the delay?
> > > Maybe we will need to validate that a delay operator can only have a
> > single
> > > input port?
> > >
> >
> > My understanding is that the engine performs the +1 delay on the input
> > ports of operators that are connected to output ports of delay operators.
> > So whether or not the delay operator has multiple input ports should not
> > matter.
> >
>

Re: Supporting iterations in Apex

Posted by Thomas Weise <th...@datatorrent.com>.
Agreed, there is no ambiguity.

#2 will also allow the user to tune locality, since there are no implicit
streams, as opposed to the unifier-like approach.

On Wed, Nov 4, 2015 at 9:54 AM, David Yan <da...@datatorrent.com> wrote:

> On Tue, Nov 3, 2015 at 9:57 PM, Thomas Weise <th...@datatorrent.com>
> wrote:
>
> >
> > #2 will address that. But if an operator with the delay interface has
> > multiple input ports, on which port will the engine perform the delay?
> > Maybe we will need to validate that a delay operator can only have a
> single
> > input port?
> >
>
> My understanding is that the engine performs the +1 delay on the input
> ports of operators that are connected to output ports of delay operators.
> So whether or not the delay operator has multiple input ports should not
> matter.
>

Re: Supporting iterations in Apex

Posted by David Yan <da...@datatorrent.com>.
On Tue, Nov 3, 2015 at 9:57 PM, Thomas Weise <th...@datatorrent.com> wrote:

>
> #2 will address that. But if an operator with the delay interface has
> multiple input ports, on which port will the engine perform the delay?
> Maybe we will need to validate that a delay operator can only have a single
> input port?
>

My understanding is that the engine performs the +1 delay on the input
ports of operators that are connected to output ports of delay operators.
So whether or not the delay operator has multiple input ports should not
matter.
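
To illustrate, here is a small sketch of how the delayed ports could be
derived from the logical streams.  The Stream class, the port names, and the
delayedInputPorts() helper are hypothetical and only model the DAG from this
thread (upstream --> A --> B --> downstream, with B --> D --> A closing the
loop); they are not the engine's actual data structures.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class DelayedPortsSketch {

    /** Hypothetical description of one logical stream from a source operator to a sink port. */
    static final class Stream {
        final String sourceOperator;
        final String sinkOperator;
        final String sinkPort;

        Stream(String sourceOperator, String sinkOperator, String sinkPort) {
            this.sourceOperator = sourceOperator;
            this.sinkOperator = sinkOperator;
            this.sinkPort = sinkPort;
        }
    }

    /** Returns the input ports fed by a delay operator; only these get the +1 window shift. */
    static Set<String> delayedInputPorts(List<Stream> streams, Set<String> delayOperators) {
        Set<String> delayed = new TreeSet<>();
        for (Stream s : streams) {
            if (delayOperators.contains(s.sourceOperator)) {
                delayed.add(s.sinkOperator + "." + s.sinkPort);
            }
        }
        return delayed;
    }

    public static void main(String[] args) {
        List<Stream> streams = Arrays.asList(
            new Stream("upstream", "A", "input"),
            new Stream("A", "B", "input"),
            new Stream("B", "downstream", "input"),
            new Stream("B", "D", "input"),      // D's own input ports are not shifted
            new Stream("D", "A", "loopInput")); // A.loopInput is fed by the delay operator
        Set<String> delayOperators = new HashSet<>(Arrays.asList("D"));
        System.out.println(delayedInputPorts(streams, delayOperators));
    }
}
```

The shift is keyed on the source of each stream, so adding more input ports to
D does not change which ports are delayed.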

Re: Supporting iterations in Apex

Posted by Thomas Weise <th...@datatorrent.com>.
+1 for 2

#1, as already commented, mirrors the unifier approach.

This option, as does the unifier, requires special handling for attributes
(extra DAG API, extra support for configuration), which is repetitive.

Also, since it is an operator that directly contributes to the application
logic, it deserves a direct representation in the logical plan rather than
being hidden as an attribute.

#2 will address that. But if an operator with the delay interface has
multiple input ports, on which port will the engine perform the delay?
Maybe we will need to validate that a delay operator can only have a single
input port?

Thomas

On Tue, Nov 3, 2015 at 4:01 PM, Pramod Immaneni <pr...@datatorrent.com>
wrote:

> That is a good point Tim. However the delay operations typically are just
> that, only delay and not overloaded with other things so why not  option 1.
>
> On Tue, Nov 3, 2015 at 3:34 PM, Timothy Farkas <ti...@datatorrent.com>
> wrote:
>
> > +1 for option 2
> >
> > Also would it be possible to chain delay operators? A lot of stochastic
> and
> > adaptive methods depend on finding the correlations between the current
> > time step n and previous k time steps  (n, n - 1), (n, n - 2), (n, n - 3)
> > ... (n, n - k)
> >
> > Here is a picture of a model that uses delay operators (in the picture
> > these are represented by z^-1) and is used for time series prediction.
> >
> >
> >
> https://upload.wikimedia.org/wikipedia/commons/thumb/d/d2/IIRFilter2.svg/250px-IIRFilter2.svg.png
> >
> > On Tue, Nov 3, 2015 at 3:14 PM, Sasha Parfenov <sa...@datatorrent.com>
> > wrote:
> >
> > > +1 for option 2.  Although option 2 doesn't mirror the current unifiers
> > > like option 1, and may look more complicated when viewing logical
> plan, I
> > > think the benefits of flexibility of specifying locality and ability to
> > > bring multiple downstream operators into a single delay operators may
> be
> > > important for some projects.  For me the added flexibility wins,
> > > particularly in light of efforts towards a simpler high level API.
> > >
> > >
> > >
> > > On Mon, Nov 2, 2015 at 11:25 AM, David Yan <da...@datatorrent.com>
> > wrote:
> > >
> > > > Please share your thoughts using the dev mailing list on this topic
> if
> > > you
> > > > can.  Thanks.
> > > >
> > > > ---------- Forwarded message ----------
> > > > From: David Yan <da...@datatorrent.com>
> > > > Date: Thu, Oct 29, 2015 at 11:11 AM
> > > > Subject: Re: Supporting iterations in Apex
> > > > To: dev@apex.incubator.apache.org
> > > >
> > > >
> > > > This delay operator will act as an input operator for the first
> window
> > > and
> > > > act as a regular operator after that.
> > > > The engine will increment the window id of the windows from all the
> > > output
> > > > ports of the delay operator.
> > > >
> > > > We will need a new interface for the delay operator, extending the
> > > > existing Operator interface.  The interface will probably include:
> > > >
> > > > - Emitting the tuples for the first window
> > > > - Emitting the tuples after recovery
> > > >
> > > > We will provide a default implementation of the delay operator with a
> > > > write-ahead log that stores the tuples for the window before each
> > > > checkpoint for recovery.  We will also probably support the number of
> > > > windows to delay using an operator property.
> > > >
> > > > Let's look at this DAG with an iteration loop:
> > > >
> > > > upstream --> A --> B --> downstream
> > > >              ^     |
> > > >              |-----|
> > > >
> > > > With the delay operator, the physical view of the DAG looks like this
> > > with
> > > > D being the delay operator:
> > > >
> > > > upstream --> A --> B --> downstream
> > > >              ^     |
> > > >              |-D<--|
> > > >
> > > > There are two approaches for specifying the delay operator.
> > > >
> > > > 1) As discussed earlier on this thread, the delay operator can be
> > > > specified as an *input port attribute* of A. The delay operator D
> will
> > > > not appear in the logical DAG.  The engine will do the +1 on the
> window
> > > ID
> > > > based on the presence of the input port attribute.  In this case, the
> > > delay
> > > > operator does not need to specify any input port, just like the
> > unifier,
> > > > with the process(tuple) method implicitly taking in the tuples from
> the
> > > > output port of B, which logically connects to the input port of A.
> > > >
> > > > 2) The delay operator is specified and connected *as any other
> > operator*
> > > > in the logical DAG.  The engine will do the +1 on the window ID if
> the
> > > > operator implements the delay operator interface.  In this case, the
> > > delay
> > > > operator D will need to specify at least one input port (just like a
> > > > regular operator), and can actually have multiple input ports.
> > > >
> > > > I'm leaning toward the 2nd approach.
> > > >
> > > > Please share your thoughts.  Which one you think is better?  Or maybe
> > > > suggest a different approach altogether?
> > > >
> > > > Thanks!
> > > >
> > > > David
> > > >
> > > > David
> > > >
> > > > On Wed, Oct 7, 2015 at 10:51 AM, Thomas Weise <
> thomas@datatorrent.com>
> > > > wrote:
> > > >
> > > > >> Why not set the delay operator as an attribute? We already support
> > > >> partitioners and stream codecs as attribute.
> > > >>
> > > >>
> > > >> On Wed, Oct 7, 2015 at 10:09 AM, Pramod Immaneni <
> > > pramod@datatorrent.com>
> > > >> wrote:
> > > >>
> > > >> > How about making just the window delay an attribute on the input
> > port.
> > > >> The
> > > >> > operator connection is just like a normal DAG stream creation. We
> > > could
> > > >> > also support connecting same operator to multiple input ports with
> > > >> > different delay and handle fault recovery accordingly.
> > > >> >
> > > >> > On Wed, Oct 7, 2015 at 9:53 AM, David Yan <da...@datatorrent.com>
> > > >> wrote:
> > > >> >
> > > >> > > The iteration operator actually resembles the usage of unifiers.
> > We
> > > >> have
> > > >> > > getUnifier in the interface of OutputPort.
> > > >> > >
> > > >> > > But if we add getDelayOperator in the interface of InputPort,
> that
> > > >> would
> > > >> > > introduce backward incompatibility especially since we can't use
> > the
> > > >> > > default implementation feature of interfaces that is in Java 8.
> > > >> > >
> > > >> > > Putting the class object as an attribute of the InputPort is not
> > > good
> > > >> > > either because you can't configure the delay operator itself.
> > > >> > >
> > > >> > > Thoughts?
> > > >> > >
> > > >> > > David
> > > >> > >
> > > >> > > On Fri, Sep 25, 2015 at 10:10 AM, David Yan <
> > david@datatorrent.com>
> > > >> > wrote:
> > > >> > >
> > > >> > > > This is a very good idea.  This way, we can have a default
> > > >> > implementation
> > > >> > > > of that operator and the user can control how the tuples are
> > > stored
> > > >> by
> > > >> > > > having his/her own implementation.  How many windows the
> > operator
> > > >> > delays
> > > >> > > is
> > > >> > > > part of the implementation of that operator.
> > > >> > > >
> > > >> > > > I am thinking of getting rid of the ITERATION_WINDOW_OFFSET
> > > >> attribute
> > > >> > and
> > > >> > > > introduce a DELAY_OPERATOR_CLASS attribute so that the user
> can
> > > >> specify
> > > >> > > the
> > > >> > > > delay operator class to be used.
> > > >> > > >
> > > >> > > > More thoughts?
> > > >> > > >
> > > >> > > > David
> > > >> > > >
> > > >> > > > On Thu, Sep 17, 2015 at 7:16 PM, Gaurav Gupta <
> > > >> gaurav@datatorrent.com>
> > > >> > > > wrote:
> > > >> > > >
> > > >> > > >> Hey David,
> > > >> > > >>
> > > >> > > >> I was thinking can we add another operator in front of the
> > input
> > > >> port
> > > >> > > that
> > > >> > > >> has ITERATION_WINDOW_COUNT set. The new additional operator
> > will
> > > >> have
> > > >> > > >> property whose value  will be set equal to
> > ITERATION_WINDOW_COUNT
> > > >> and
> > > >> > it
> > > >> > > >> will be responsible for caching the data for those many
> windows
> > > and
> > > >> > > >> delaying the data. This operator can act as unifier cum
> > iterator
> > > >> > > operator.
> > > >> > > >> For this you may not need any external storage agent as
> > platform
> > > >> > > >> checkpointing should help you here.
> > > >> > > >>
> > > >> > > >> We are doing something similar for Sliding window.
> > > >> > > >>
> > > >> > > >> Thanks
> > > >> > > >> -Gaurav
> > > >> > > >>
> > > >> > > >> On Wed, Sep 16, 2015 at 1:44 PM, David Yan <
> > > david@datatorrent.com>
> > > >> > > wrote:
> > > >> > > >>
> > > >> > > >> > Hi all,
> > > >> > > >> >
> > > >> > > >> > One current disadvantage of Apex is the inability to do
> > > >> iterations
> > > >> > and
> > > >> > > >> > machine learning algorithms because we don't allow loops in
> > the
> > > >> > > >> application
> > > >> > > >> > DAG (hence the name DAG).  I am proposing that we allow
> loops
> > > in
> > > >> the
> > > >> > > >> DAG if
> > > >> > > >> > the loop advances the window ID by a configured amount.  A
> > JIRA
> > > >> > ticket
> > > >> > > >> has
> > > >> > > >> > been created:
> > > >> > > >> >
> > > >> > > >> > https://malhar.atlassian.net/browse/APEX-60
> > > >> > > >> >
> > > >> > > >> > I have started this work in my fork at
> > > >> > > >> >
> > https://github.com/davidyan74/incubator-apex-core/tree/APEX-60
> > > .
> > > >> > > >> >
> > > >> > > >> > The current progress is that a simple test case works.
> Major
> > > >> work
> > > >> > > still
> > > >> > > >> > needs to be done with respect to recovery and partitioning.
> > > >> > > >> >
> > > >> > > >> > The value ITERATION_WINDOW_COUNT is an attribute to an
> input
> > > >> port of
> > > >> > > an
> > > >> > > >> > operator.  If the value of the attribute is greater than or
> > > >> equal to
> > > >> > > 1,
> > > >> > > >> any
> > > >> > > >> > tuples sent to the input port are treated to be
> > > >> > ITERATION_WINDOW_COUNT
> > > >> > > >> > windows ahead of what they are.
> > > >> > > >> >
> > > >> > > >> > For recovery, we will need to checkpoint all the tuples
> > between
> > > >> > ports
> > > >> > > >> with
> > > >> > > >> > the to replay the looped tuples.  During the recovery, if
> the
> > > >> > operator
> > > >> > > >> has
> > > >> > > >> > an input port, with ITERATION_WINDOW_COUNT=2, is recovering
> > > from
> > > >> > > >> checkpoint
> > > >> > > >> > window 14, the tuples for that input port from window 13
> and
> > > >> window
> > > >> > 14
> > > >> > > >> need
> > > >> > > >> > to be replayed to be treated as window 15 and window 16
> > > >> respectively
> > > >> > > >> (13+2
> > > >> > > >> > and 14+2).
> > > >> > > >> >
> > > >> > > >> > In other words, we need to store all the tuples from window
> > > with
> > > >> ID
> > > >> > > >> > committedWindowId minus ITERATION_WINDOW_COUNT for recovery
> > and
> > > >> > purge
> > > >> > > >> the
> > > >> > > >> > tuples earlier than that window.
> > > >> > > >> > We can optimize this by only storing the tuples for
> > > >> > > >> ITERATION_WINDOW_COUNT
> > > >> > > >> > windows prior to any checkpoint.
> > > >> > > >> >
> > > >> > > >> > For that, we need a storage mechanism for the tuples.
> > Chandni
> > > >> > already
> > > >> > > >> has
> > > >> > > >> > something that fits this usage case in Apex Malhar.  The
> > class
> > > is
> > > >> > > >> > IdempotentStorageManager.  In order for this to be used in
> > Apex
> > > >> > core,
> > > >> > > we
> > > >> > > >> > need to deprecate the class in Apex Malhar and move it to
> > Apex
> > > >> Core.
> > > >> > > >> >
> > > >> > > >> > A JIRA ticket has been created for this particular work:
> > > >> > > >> >
> > > >> > > >> > https://malhar.atlassian.net/browse/APEX-128
> > > >> > > >> >
> > > >> > > >> > Some of the above has been discussed among Thomas, Chetan,
> > > >> Chandni,
> > > >> > > and
> > > >> > > >> > myself.
> > > >> > > >> >
> > > >> > > >> > For partitioning, we have not started any discussion or
> > > >> > brainstorming.
> > > >> > > >> We
> > > >> > > >> > appreciate any feedback on this and any other aspect
> related
> > to
> > > >> > > >> supporting
> > > >> > > >> > iterations in general.
> > > >> > > >> >
> > > >> > > >> > Thanks!
> > > >> > > >> >
> > > >> > > >> > David
> > > >> > > >> >
> > > >> > > >>
> > > >> > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > > >
> > >
> >
>

Re: Supporting iterations in Apex

Posted by Pramod Immaneni <pr...@datatorrent.com>.
That is a good point, Tim. However, delay operations are typically just
that: only a delay, not overloaded with other things, so why not option 1?

On Tue, Nov 3, 2015 at 3:34 PM, Timothy Farkas <ti...@datatorrent.com> wrote:

> +1 for option 2
>
> Also would it be possible to chain delay operators? A lot of stochastic and
> adaptive methods depend on finding the correlations between the current
> time step n and previous k time steps  (n, n - 1), (n, n - 2), (n, n - 3)
> ... (n, n - k)
>
> Here is a picture of a model that uses delay operators (in the picture
> these are represented by z^-1) and is used for time series prediction.
>
>
> https://upload.wikimedia.org/wikipedia/commons/thumb/d/d2/IIRFilter2.svg/250px-IIRFilter2.svg.png
>
> On Tue, Nov 3, 2015 at 3:14 PM, Sasha Parfenov <sa...@datatorrent.com>
> wrote:
>
> > +1 for option 2.  Although option 2 doesn't mirror the current unifiers
> > like option 1, and may look more complicated when viewing logical plan, I
> > think the benefits of flexibility of specifying locality and ability to
> > bring multiple downstream operators into a single delay operators may be
> > important for some projects.  For me the added flexibility wins,
> > particularly in light of efforts towards a simpler high level API.
> >
> >
> >
> > On Mon, Nov 2, 2015 at 11:25 AM, David Yan <da...@datatorrent.com>
> wrote:
> >
> > > Please share your thoughts using the dev mailing list on this topic if
> > you
> > > can.  Thanks.
> > >
> > > ---------- Forwarded message ----------
> > > From: David Yan <da...@datatorrent.com>
> > > Date: Thu, Oct 29, 2015 at 11:11 AM
> > > Subject: Re: Supporting iterations in Apex
> > > To: dev@apex.incubator.apache.org
> > >
> > >
> > > This delay operator will act as an input operator for the first window
> > and
> > > act as a regular operator after that.
> > > The engine will increment the window id of the windows from all the
> > output
> > > ports of the delay operator.
> > >
> > > We will need a new interface for the delay operator, extending the
> > > existing Operator interface.  The interface will probably include:
> > >
> > > - Emitting the tuples for the first window
> > > - Emitting the tuples after recovery
> > >
> > > We will provide a default implementation of the delay operator with a
> > > write-ahead log that stores the tuples for the window before each
> > > checkpoint for recovery.  We will also probably support the number of
> > > windows to delay using an operator property.
> > >
> > > Let's look at this DAG with an iteration loop:
> > >
> > > upstream --> A --> B --> downstream
> > >              ^     |
> > >              |-----|
> > >
> > > With the delay operator, the physical view of the DAG looks like this
> > with
> > > D being the delay operator:
> > >
> > > upstream --> A --> B --> downstream
> > >              ^     |
> > >              |-D<--|
> > >
> > > There are two approaches for specifying the delay operator.
> > >
> > > 1) As discussed earlier on this thread, the delay operator can be
> > > specified as an *input port attribute* of A. The delay operator D will
> > > not appear in the logical DAG.  The engine will do the +1 on the window
> > ID
> > > based on the presence of the input port attribute.  In this case, the
> > delay
> > > operator does not need to specify any input port, just like the
> unifier,
> > > with the process(tuple) method implicitly taking in the tuples from the
> > > output port of B, which logically connects to the input port of A.
> > >
> > > 2) The delay operator is specified and connected *as any other
> operator*
> > > in the logical DAG.  The engine will do the +1 on the window ID if the
> > > operator implements the delay operator interface.  In this case, the
> > delay
> > > operator D will need to specify at least one input port (just like a
> > > regular operator), and can actually have multiple input ports.
> > >
> > > I'm leaning toward the 2nd approach.
> > >
> > > Please share your thoughts.  Which one you think is better?  Or maybe
> > > suggest a different approach altogether?
> > >
> > > Thanks!
> > >
> > > David
> > >
> > > David
> > >
> > > On Wed, Oct 7, 2015 at 10:51 AM, Thomas Weise <th...@datatorrent.com>
> > > wrote:
> > >
> > > >> Why not set the delay operator as an attribute? We already support
> > >> partitioners and stream codecs as attribute.
> > >>
> > >>
> > >> On Wed, Oct 7, 2015 at 10:09 AM, Pramod Immaneni <
> > pramod@datatorrent.com>
> > >> wrote:
> > >>
> > >> > How about making just the window delay an attribute on the input
> port.
> > >> The
> > >> > operator connection is just like a normal DAG stream creation. We
> > could
> > >> > also support connecting same operator to multiple input ports with
> > >> > different delay and handle fault recovery accordingly.
> > >> >
> > >> > On Wed, Oct 7, 2015 at 9:53 AM, David Yan <da...@datatorrent.com>
> > >> wrote:
> > >> >
> > >> > > The iteration operator actually resembles the usage of unifiers.
> We
> > >> have
> > >> > > getUnifier in the interface of OutputPort.
> > >> > >
> > >> > > But if we add getDelayOperator in the interface of InputPort, that
> > >> would
> > >> > > introduce backward incompatibility especially since we can't use
> the
> > >> > > default implementation feature of interfaces that is in Java 8.
> > >> > >
> > >> > > Putting the class object as an attribute of the InputPort is not
> > good
> > >> > > either because you can't configure the delay operator itself.
> > >> > >
> > >> > > Thoughts?
> > >> > >
> > >> > > David
> > >> > >
> > >> > > On Fri, Sep 25, 2015 at 10:10 AM, David Yan <
> david@datatorrent.com>
> > >> > wrote:
> > >> > >
> > >> > > > This is a very good idea.  This way, we can have a default
> > >> > implementation
> > >> > > > of that operator and the user can control how the tuples are
> > stored
> > >> by
> > >> > > > having his/her own implementation.  How many windows the
> operator
> > >> > delays
> > >> > > is
> > >> > > > part of the implementation of that operator.
> > >> > > >
> > >> > > > I am thinking of getting rid of the ITERATION_WINDOW_OFFSET
> > >> attribute
> > >> > and
> > >> > > > introduce a DELAY_OPERATOR_CLASS attribute so that the user can
> > >> specify
> > >> > > the
> > >> > > > delay operator class to be used.
> > >> > > >
> > >> > > > More thoughts?
> > >> > > >
> > >> > > > David
> > >> > > >
> > >> > > > On Thu, Sep 17, 2015 at 7:16 PM, Gaurav Gupta <
> > >> gaurav@datatorrent.com>
> > >> > > > wrote:
> > >> > > >
> > >> > > >> Hey David,
> > >> > > >>
> > >> > > >> I was thinking can we add another operator in front of the
> input
> > >> port
> > >> > > that
> > >> > > >> has ITERATION_WINDOW_COUNT set. The new additional operator
> will
> > >> have
> > >> > > >> property whose value  will be set equal to
> ITERATION_WINDOW_COUNT
> > >> and
> > >> > it
> > >> > > >> will be responsible for caching the data for those many windows
> > and
> > >> > > >> delaying the data. This operator can act as unifier cum
> iterator
> > >> > > operator.
> > >> > > >> For this you may not need any external storage agent as
> platform
> > >> > > >> checkpointing should help you here.
> > >> > > >>
> > >> > > >> We are doing something similar for Sliding window.
> > >> > > >>
> > >> > > >> Thanks
> > >> > > >> -Gaurav
> > >> > > >>
> > >> > > >> On Wed, Sep 16, 2015 at 1:44 PM, David Yan <
> > david@datatorrent.com>
> > >> > > wrote:
> > >> > > >>
> > >> > > >> > Hi all,
> > >> > > >> >
> > >> > > >> > One current disadvantage of Apex is the inability to do
> > >> iterations
> > >> > and
> > >> > > >> > machine learning algorithms because we don't allow loops in
> the
> > >> > > >> application
> > >> > > >> > DAG (hence the name DAG).  I am proposing that we allow loops
> > in
> > >> the
> > >> > > >> DAG if
> > >> > > >> > the loop advances the window ID by a configured amount.  A
> JIRA
> > >> > ticket
> > >> > > >> has
> > >> > > >> > been created:
> > >> > > >> >
> > >> > > >> > https://malhar.atlassian.net/browse/APEX-60
> > >> > > >> >
> > >> > > >> > I have started this work in my fork at
> > >> > > >> >
> https://github.com/davidyan74/incubator-apex-core/tree/APEX-60
> > .
> > >> > > >> >
> > >> > > >> > The current progress is that a simple test case works.  Major
> > >> work
> > >> > > still
> > >> > > >> > needs to be done with respect to recovery and partitioning.
> > >> > > >> >
> > >> > > >> > The value ITERATION_WINDOW_COUNT is an attribute to an input
> > >> port of
> > >> > > an
> > >> > > >> > operator.  If the value of the attribute is greater than or
> > >> equal to
> > >> > > 1,
> > >> > > >> any
> > >> > > >> > tuples sent to the input port are treated to be
> > >> > ITERATION_WINDOW_COUNT
> > >> > > >> > windows ahead of what they are.
> > >> > > >> >
> > >> > > >> > For recovery, we will need to checkpoint all the tuples
> between
> > >> > ports
> > >> > > >> with
> > >> > > >> > the to replay the looped tuples.  During the recovery, if the
> > >> > operator
> > >> > > >> has
> > >> > > >> > an input port, with ITERATION_WINDOW_COUNT=2, is recovering
> > from
> > >> > > >> checkpoint
> > >> > > >> > window 14, the tuples for that input port from window 13 and
> > >> window
> > >> > 14
> > >> > > >> need
> > >> > > >> > to be replayed to be treated as window 15 and window 16
> > >> respectively
> > >> > > >> (13+2
> > >> > > >> > and 14+2).
> > >> > > >> >
> > >> > > >> > In other words, we need to store all the tuples from window
> > with
> > >> ID
> > >> > > >> > committedWindowId minus ITERATION_WINDOW_COUNT for recovery
> and
> > >> > purge
> > >> > > >> the
> > >> > > >> > tuples earlier than that window.
> > >> > > >> > We can optimize this by only storing the tuples for
> > >> > > >> ITERATION_WINDOW_COUNT
> > >> > > >> > windows prior to any checkpoint.
> > >> > > >> >
> > >> > > >> > For that, we need a storage mechanism for the tuples.
> Chandni
> > >> > already
> > >> > > >> has
> > >> > > >> > something that fits this usage case in Apex Malhar.  The
> class
> > is
> > >> > > >> > IdempotentStorageManager.  In order for this to be used in
> Apex
> > >> > core,
> > >> > > we
> > >> > > >> > need to deprecate the class in Apex Malhar and move it to
> Apex
> > >> Core.
> > >> > > >> >
> > >> > > >> > A JIRA ticket has been created for this particular work:
> > >> > > >> >
> > >> > > >> > https://malhar.atlassian.net/browse/APEX-128
> > >> > > >> >
> > >> > > >> > Some of the above has been discussed among Thomas, Chetan,
> > >> Chandni,
> > >> > > and
> > >> > > >> > myself.
> > >> > > >> >
> > >> > > >> > For partitioning, we have not started any discussion or
> > >> > brainstorming.
> > >> > > >> We
> > >> > > >> > appreciate any feedback on this and any other aspect related
> to
> > >> > > >> supporting
> > >> > > >> > iterations in general.
> > >> > > >> >
> > >> > > >> > Thanks!
> > >> > > >> >
> > >> > > >> > David
> > >> > > >> >
> > >> > > >>
> > >> > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> > >
> >
>

Re: Supporting iterations in Apex

Posted by Amol Kekre <am...@datatorrent.com>.
I just had a talk with David, and I am +1 for option 2, with delay as an
explicit operator.

Thks,
Amol



Re: Supporting iterations in Apex

Posted by Timothy Farkas <ti...@datatorrent.com>.
+1 for option 2

Also, would it be possible to chain delay operators? A lot of stochastic and
adaptive methods depend on finding the correlations between the current
time step n and the previous k time steps: (n, n - 1), (n, n - 2),
(n, n - 3), ..., (n, n - k).

Here is a picture of a model that uses delay operators (in the picture
these are represented by z^-1) and is used for time series prediction.

https://upload.wikimedia.org/wikipedia/commons/thumb/d/d2/IIRFilter2.svg/250px-IIRFilter2.svg.png
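
The chaining question can be pictured as a tapped delay line. The sketch below is plain Java and does not use any Apex API: TappedDelayLine is a hypothetical class that remembers the last k values, much as k chained one-step delay operators would, and pairs the current value with each lagged value. Treating one value per time step as the unit of delay is an assumption made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical illustration only; not part of the Apex API. */
class TappedDelayLine {
    private final int k;                                        // number of chained one-step delays
    private final Deque<Double> history = new ArrayDeque<>();   // newest value first

    TappedDelayLine(int k) {
        if (k < 1) {
            throw new IllegalArgumentException("k must be >= 1");
        }
        this.k = k;
    }

    /**
     * Feeds the value for time step n and returns the pairs
     * (x[n], x[n-1]), (x[n], x[n-2]), ... for as many lags as are available.
     */
    List<double[]> push(double current) {
        List<double[]> pairs = new ArrayList<>();
        for (double lagged : history) {          // iterates from lag 1 to lag k
            pairs.add(new double[] {current, lagged});
        }
        history.addFirst(current);
        if (history.size() > k) {
            history.removeLast();                // value older than n-k leaves the chain
        }
        return pairs;
    }
}
```

With k = 2, pushing 1, 2, 3 in turn yields no pairs, then (2, 1), then (3, 2) and (3, 1). Whether the engine would allow several delay operators in one loop is exactly the open question raised above.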


Re: Supporting iterations in Apex

Posted by Sasha Parfenov <sa...@datatorrent.com>.
+1 for option 2.  Although option 2 doesn't mirror the current unifiers
like option 1 does, and may look more complicated when viewing the logical
plan, I think the flexibility of specifying locality and the ability to
bring multiple downstream operators into a single delay operator may be
important for some projects.  For me the added flexibility wins,
particularly in light of efforts towards a simpler high-level API.
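
As a rough illustration of option 2, the sketch below models a logical plan in which the delay operator is an ordinary node that several operators can feed, and checks the rule from the original proposal that a loop is only acceptable if it passes through a delay. LoopPlanSketch and its method names are hypothetical and written in plain Java; this is not how the Apex engine validates a DAG.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of a logical plan; names are illustrative, not Apex API. */
class LoopPlanSketch {
    private final Map<String, List<String>> downstream = new HashMap<>();
    private final Set<String> delayOperators = new HashSet<>();

    /** Registers an operator that advances the window ID of what it emits. */
    void addDelayOperator(String name) {
        downstream.putIfAbsent(name, new ArrayList<>());
        delayOperators.add(name);
    }

    /** Connects two operators, registering them on first use. */
    void addStream(String from, String to) {
        downstream.computeIfAbsent(from, key -> new ArrayList<>()).add(to);
        downstream.putIfAbsent(to, new ArrayList<>());
    }

    /** True if every loop passes through at least one delay operator. */
    boolean everyLoopIsDelayed() {
        Set<String> done = new HashSet<>();
        Set<String> onPath = new HashSet<>();
        for (String op : downstream.keySet()) {
            if (hasUndelayedCycle(op, done, onPath)) {
                return false;
            }
        }
        return true;
    }

    // Depth-first search that never follows streams leaving a delay operator,
    // so any cycle it finds contains no delay operator at all.
    private boolean hasUndelayedCycle(String op, Set<String> done, Set<String> onPath) {
        if (onPath.contains(op)) {
            return true;
        }
        if (done.contains(op)) {
            return false;
        }
        onPath.add(op);
        if (!delayOperators.contains(op)) {
            for (String next : downstream.get(op)) {
                if (hasUndelayedCycle(next, done, onPath)) {
                    return true;
                }
            }
        }
        onPath.remove(op);
        done.add(op);
        return false;
    }
}
```

Wiring upstream, A, B and downstream as in the diagram, with both B and a second operator C feeding a delay operator D that connects back to A, passes the check; connecting B straight back to A does not.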



On Mon, Nov 2, 2015 at 11:25 AM, David Yan <da...@datatorrent.com> wrote:

> Please share your thoughts using the dev mailing list on this topic if you
> can.  Thanks.
>
> ---------- Forwarded message ----------
> From: David Yan <da...@datatorrent.com>
> Date: Thu, Oct 29, 2015 at 11:11 AM
> Subject: Re: Supporting iterations in Apex
> To: dev@apex.incubator.apache.org
>
>
> This delay operator will act as an input operator for the first window and
> act as a regular operator after that.
> The engine will increment the window id of the windows from all the output
> ports of the delay operator.
>
> We will need a new interface for the delay operator, extending the
> existing Operator interface.  The interface will probably include:
>
> - Emitting the tuples for the first window
> - Emitting the tuples after recovery
>
> We will provide a default implementation of the delay operator with a
> write-ahead log that stores the tuples for the window before each
> checkpoint for recovery.  We will also probably support the number of
> windows to delay using an operator property.
>
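
The behaviour described in this message can be sketched roughly as follows. This is plain Java under stated assumptions, not the proposed Apex interface: DelaySketch, FixedWindowDelay and all method names are hypothetical, the write-ahead log and recovery path are left out, and the window ID shift that the engine would perform is modelled inside the operator for simplicity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical shape of the proposed interface; every name here is an assumption. */
interface DelaySketch<T> {
    List<T> firstWindowTuples();            // what to emit for the very first window
    void process(long windowId, T tuple);   // tuple arriving on the loop-back stream
    List<T> emit(long windowId);            // tuples released as part of windowId
}

/** In-memory stand-in; the proposed default would also keep a write-ahead log. */
class FixedWindowDelay<T> implements DelaySketch<T> {
    private final int delayWindows;
    private final List<T> seed;
    private final Map<Long, List<T>> pending = new HashMap<>();
    private boolean started = false;

    FixedWindowDelay(int delayWindows, List<T> seed) {
        if (delayWindows < 1) {
            throw new IllegalArgumentException("delay must be >= 1");
        }
        this.delayWindows = delayWindows;
        this.seed = new ArrayList<>(seed);
    }

    @Override
    public List<T> firstWindowTuples() {
        return Collections.unmodifiableList(seed);
    }

    @Override
    public void process(long windowId, T tuple) {
        // A tuple seen in window w is scheduled for window w + delayWindows.
        pending.computeIfAbsent(windowId + delayWindows, id -> new ArrayList<>()).add(tuple);
    }

    @Override
    public List<T> emit(long windowId) {
        if (!started) {
            started = true;                 // acts like an input operator exactly once
            return firstWindowTuples();
        }
        List<T> due = pending.remove(windowId);
        return due == null ? Collections.emptyList() : due;
    }
}
```

With a delay of one window, tuples processed in window 10 come back out of emit(11), which matches the "+1 on the window ID" described for the loop.
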
> Let's look at this DAG with an iteration loop:
>
> upstream --> A --> B --> downstream
>              ^     |
>              |-----|
>
> With the delay operator, the physical view of the DAG looks like this with
> D being the delay operator:
>
> upstream --> A --> B --> downstream
>              ^     |
>              |-D<--|
>
> There are two approaches for specifying the delay operator.
>
> 1) As discussed earlier on this thread, the delay operator can be
> specified as an *input port attribute* of A. The delay operator D will
> not appear in the logical DAG.  The engine will do the +1 on the window ID
> based on the presence of the input port attribute.  In this case, the delay
> operator does not need to specify any input port, just like the unifier,
> with the process(tuple) method implicitly taking in the tuples from the
> output port of B, which logically connects to the input port of A.
>
> 2) The delay operator is specified and connected *as any other operator*
> in the logical DAG.  The engine will do the +1 on the window ID if the
> operator implements the delay operator interface.  In this case, the delay
> operator D will need to specify at least one input port (just like a
> regular operator), and can actually have multiple input ports.
>
> I'm leaning toward the 2nd approach.
>
> Please share your thoughts.  Which one do you think is better?  Or maybe
> suggest a different approach altogether?
>
> Thanks!
>
> David
>
> On Wed, Oct 7, 2015 at 10:51 AM, Thomas Weise <th...@datatorrent.com>
> wrote:
>
>> Why not set the delay operator as an attribute? We already support
>> partitioners and stream codecs as attributes.
>>
>>
>> On Wed, Oct 7, 2015 at 10:09 AM, Pramod Immaneni <pr...@datatorrent.com>
>> wrote:
>>
>> > How about making just the window delay an attribute on the input port?
>> > The operator connection is just like a normal DAG stream creation. We
>> > could also support connecting the same operator to multiple input ports
>> > with different delays and handle fault recovery accordingly.
>> >
>> > On Wed, Oct 7, 2015 at 9:53 AM, David Yan <da...@datatorrent.com>
>> wrote:
>> >
>> > > The iteration operator actually resembles the usage of unifiers.  We
>> > > have getUnifier in the interface of OutputPort.
>> > >
>> > > But if we add getDelayOperator in the interface of InputPort, that
>> > > would introduce backward incompatibility, especially since we can't
>> > > use the default implementation feature of interfaces that is in
>> > > Java 8.
>> > >
>> > > Putting the class object as an attribute of the InputPort is not good
>> > > either because you can't configure the delay operator itself.
>> > >
>> > > Thoughts?
>> > >
>> > > David
>> > >
>> > > On Fri, Sep 25, 2015 at 10:10 AM, David Yan <da...@datatorrent.com>
>> > wrote:
>> > >
>> > > > This is a very good idea.  This way, we can have a default
>> > > > implementation of that operator and the user can control how the
>> > > > tuples are stored by having his/her own implementation.  How many
>> > > > windows the operator delays is part of the implementation of that
>> > > > operator.
>> > > >
>> > > > I am thinking of getting rid of the ITERATION_WINDOW_OFFSET
>> > > > attribute and introducing a DELAY_OPERATOR_CLASS attribute so that
>> > > > the user can specify the delay operator class to be used.
>> > > >
>> > > > More thoughts?
>> > > >
>> > > > David
>> > > >
>> > > > On Thu, Sep 17, 2015 at 7:16 PM, Gaurav Gupta <
>> gaurav@datatorrent.com>
>> > > > wrote:
>> > > >
>> > > >> Hey David,
>> > > >>
>> > > >> I was thinking can we add another operator in front of the input
>> port
>> > > that
>> > > >> has ITERATION_WINDOW_COUNT set. The new additional operator will
>> have
>> > > >> property whose value  will be set equal to ITERATION_WINDOW_COUNT
>> and
>> > it
>> > > >> will be responsible for caching the data for those many windows and
>> > > >> delaying the data. This operator can act as unifier cum iterator
>> > > operator.
>> > > >> For this you may not need any external storage agent as platform
>> > > >> checkpointing should help you here.
>> > > >>
>> > > >> We are doing something similar for Sliding window.
>> > > >>
>> > > >> Thanks
>> > > >> -Gaurav
>> > > >>
>> > > >> On Wed, Sep 16, 2015 at 1:44 PM, David Yan <da...@datatorrent.com>
>> > > wrote:
>> > > >>
>> > > >> > Hi all,
>> > > >> >
>> > > >> > One current disadvantage of Apex is the inability to do
>> iterations
>> > and
>> > > >> > machine learning algorithms because we don't allow loops in the
>> > > >> application
>> > > >> > DAG (hence the name DAG).  I am proposing that we allow loops in
>> the
>> > > >> DAG if
>> > > >> > the loop advances the window ID by a configured amount.  A JIRA
>> > ticket
>> > > >> has
>> > > >> > been created:
>> > > >> >
>> > > >> > https://malhar.atlassian.net/browse/APEX-60
>> > > >> >
>> > > >> > I have started this work in my fork at
>> > > >> > https://github.com/davidyan74/incubator-apex-core/tree/APEX-60.
>> > > >> >
>> > > >> > The current progress is that a simple test case works.  Major
>> work
>> > > still
>> > > >> > needs to be done with respect to recovery and partitioning.
>> > > >> >
>> > > >> > The value ITERATION_WINDOW_COUNT is an attribute to an input
>> port of
>> > > an
>> > > >> > operator.  If the value of the attribute is greater than or
>> equal to
>> > > 1,
>> > > >> any
>> > > >> > tuples sent to the input port are treated to be
>> > ITERATION_WINDOW_COUNT
>> > > >> > windows ahead of what they are.
>> > > >> >
>> > > >> > For recovery, we will need to checkpoint all the tuples between
>> > ports
>> > > >> with
>> > > >> > the to replay the looped tuples.  During the recovery, if the
>> > operator
>> > > >> has
>> > > >> > an input port, with ITERATION_WINDOW_COUNT=2, is recovering from
>> > > >> checkpoint
>> > > >> > window 14, the tuples for that input port from window 13 and
>> window
>> > 14
>> > > >> need
>> > > >> > to be replayed to be treated as window 15 and window 16
>> respectively
>> > > >> (13+2
>> > > >> > and 14+2).
>> > > >> >
>> > > >> > In other words, we need to store all the tuples from window with
>> ID
>> > > >> > committedWindowId minus ITERATION_WINDOW_COUNT for recovery and
>> > purge
>> > > >> the
>> > > >> > tuples earlier than that window.
>> > > >> > We can optimize this by only storing the tuples for
>> > > >> ITERATION_WINDOW_COUNT
>> > > >> > windows prior to any checkpoint.
>> > > >> >
>> > > >> > For that, we need a storage mechanism for the tuples.  Chandni
>> > already
>> > > >> has
>> > > >> > something that fits this usage case in Apex Malhar.  The class is
>> > > >> > IdempotentStorageManager.  In order for this to be used in Apex
>> > core,
>> > > we
>> > > >> > need to deprecate the class in Apex Malhar and move it to Apex
>> Core.
>> > > >> >
>> > > >> > A JIRA ticket has been created for this particular work:
>> > > >> >
>> > > >> > https://malhar.atlassian.net/browse/APEX-128
>> > > >> >
>> > > >> > Some of the above has been discussed among Thomas, Chetan,
>> Chandni,
>> > > and
>> > > >> > myself.
>> > > >> >
>> > > >> > For partitioning, we have not started any discussion or
>> > brainstorming.
>> > > >> We
>> > > >> > appreciate any feedback on this and any other aspect related to
>> > > >> supporting
>> > > >> > iterations in general.
>> > > >> >
>> > > >> > Thanks!
>> > > >> >
>> > > >> > David
>> > > >> >
>> > > >>
>> > > >
>> > > >
>> > >
>> >
>>
>
>
>

Re: Supporting iterations in Apex

Posted by David Yan <da...@datatorrent.com>.
This delay operator will act as an input operator for the first window and
act as a regular operator after that.
The engine will increment the window id of the windows from all the output
ports of the delay operator.

We will need a new interface for the delay operator, extending the existing
Operator interface.  The interface will probably include:

- Emitting the tuples for the first window
- Emitting the tuples after recovery

We will provide a default implementation of the delay operator with a
write-ahead log that stores the tuples for the window before each
checkpoint for recovery.  We will probably also make the number of windows
to delay configurable through an operator property.
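
To make the two responsibilities listed above concrete, here is a minimal
sketch in plain Java.  Everything in it is an assumption for illustration:
the names DelayOperator, firstWindow, replay and LoggingDelayOperator are
hypothetical, the Operator interface is a cut-down stand-in rather than the
real Apex one, and the "write-ahead log" is only an in-memory map that logs
every window instead of just the window before each checkpoint.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;

// Cut-down stand-in for the existing Operator interface (assumption).
interface Operator {
    void beginWindow(long windowId);
    void endWindow();
}

// Hypothetical delay operator interface extending Operator.
interface DelayOperator<T> extends Operator {
    // Tuples to emit for the first window, before any looped tuple exists.
    List<T> firstWindow();

    // Tuples to emit again after recovering from the given checkpoint window.
    List<T> replay(long checkpointWindowId);
}

// Hypothetical default implementation backed by an in-memory log.
class LoggingDelayOperator<T> implements DelayOperator<T> {
    private final TreeMap<Long, List<T>> log = new TreeMap<>();
    private long currentWindowId;

    @Override
    public void beginWindow(long windowId) {
        currentWindowId = windowId;
    }

    // Record each incoming tuple under the window it arrived in.
    public void process(T tuple) {
        log.computeIfAbsent(currentWindowId, k -> new ArrayList<T>()).add(tuple);
    }

    @Override
    public void endWindow() {
        // A real implementation would flush the log to durable storage here.
    }

    @Override
    public List<T> firstWindow() {
        return Collections.<T>emptyList();
    }

    @Override
    public List<T> replay(long checkpointWindowId) {
        List<T> tuples = log.get(checkpointWindowId);
        return tuples == null ? Collections.<T>emptyList() : new ArrayList<T>(tuples);
    }

    // Purge every window strictly older than the committed window.
    public void committed(long windowId) {
        log.headMap(windowId).clear();
    }
}
```

After logging windows 13 and 14, replay(14) returns the tuples seen in
window 14, and committed(14) drops window 13 from the log.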

Let's look at this DAG with an iteration loop:

upstream --> A --> B --> downstream
             ^     |
             |-----|

With the delay operator, the physical view of the DAG looks like this with
D being the delay operator:

upstream --> A --> B --> downstream
             ^     |
             |-D<--|

There are two approaches for specifying the delay operator.

1) As discussed earlier on this thread, the delay operator can be specified
as an *input port attribute* of A. The delay operator D will not appear in
the logical DAG.  The engine will do the +1 on the window ID based on the
presence of the input port attribute.  In this case, the delay operator
does not need to specify any input port, just like the unifier, with the
process(tuple) method implicitly taking in the tuples from the output port
of B, which logically connects to the input port of A.

2) The delay operator is specified and connected *as any other operator* in
the logical DAG.  The engine will do the +1 on the window ID if the
operator implements the delay operator interface.  In this case, the delay
operator D will need to specify at least one input port (just like a
regular operator), and can actually have multiple input ports.

I'm leaning toward the 2nd approach.
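
A rough sketch of the engine-side check in the 2nd approach, assuming a
hypothetical marker interface: the engine stamps windowId + 1 on the output
of any operator that implements it and leaves other operators untouched.
Op, DelayOp and WindowIdDemo are illustrative names, not Apex classes.

```java
// Hypothetical minimal operator type; not the Apex Operator interface.
interface Op {
    String name();
}

// Hypothetical marker interface the engine would look for (approach 2).
interface DelayOp extends Op {
}

class WindowIdDemo {
    // Window ID stamped on an operator's output for a given input window.
    static long outputWindowId(Op op, long inputWindowId) {
        return (op instanceof DelayOp) ? inputWindowId + 1 : inputWindowId;
    }

    // Follow one trip around the loop A -> B -> D -> A from the diagram.
    static long afterOneLoop(long windowId) {
        Op a = () -> "A";
        Op b = () -> "B";
        DelayOp d = () -> "D";
        long w = windowId;
        for (Op op : new Op[] {a, b, d}) {
            w = outputWindowId(op, w);
        }
        return w;
    }
}
```

Only D changes the window ID, so tuples that leave A in window 14 come back
to A in window 15.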

Please share your thoughts.  Which one do you think is better?  Or maybe
suggest a different approach altogether?

Thanks!

David

On Wed, Oct 7, 2015 at 10:51 AM, Thomas Weise <th...@datatorrent.com>
wrote:

> Why not set the delay operator as an attribute? We already support
> partitioners and stream codecs as attributes.

Re: Supporting iterations in Apex

Posted by Thomas Weise <th...@datatorrent.com>.
Why not set the delay operator as an attribute? We already support
partitioners and stream codecs as attributes.


On Wed, Oct 7, 2015 at 10:09 AM, Pramod Immaneni <pr...@datatorrent.com>
wrote:

> How about making just the window delay an attribute on the input port? The
> operator connection is then created just like any normal DAG stream. We
> could also support connecting the same operator to multiple input ports
> with different delays and handle fault recovery accordingly.

Re: Supporting iterations in Apex

Posted by Pramod Immaneni <pr...@datatorrent.com>.
How about making just the window delay an attribute on the input port? The
operator connection is then created just like any normal DAG stream. We
could also support connecting the same operator to multiple input ports
with different delays and handle fault recovery accordingly.

On Wed, Oct 7, 2015 at 9:53 AM, David Yan <da...@datatorrent.com> wrote:

> The iteration operator actually resembles the usage of unifiers.  We have
> getUnifier in the interface of OutputPort.
>
> But if we add getDelayOperator in the interface of InputPort, that would
> introduce backward incompatibility especially since we can't use the
> default implementation feature of interfaces that is in Java 8.
>
> Putting the class object as an attribute of the InputPort is not good
> either because you can't configure the delay operator itself.
>
> Thoughts?
>
> David

Re: Supporting iterations in Apex

Posted by David Yan <da...@datatorrent.com>.
The iteration operator actually resembles the usage of unifiers.  We have
getUnifier in the interface of OutputPort.

But if we add getDelayOperator in the interface of InputPort, that would
introduce backward incompatibility especially since we can't use the
default implementation feature of interfaces that is in Java 8.
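
To illustrate the compatibility point, here is a small sketch with a
hypothetical, simplified port interface (PortSketch is not the real Apex
InputPort, and returning null to mean "no delay operator" is an assumption).
With a Java 8 default method, an implementation written before the new
method existed still compiles; without the default keyword it would not.

```java
// Hypothetical, simplified port interface; not the real Apex InputPort.
interface PortSketch<T> {
    void process(T tuple);

    // Requires Java 8 or later.  Declared as a plain abstract method, this
    // would break every existing implementation of the interface.
    default Object getDelayOperator() {
        return null; // assumption: null means no delay operator
    }
}

// An "existing" implementation that knows nothing about getDelayOperator.
class LegacyPort implements PortSketch<String> {
    final StringBuilder seen = new StringBuilder();

    @Override
    public void process(String tuple) {
        seen.append(tuple);
    }
}
```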

Putting the class object as an attribute of the InputPort is not good
either because you can't configure the delay operator itself.

Thoughts?

David

On Fri, Sep 25, 2015 at 10:10 AM, David Yan <da...@datatorrent.com> wrote:

> This is a very good idea.  This way, we can have a default implementation
> of that operator and the user can control how the tuples are stored by
> having his/her own implementation.  How many windows the operator delays is
> part of the implementation of that operator.
>
> I am thinking of getting rid of the ITERATION_WINDOW_OFFSET attribute and
> introducing a DELAY_OPERATOR_CLASS attribute so that the user can specify
> the delay operator class to be used.
>
> More thoughts?
>
> David

Re: Supporting iterations in Apex

Posted by David Yan <da...@datatorrent.com>.
This is a very good idea.  This way, we can have a default implementation
of that operator and the user can control how the tuples are stored by
having his/her own implementation.  How many windows the operator delays is
part of the implementation of that operator.

I am thinking of getting rid of the ITERATION_WINDOW_OFFSET attribute and
introducing a DELAY_OPERATOR_CLASS attribute so that the user can specify
the delay operator class to be used.

More thoughts?

David

On Thu, Sep 17, 2015 at 7:16 PM, Gaurav Gupta <ga...@datatorrent.com>
wrote:

> Hey David,
>
> I was wondering whether we can add another operator in front of the input
> port that has ITERATION_WINDOW_COUNT set. The new operator will have a
> property whose value is set equal to ITERATION_WINDOW_COUNT, and it will
> be responsible for caching the data for that many windows and delaying
> the data. This operator can act as a unifier-cum-iterator operator. For
> this you may not need any external storage agent, as platform
> checkpointing should help you here.
>
> We are doing something similar for Sliding window.
>
> Thanks
> -Gaurav

Re: Supporting iterations in Apex

Posted by Gaurav Gupta <ga...@datatorrent.com>.
Hey David,

I was wondering whether we can add another operator in front of the input
port that has ITERATION_WINDOW_COUNT set. The new operator will have a
property whose value is set equal to ITERATION_WINDOW_COUNT, and it will
be responsible for caching the data for that many windows and delaying
the data. This operator can act as a unifier-cum-iterator operator. For
this you may not need any external storage agent, as platform
checkpointing should help you here.
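
A minimal sketch of such a caching operator, under stated assumptions: the
class name WindowDelayBuffer and its methods are hypothetical, tuples are
released at the end of a window rather than streamed, and the cache is an
ordinary field, so it would be saved with the rest of the operator state
only if the platform checkpoints operator fields.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical operator core that holds tuples back for a number of windows.
class WindowDelayBuffer<T> {
    private final int delayWindows;
    private final Deque<List<T>> held = new ArrayDeque<>();
    private List<T> current = new ArrayList<>();

    WindowDelayBuffer(int delayWindows) {
        if (delayWindows < 1) {
            throw new IllegalArgumentException("delayWindows must be >= 1");
        }
        this.delayWindows = delayWindows;
    }

    // Cache a tuple that arrived in the current window.
    void process(T tuple) {
        current.add(tuple);
    }

    // Close the current window and return the tuples cached delayWindows
    // windows earlier, or an empty list while the buffer is still filling.
    List<T> endWindow() {
        held.addLast(current);
        current = new ArrayList<>();
        return held.size() > delayWindows ? held.removeFirst() : new ArrayList<T>();
    }
}
```

With delayWindows = 2, tuples received in window 13 are released when
window 15 ends, which matches the 13+2 example in the original proposal.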

We are doing something similar for Sliding window.

Thanks
-Gaurav

On Wed, Sep 16, 2015 at 1:44 PM, David Yan <da...@datatorrent.com> wrote:

> Hi all,
>
> One current disadvantage of Apex is the inability to do iterations and
> machine learning algorithms because we don't allow loops in the application
> DAG (hence the name DAG).  I am proposing that we allow loops in the DAG if
> the loop advances the window ID by a configured amount.  A JIRA ticket has
> been created:
>
> https://malhar.atlassian.net/browse/APEX-60
>
> I have started this work in my fork at
> https://github.com/davidyan74/incubator-apex-core/tree/APEX-60.
>
> The current progress is that a simple test case works.  Major work still
> needs to be done with respect to recovery and partitioning.
>
> The value ITERATION_WINDOW_COUNT is an attribute to an input port of an
> operator.  If the value of the attribute is greater than or equal to 1, any
> tuples sent to the input port are treated to be ITERATION_WINDOW_COUNT
> windows ahead of what they are.
>
> For recovery, we will need to checkpoint all the tuples between ports with
> the to replay the looped tuples.  During the recovery, if the operator has
> an input port, with ITERATION_WINDOW_COUNT=2, is recovering from checkpoint
> window 14, the tuples for that input port from window 13 and window 14 need
> to be replayed to be treated as window 15 and window 16 respectively (13+2
> and 14+2).
>
> In other words, we need to store all the tuples from window with ID
> committedWindowId minus ITERATION_WINDOW_COUNT for recovery and purge the
> tuples earlier than that window.
> We can optimize this by only storing the tuples for ITERATION_WINDOW_COUNT
> windows prior to any checkpoint.
>
> For that, we need a storage mechanism for the tuples.  Chandni already has
> something that fits this usage case in Apex Malhar.  The class is
> IdempotentStorageManager.  In order for this to be used in Apex core, we
> need to deprecate the class in Apex Malhar and move it to Apex Core.
>
> A JIRA ticket has been created for this particular work:
>
> https://malhar.atlassian.net/browse/APEX-128
>
> Some of the above has been discussed among Thomas, Chetan, Chandni, and
> myself.
>
> For partitioning, we have not started any discussion or brainstorming.  We
> appreciate any feedback on this and any other aspect related to supporting
> iterations in general.
>
> Thanks!
>
> David
>