Posted to dev@apex.apache.org by David Yan <da...@datatorrent.com> on 2015/12/10 03:45:51 UTC

Re: Supporting iterations in Apex

FYI, a PR has been issued to Apex Core for iteration support in the engine.


https://github.com/apache/incubator-apex-core/pull/185

Also, another review-only PR has been issued to Apex Malhar to demonstrate
a simple case for iteration.

https://github.com/apache/incubator-apex-malhar/pull/122

I would appreciate any feedback.  Thanks!

David

On Wed, Nov 11, 2015 at 2:27 PM, David Yan <da...@datatorrent.com> wrote:

> Per Pramod's suggestion, I added another test that simulates the Fibonacci
> sequence.
>
> You can try it out from the APEX-60 branch in my fork.
>
> (cd engine; mvn test -Dtest=LogicalPlanTest#testFibonacci)
>
> David
>
> On Wed, Nov 4, 2015 at 6:27 PM, David Yan <da...@datatorrent.com> wrote:
>
>> The JIRAs have been created.
>> You can start by running the small unit test for iteration from the
>> APEX-60 branch like this:
>>
>> mvn test -Dtest=LogicalPlanTest#testIteration
>>
>> Run it from the engine directory.  Let me know if you run into issues.
>>
>> David
>>
>> On Wed, Nov 4, 2015 at 1:01 PM, David Yan <da...@datatorrent.com> wrote:
>>
>>> fork: https://github.com/davidyan74/incubator-apex-core
>>> branch: APEX-60
>>>
>>> I'll create the JIRA subtasks soon.
>>>
>>> David
>>>
>>> On Wed, Nov 4, 2015 at 11:56 AM, Pramod Immaneni <pramod@datatorrent.com> wrote:
>>>
>>>> Can you also add your development branch and fork information to the
>>>> JIRA?
>>>>
>>>> Thanks
>>>>
>>>> On Wed, Nov 4, 2015 at 11:09 AM, Pramod Immaneni <pramod@datatorrent.com> wrote:
>>>>
>>>> > I will look at it along with 5 if you are not doing that already.
>>>> >
>>>> > On Wed, Nov 4, 2015 at 11:08 AM, David Yan <da...@datatorrent.com> wrote:
>>>> >
>>>> >> Yes, let's add that.  During that time, firstWindow() will need to be
>>>> >> called.  That should be done before 5.
>>>> >>
>>>> >> On Wed, Nov 4, 2015 at 11:05 AM, Pramod Immaneni <pramod@datatorrent.com> wrote:
>>>> >>
>>>> >> > I can help with 5. What about the engine introducing the synthetic
>>>> >> > window when recovering from failure or at the start of the application?
>>>> >> >
>>>> >> > On Wed, Nov 4, 2015 at 10:41 AM, David Yan <da...@datatorrent.com> wrote:
>>>> >> >
>>>> >> > > Thanks Pramod and Sandesh for offering to help.
>>>> >> > >
>>>> >> > > Here's my rough plan:
>>>> >> > >
>>>> >> > > 1. Add a DelayOperator interface.  The DelayOperator will have a method
>>>> >> > > firstWindow(long windowId).  Implementations of this interface are
>>>> >> > > supposed to emit tuples for the first window of the execution of the
>>>> >> > > operator (either the first window of the execution of the application
>>>> >> > > or the first window after recovery).
>>>> >> > >
>>>> >> > > 2. Add SimpleDelayOperator that implements DelayOperator.  It has one
>>>> >> > > input port and one output port.  It simply passes the tuples from the
>>>> >> > > input port to the output port and does nothing in the firstWindow()
>>>> >> > > call.
>>>> >> > >
>>>> >> > > 3. Engine (e.g. DAG validation) to support loops in case of
>>>> >> > > DelayOperator.
>>>> >> > >
>>>> >> > > 4. Implement the +1 delay in the engine for input ports that are
>>>> >> > > connected to output ports of DelayOperator.
>>>> >> > >
>>>> >> > > 5. Add a capability in the engine to let the operator know when the
>>>> >> > > next checkpoint window is.
>>>> >> > >
>>>> >> > > 6. Add DefaultDelayOperator that extends SimpleDelayOperator.  It
>>>> >> > > writes the tuples in the window before each checkpoint to a DFS-backed
>>>> >> > > WAL (using item 5 above), and it overrides firstWindow() to read from
>>>> >> > > the WAL and emit the tuples at recovery.
>>>> >> > >
>>>> >> > > 7. Add +N delay capability (in addition to +1) in the
>>>> >> > > DefaultDelayOperator.
>>>> >> > >
>>>> >> > > Let me know whether this plan sounds good to you.
>>>> >> > >
>>>> >> > > I'm done with 1, 2, and 3.  4 is in progress.
>>>> >> > > I think the bulk of the work is 5 and 6 and we can discuss how we can
>>>> >> > > divide the work.
>>>> >> > > 7 is a nice-to-have and does not have to be done unless there is a
>>>> >> > > demand.
>>>> >> > >
>>>> >> > > David
>>>> >> > >
>>>> >> > >
>>>> >> > > On Wed, Nov 4, 2015 at 10:11 AM, Pramod Immaneni <pramod@datatorrent.com> wrote:
>>>> >> > >
>>>> >> > > > I would like to help. I might be able to pick up some of the smaller
>>>> >> > > > tasks.
>>>> >> > > >
>>>> >> > > > On Wed, Nov 4, 2015 at 10:05 AM, David Yan <david@datatorrent.com> wrote:
>>>> >> > > >
>>>> >> > > > > Thank you for all your feedback.  Looks like option #2 wins.
>>>> >> > > > >
>>>> >> > > > > I will be working on this in November, so please let me know if
>>>> >> > > > > you'd like to join the effort!
>>>> >> > > > >
>>>> >> > > > > On Wed, Nov 4, 2015 at 9:58 AM, Thomas Weise <thomas@datatorrent.com> wrote:
>>>> >> > > > >
>>>> >> > > > > > Agreed, there is no ambiguity.
>>>> >> > > > > >
>>>> >> > > > > > #2 will also allow the user to tune locality, as there are no
>>>> >> > > > > > implicit streams, as opposed to the unifier-like approach.
>>>> >> > > > > >
>>>> >> > > > > > On Wed, Nov 4, 2015 at 9:54 AM, David Yan <david@datatorrent.com> wrote:
>>>> >> > > > > >
>>>> >> > > > > > > On Tue, Nov 3, 2015 at 9:57 PM, Thomas Weise <thomas@datatorrent.com> wrote:
>>>> >> > > > > > >
>>>> >> > > > > > > >
>>>> >> > > > > > > > #2 will address that. But if an operator with the delay
>>>> >> > > > > > > > interface has multiple input ports, on which port will the
>>>> >> > > > > > > > engine perform the delay? Maybe we will need to validate
>>>> >> > > > > > > > that a delay operator can only have a single input port?
>>>> >> > > > > > > >
>>>> >> > > > > > >
>>>> >> > > > > > > My understanding is that the engine performs the +1 delay on
>>>> >> > > > > > > the input ports of operators that are connected to output
>>>> >> > > > > > > ports of delay operators.  So whether or not the delay
>>>> >> > > > > > > operator has multiple input ports should not matter.
>>>> >> > > > > > >
>>>> >> > > > > > >
>>>> >> > > > > >
>>>> >> > > > >
>>>> >> > > >
>>>> >> > >
>>>> >> >
>>>> >>
>>>> >
>>>> >
>>>>
>>>
>>>
>>
>
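
Illustrative sketch (not code from the thread or from either PR): items 1, 2
and 4 of the plan quoted above can be modeled in plain Java without any Apex
dependency. Everything here is an assumption made for illustration: the class
DelaySketch, the SeededDelayOperator subclass, the run() helper, and the choice
to have firstWindow() return a list rather than emit on an output port. The
real interface in Apex Core may look quite different. The loop body adds each
incoming tuple to the previous one, which produces a Fibonacci-like sequence
when the loop is seeded with 1.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class DelaySketch {

    /** Hypothetical stand-in for the DelayOperator interface of item 1. */
    interface DelayOperator<T> {
        /** Tuples to emit for the first window (application start or recovery). */
        List<T> firstWindow(long windowId);
    }

    /** Item 2: a pass-through delay that emits nothing for the first window. */
    static class SimpleDelayOperator<T> implements DelayOperator<T> {
        @Override
        public List<T> firstWindow(long windowId) {
            return new ArrayList<>();
        }
    }

    /** Hypothetical subclass that seeds the loop with a single tuple. */
    static class SeededDelayOperator extends SimpleDelayOperator<Long> {
        private final long seed;

        SeededDelayOperator(long seed) {
            this.seed = seed;
        }

        @Override
        public List<Long> firstWindow(long windowId) {
            return Collections.singletonList(seed);
        }
    }

    /**
     * Simulates a loop of one adding operator and one delay operator.
     * Item 4 is modeled by handing the tuples emitted in window w back to
     * the adding operator as its input in window w + 1.
     */
    static List<Long> run(DelayOperator<Long> delay, int windows) {
        List<Long> outputs = new ArrayList<>();
        List<Long> pending = delay.firstWindow(0); // input for window 0
        long last = 0;                             // previous input tuple
        for (int w = 0; w < windows; w++) {
            List<Long> emitted = new ArrayList<>();
            for (long x : pending) {
                emitted.add(x + last);
                last = x;
            }
            outputs.addAll(emitted);
            pending = emitted;                     // the +1 window delay
        }
        return outputs;
    }

    public static void main(String[] args) {
        System.out.println(run(new SeededDelayOperator(1L), 5));
        System.out.println(run(new SimpleDelayOperator<Long>(), 5));
    }
}
```

In this model the unseeded SimpleDelayOperator never produces any output,
because nothing enters the loop in the first window. That is one way to see
why the plan gives firstWindow() a role both at application start and after
recovery.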