Posted to dev@apex.apache.org by Pramod Immaneni <pr...@datatorrent.com> on 2015/11/02 20:33:10 UTC

Re: Supporting iterations in Apex

+1 for approach 1, as the delay operator is a specialized operator that
requires special engine interactions. Also, it might just be me, but the
diagram seems to show the delay loop going from A -> upstream instead of
B -> A.

Thanks
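
To make the delay semantics in the quoted proposal concrete, here is a minimal sketch of a buffer that emits seed tuples in the first window and afterwards emits what was looped back a fixed number of windows earlier. The class name WindowDelayBuffer and its methods are hypothetical and not part of the Apex API. The sketch also leaves out the write-ahead log that the proposed default implementation would need for recovery, and it models the window shift itself rather than leaving it to the engine.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical illustration only; this is not the Apex delay operator API.
 * Tuples received from the loop in window w are emitted in window w + delayWindows.
 */
final class WindowDelayBuffer<T> {
    // The head of the queue holds the tuples to emit in the current window.
    private final Deque<List<T>> pending = new ArrayDeque<>();

    WindowDelayBuffer(int delayWindows, List<T> firstWindowTuples) {
        if (delayWindows < 1) {
            throw new IllegalArgumentException("delayWindows must be >= 1");
        }
        // First window: behave like an input operator and emit the seed tuples.
        pending.addLast(new ArrayList<>(firstWindowTuples));
        // Until the first looped tuples become due, there is nothing to emit.
        for (int i = 1; i < delayWindows; i++) {
            pending.addLast(new ArrayList<>());
        }
    }

    /**
     * Called once per streaming window with the tuples that arrived from the
     * loop during that window; returns the tuples to emit in the same window.
     */
    List<T> onWindow(List<T> receivedFromLoop) {
        List<T> toEmit = pending.removeFirst();
        pending.addLast(new ArrayList<>(receivedFromLoop));
        return Collections.unmodifiableList(toEmit);
    }
}
```

With a delay of one window, the first call to onWindow returns the seed tuples, and each later call returns whatever was passed in on the previous call.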

On Thu, Oct 29, 2015 at 11:11 AM, David Yan <da...@datatorrent.com> wrote:

> This delay operator will act as an input operator for the first window and
> act as a regular operator after that.
> The engine will increment the window id of the windows from all the output
> ports of the delay operator.
>
> We will need a new interface for the delay operator, extending the existing
> Operator interface.  The interface will probably include:
>
> - Emitting the tuples for the first window
> - Emitting the tuples after recovery
>
> We will provide a default implementation of the delay operator with a
> write-ahead log that stores the tuples for the window before each
> checkpoint for recovery.  We will also probably support the number of
> windows to delay using an operator property.
>
> Let's look at this DAG with an iteration loop:
>
> upstream --> A --> B --> downstream
>              ^     |
>              |-----|
>
> With the delay operator, the physical view of the DAG looks like this with
> D being the delay operator:
>
> upstream --> A --> B --> downstream
>              ^     |
>              |-D<--|
>
> There are two approaches for specifying the delay operator.
>
> 1) As discussed earlier on this thread, the delay operator can be specified
> as an *input port attribute* of A. The delay operator D will not appear in
> the logical DAG.  The engine will do the +1 on the window ID based on the
> presence of the input port attribute.  In this case, the delay operator
> does not need to specify any input port, just like the unifier, with the
> process(tuple) method implicitly taking in the tuples from the output port
> of B, which logically connects to the input port of A.
>
> 2) The delay operator is specified and connected *as any other operator* in
> the logical DAG.  The engine will do the +1 on the window ID if the
> operator implements the delay operator interface.  In this case, the delay
> operator D will need to specify at least one input port (just like a
> regular operator), and can actually have multiple input ports.
>
> I'm leaning toward the 2nd approach.
>
> Please share your thoughts.  Which one do you think is better?  Or maybe
> suggest a different approach altogether?
>
> Thanks!
>
> David
>
> On Wed, Oct 7, 2015 at 10:51 AM, Thomas Weise <th...@datatorrent.com>
> wrote:
>
> > Why not set the delay operator as an attribute? We already support
> > partitioners and stream codecs as attributes.
> >
> >
> > On Wed, Oct 7, 2015 at 10:09 AM, Pramod Immaneni <pramod@datatorrent.com>
> > wrote:
> >
> > > How about making just the window delay an attribute on the input port?
> > > The operator connection is then just like normal DAG stream creation. We
> > > could also support connecting the same operator to multiple input ports
> > > with different delays and handle fault recovery accordingly.
> > >
> > > On Wed, Oct 7, 2015 at 9:53 AM, David Yan <da...@datatorrent.com> wrote:
> > >
> > > > The iteration operator actually resembles the usage of unifiers.  We
> > > > have getUnifier in the interface of OutputPort.
> > > >
> > > > But if we add getDelayOperator to the interface of InputPort, that
> > > > would introduce backward incompatibility, especially since we can't
> > > > use the default method feature of interfaces introduced in Java 8.
> > > >
> > > > Putting the class object as an attribute of the InputPort is not good
> > > > either, because you can't configure the delay operator itself.
> > > >
> > > > Thoughts?
> > > >
> > > > David
> > > >
> > > > On Fri, Sep 25, 2015 at 10:10 AM, David Yan <da...@datatorrent.com>
> > > > wrote:
> > > >
> > > > > This is a very good idea.  This way, we can have a default
> > > > > implementation of that operator, and the user can control how the
> > > > > tuples are stored by providing his/her own implementation.  How many
> > > > > windows the operator delays is part of the implementation of that
> > > > > operator.
> > > > >
> > > > > I am thinking of getting rid of the ITERATION_WINDOW_OFFSET attribute
> > > > > and introducing a DELAY_OPERATOR_CLASS attribute so that the user can
> > > > > specify the delay operator class to be used.
> > > > >
> > > > > More thoughts?
> > > > >
> > > > > David
> > > > >
> > > > > On Thu, Sep 17, 2015 at 7:16 PM, Gaurav Gupta <gaurav@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > >> Hey David,
> > > > >>
> > > > > >> I was thinking: can we add another operator in front of the input
> > > > > >> port that has ITERATION_WINDOW_COUNT set? The new operator will
> > > > > >> have a property whose value is set equal to ITERATION_WINDOW_COUNT,
> > > > > >> and it will be responsible for caching the data for that many
> > > > > >> windows and delaying it. This operator can act as a
> > > > > >> unifier-cum-iterator operator. For this you may not need any
> > > > > >> external storage agent, as platform checkpointing should help you
> > > > > >> here.
> > > > >>
> > > > >> We are doing something similar for Sliding window.
> > > > >>
> > > > >> Thanks
> > > > >> -Gaurav
> > > > >>
> > > > > >> On Wed, Sep 16, 2015 at 1:44 PM, David Yan <david@datatorrent.com>
> > > > > >> wrote:
> > > > >>
> > > > >> > Hi all,
> > > > >> >
> > > > > >> > One current disadvantage of Apex is the inability to do
> > > > > >> > iterations and machine learning algorithms because we don't
> > > > > >> > allow loops in the application DAG (hence the name DAG).  I am
> > > > > >> > proposing that we allow loops in the DAG if the loop advances
> > > > > >> > the window ID by a configured amount.  A JIRA ticket has been
> > > > > >> > created:
> > > > >> >
> > > > >> > https://malhar.atlassian.net/browse/APEX-60
> > > > >> >
> > > > >> > I have started this work in my fork at
> > > > >> > https://github.com/davidyan74/incubator-apex-core/tree/APEX-60.
> > > > >> >
> > > > > >> > The current progress is that a simple test case works.  Major
> > > > > >> > work still needs to be done with respect to recovery and
> > > > > >> > partitioning.
> > > > >> >
> > > > > >> > ITERATION_WINDOW_COUNT is an attribute of an input port of an
> > > > > >> > operator.  If the value of the attribute is greater than or
> > > > > >> > equal to 1, any tuples sent to the input port are treated as
> > > > > >> > being ITERATION_WINDOW_COUNT windows ahead of the window they
> > > > > >> > were emitted in.
> > > > >> >
> > > > > >> > For recovery, we will need to checkpoint all the tuples between
> > > > > >> > the ports in order to replay the looped tuples.  During
> > > > > >> > recovery, if an operator that has an input port with
> > > > > >> > ITERATION_WINDOW_COUNT=2 is recovering from checkpoint window
> > > > > >> > 14, the tuples for that input port from window 13 and window 14
> > > > > >> > need to be replayed and treated as window 15 and window 16
> > > > > >> > respectively (13+2 and 14+2).
> > > > >> >
> > > > > >> > In other words, we need to store all the tuples starting from
> > > > > >> > the window with ID committedWindowId minus
> > > > > >> > ITERATION_WINDOW_COUNT for recovery and purge the tuples
> > > > > >> > earlier than that window.
> > > > > >> > We can optimize this by only storing the tuples for the
> > > > > >> > ITERATION_WINDOW_COUNT windows prior to any checkpoint.
> > > > >> >
> > > > > >> > For that, we need a storage mechanism for the tuples.  Chandni
> > > > > >> > already has something that fits this use case in Apex Malhar.
> > > > > >> > The class is IdempotentStorageManager.  In order for this to be
> > > > > >> > used in Apex Core, we need to deprecate the class in Apex
> > > > > >> > Malhar and move it to Apex Core.
> > > > >> >
> > > > >> > A JIRA ticket has been created for this particular work:
> > > > >> >
> > > > >> > https://malhar.atlassian.net/browse/APEX-128
> > > > >> >
> > > > > >> > Some of the above has been discussed among Thomas, Chetan,
> > > > > >> > Chandni, and myself.
> > > > >> >
> > > > > >> > For partitioning, we have not started any discussion or
> > > > > >> > brainstorming.  We appreciate any feedback on this and any
> > > > > >> > other aspect related to supporting iterations in general.
> > > > >> >
> > > > >> > Thanks!
> > > > >> >
> > > > >> > David
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
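
The recovery arithmetic in the original proposal (with ITERATION_WINDOW_COUNT=2 and a checkpoint at window 14, windows 13 and 14 are replayed as 15 and 16) can be sketched as below. The class and method names are hypothetical, and the sketch treats window IDs as plain consecutive integers, as the example in the thread does. Whether the engine's real window IDs can simply be added to like this is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not part of the Apex API. */
final class IterationWindowMath {
    private IterationWindowMath() {}

    /** Window ID that a looped tuple is treated as belonging to. */
    static long shiftedWindowId(long emittedWindowId, int iterationWindowCount) {
        if (iterationWindowCount < 1) {
            throw new IllegalArgumentException("iterationWindowCount must be >= 1");
        }
        return emittedWindowId + iterationWindowCount;
    }

    /**
     * Windows whose looped tuples must be replayed when recovering from the
     * given checkpoint: the iterationWindowCount windows ending at the checkpoint.
     */
    static List<Long> windowsToReplay(long checkpointWindowId, int iterationWindowCount) {
        if (iterationWindowCount < 1) {
            throw new IllegalArgumentException("iterationWindowCount must be >= 1");
        }
        List<Long> windows = new ArrayList<>();
        long first = checkpointWindowId - iterationWindowCount + 1;
        for (long w = first; w <= checkpointWindowId; w++) {
            windows.add(w);
        }
        return windows;
    }
}
```

For the example in the thread, windowsToReplay(14, 2) yields windows 13 and 14, and shiftedWindowId maps them to 15 and 16.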